长文前排提醒,收藏向前排提醒,素质三连 (转发 + 在看 + 留言) 前排提醒!

前言

Redis 作为一个开源的,高级的键值存储和一个适用的解决方案,已经越来越在构建 「高性能」「可扩展」 的 Web 应用上发挥着举足轻重的作用。

当今互联网技术架构中 Redis 已然成为了应用得最广泛的中间件之一,它也是中高级后端工程 技术面试 中面试官最喜欢问的工程技能之一,不仅仅要求着我们对 基本的使用 进行掌握,更要深层次地理解 Redis 内部实现 的细节原理。

熟练掌握 Redis,甚至可以毫不夸张地说已经半只脚踏入心仪的公司了。下面我们一起来盘点回顾一下 Redis 的面试经典问题,就不要再被面试官问得 脸都绿了 呀!

  • Ps: 我把 重要的知识点 都做成了 图片,希望各位 “用餐愉快”(不错记得付餐费.. 点个赞留个言..)

一、基础篇

什么是 Redis ?

先解释 Redis 基本概念

Redis (Remote Dictionary Server) 是一个使用 C 语言 编写的,开源的 (BSD许可) 高性能 非关系型 (NoSQL)键值对数据库

简单提一下 Redis 数据结构

Redis 可以存储 不同类型数据结构值 之间的映射关系。键的类型只能是字符串,而值除了支持最 基础的五种数据类型 外,还支持一些 高级数据类型

一定要说出一些高级数据结构 (当然你自己也要了解.. 下面会说到的别担心),这样面试官的眼睛才会亮。

Redis 小总结

与传统数据库不同的是 Redis 的数据是 存在内存 中的,所以 读写速度 非常 ,因此 Redis 被广泛应用于 缓存 方向,每秒可以处理超过 10 万次读写操作,是已知性能最快的 Key-Value 数据库。另外,Redis 也经常用来做 分布式锁

除此之外,Redis 支持事务 、持久化、LUA脚本、LRU驱动事件、多种集群方案。

Redis 优缺点

优点

  • 读写性能优异, Redis能读的速度是 110000 次/s,写的速度是 81000 次/s。
  • 支持数据持久化,支持 AOF 和 RDB 两种持久化方式。
  • 支持事务,Redis 的所有操作都是原子性的,同时 Redis 还支持对几个操作合并后的原子性执行。
  • 数据结构丰富,除了支持 string 类型的 value 外还支持 hash、set、zset、list 等数据结构。
  • 支持主从复制,主机会自动将数据同步到从机,可以进行读写分离。

缺点

  • 数据库 容量受到物理内存的限制,不能用作海量数据的高性能读写,因此 Redis 适合的场景主要局限在较小数据量的高性能操作和运算上。
  • Redis 不具备自动容错和恢复功能,主机从机的宕机都会导致前端部分读写请求失败,需要等待机器重启或者手动切换前端的 IP 才能恢复。
  • 主机宕机,宕机前有部分数据未能及时同步到从机,切换 IP 后还会引入数据不一致的问题,降低了 系统的可用性
  • Redis 较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。为避免这一问题,运维人员在系统上线时必须确保有足够的空间,这对资源造成了很大的浪费。

为什么要用缓存?为什么使用 Redis?

提一下现在 Web 应用的现状

在日常的 Web 应用对数据库的访问中,读操作的次数远超写操作,比例大概在 1:93:7,所以需要读的可能性是比写的可能大得多的。当我们使用 SQL 语句去数据库进行读写操作时,数据库就会 去磁盘把对应的数据索引取回来,这是一个相对较慢的过程。

使用 Redis or 使用缓存带来的优势

如果我们把数据放在 Redis 中,也就是直接放在内存之中,让服务端直接去读取内存中的数据,那么这样 速度 明显就会快上不少 (高性能),并且会 极大减小数据库的压力 (特别是在高并发情况下)

记得是 两个角度 啊.. 高性能高并发..

也要提一下使用缓存的考虑

但是使用内存进行数据存储开销也是比较大的,限于成本 的原因,一般我们只是使用 Redis 存储一些 常用和主要的数据,比如用户登录的信息等。

一般而言在使用 Redis 进行存储的时候,我们需要从以下几个方面来考虑:

  • 业务数据常用吗?命中率如何? 如果命中率很低,就没有必要写入缓存;
  • 该业务数据是读操作多,还是写操作多? 如果写操作多,频繁需要写入数据库,也没有必要使用缓存;
  • 业务数据大小如何? 如果要存储几百兆字节的文件,会给缓存带来很大的压力,这样也没有必要;

在考虑了这些问题之后,如果觉得有必要使用缓存,那么就使用它!

使用缓存会出现什么问题?

一般来说有如下几个问题,回答思路遵照 是什么为什么怎么解决

  1. 缓存雪崩问题;
  2. 缓存穿透问题;
  3. 缓存和数据库双写一致性问题;

缓存雪崩问题

另外对于 “Redis 挂掉了,请求全部走数据库” 这样的情况,我们还可以有如下的思路:

  • 事发前:实现 Redis 的高可用(主从架构 + Sentinel 或者 Redis Cluster),尽量避免 Redis 挂掉这种情况发生。
  • 事发中:万一 Redis 真的挂了,我们可以设置本地缓存(ehcache) + 限流(hystrix),尽量避免我们的数据库被干掉(起码能保证我们的服务还是能正常工作的)
  • 事发后:Redis 持久化,重启后自动从磁盘上加载数据,快速恢复缓存数据。

缓存穿透问题

缓存与数据库双写一致问题

双写一致性上图还是稍微粗糙了些,你还需要知道两种方案 (先操作数据库和先操作缓存) 分别都有什么优势和对应的问题,这里不作赘述,可以参考一下下方的文章,写得非常详细。

Redis 为什么早期版本选择单线程?

官方解释

因为 Redis 是基于内存的操作,CPU 不是 Redis 的瓶颈,Redis 的瓶颈最有可能是 机器内存的大小 或者 网络带宽。既然单线程容易实现,而且 CPU 不会成为瓶颈,那就顺理成章地采用单线程的方案了。

简单总结一下

  1. 使用单线程模型能带来更好的 可维护性,方便开发和调试;
  2. 使用单线程模型也能 并发 的处理客户端的请求;(I/O 多路复用机制)
  3. Redis 服务中运行的绝大多数操作的 性能瓶颈都不是 CPU

强烈推荐 各位亲看一下这篇文章:

Redis 为什么这么快?

简单总结:

  1. 纯内存操作:读取不需要进行磁盘 I/O,所以比传统数据库要快上不少;(但不要有误区说磁盘就一定慢,例如 Kafka 就是使用磁盘顺序读取但仍然较快)
  2. 单线程,无锁竞争:这保证了没有线程的上下文切换,不会因为多线程的一些操作而降低性能;
  3. 多路 I/O 复用模型,非阻塞 I/O:采用多路 I/O 复用技术可以让单个线程高效的处理多个网络连接请求(尽量减少网络 IO 的时间消耗);
  4. 高效的数据结构,加上底层做了大量优化:Redis 对于底层的数据结构和内存占用做了大量的优化,例如不同长度的字符串使用不同的结构体表示,HyperLogLog 的密集型存储结构等等..

二、数据结构篇

简述一下 Redis 常用数据结构及实现?

首先在 Redis 内部会使用一个 RedisObject 对象来表示所有的 keyvalue

其次 Redis 为了 平衡空间和时间效率,针对 value 的具体类型在底层会采用不同的数据结构来实现,下图展示了他们之间的映射关系:(好像乱糟糟的,但至少能看清楚..)

Redis 的 SDS 和 C 中字符串相比有什么优势?

先简单总结一下

C 语言使用了一个长度为 N+1 的字符数组来表示长度为 N 的字符串,并且字符数组最后一个元素总是 \0,这种简单的字符串表示方式 不符合 Redis 对字符串在安全性、效率以及功能方面的要求

再来说 C 语言字符串的问题

这样简单的数据结构可能会造成以下一些问题:

  • 获取字符串长度为 O(N) 级别的操作 → 因为 C 不保存数组的长度,每次都需要遍历一遍整个数组;
  • 不能很好的杜绝 缓冲区溢出/内存泄漏 的问题 → 跟上述问题原因一样,如果执行拼接 or 缩短字符串的操作,如果操作不当就很容易造成上述问题;
  • C 字符串 只能保存文本数据 → 因为 C 语言中的字符串必须符合某种编码(比如 ASCII),例如中间出现的 '\0' 可能会被判定为提前结束的字符串而识别不了;

Redis 如何解决的 | SDS 的优势

如果去看 Redis 的源码 sds.h/sdshdr 文件,你会看到 SDS 完整的实现细节,这里简单来说一下 Redis 如何解决的:

  1. 多增加 len 表示当前字符串的长度:这样就可以直接获取长度了,复杂度 O(1);
  2. 自动扩展空间:当 SDS 需要对字符串进行修改时,首先借助于 lenalloc 检查空间是否满足修改所需的要求,如果空间不够的话,SDS 会自动扩展空间,避免了像 C 字符串操作中的覆盖情况;
  3. 有效降低内存分配次数:C 字符串在涉及增加或者清除操作时会改变底层数组的大小造成重新分配,SDS 使用了 空间预分配惰性空间释放 机制,简单理解就是每次在扩展时是成倍的多分配的,在缩容是也是先留着并不正式归还给 OS;
  4. 二进制安全:C 语言字符串只能保存 ascii 码,对于图片、音频等信息无法保存,SDS 是二进制安全的,写入什么读取就是什么,不做任何过滤和限制;

字典是如何实现的?Rehash 了解吗?

先总体聊一下 Redis 中的字典

字典是 Redis 服务器中出现最为频繁的复合型数据结构。除了 hash 结构的数据会用到字典外,整个 Redis 数据库的所有 keyvalue 也组成了一个 全局字典,还有带过期时间的 key 也是一个字典。(存储在 RedisDb 数据结构中)

说明字典内部结构和 rehash

Redis 中的字典相当于 Java 中的 HashMap,内部实现也差不多类似,都是通过 “数组 + 链表”链地址法 来解决部分 哈希冲突,同时这样的结构也吸收了两种不同数据结构的优点。

字典结构内部包含 两个 hashtable,通常情况下只有一个 hashtable 有值,但是在字典扩容缩容时,需要分配新的 hashtable,然后进行 渐进式搬迁 (rehash),这时候两个 hashtable 分别存储旧的和新的 hashtable,待搬迁结束后,旧的将被删除,新的 hashtable 取而代之。

扩缩容的条件

正常情况下,当 hash 表中 元素的个数等于第一维数组的长度时,就会开始扩容,扩容的新数组是 原数组大小的 2 倍。不过如果 Redis 正在做 bgsave(持久化命令),为了减少内存也得过多分离,Redis 尽量不去扩容,但是如果 hash 表非常满了,达到了第一维数组长度的 5 倍了,这个时候就会 强制扩容

当 hash 表因为元素逐渐被删除变得越来越稀疏时,Redis 会对 hash 表进行缩容来减少 hash 表的第一维数组空间占用。所用的条件是 元素个数低于数组长度的 10%,缩容不会考虑 Redis 是否在做 bgsave

跳跃表是如何实现的?原理?

这是 Redis 中比较重要的一个数据结构,建议阅读 之前写过的文章,里面详细介绍了原理和一些细节:

HyperLogLog 有了解吗?

建议阅读 之前的系列文章:

布隆过滤器有了解吗?

建议阅读 之前的系列文章:

GeoHash 了解吗?

建议阅读 之前的系列文章:

压缩列表了解吗?

这是 Redis 为了节约内存 而使用的一种数据结构,zsethash 容器对象会在元素个数较少的时候,采用压缩列表(ziplist)进行存储。压缩列表是 一块连续的内存空间,元素之间紧挨着存储,没有任何冗余空隙。

因为之前自己也没有学习过,所以找了一篇比较好比较容易理解的文章:

快速列表 quicklist 了解吗?

Redis 早期版本存储 list 列表数据结构使用的是压缩列表 ziplist 和普通的双向链表 linkedlist,也就是说当元素少时使用 ziplist,当元素多时用 linkedlist。但考虑到链表的附加空间相对较高,prevnext 指针就要占去 16 个字节(64 位操作系统占用 8 个字节),另外每个节点的内存都是单独分配,会家具内存的碎片化,影响内存管理效率。

后来 Redis 新版本(3.2)对列表数据结构进行了改造,使用 quicklist 代替了 ziplistlinkedlist

同上..建议阅读一下以下的文章:

Stream 结构有了解吗?

Redis Stream 从概念上来说,就像是一个 仅追加内容消息链表,把所有加入的消息都一个一个串起来,每个消息都有一个唯一的 ID 和内容,这很简单,让它复杂的是从 Kafka 借鉴的另一种概念:消费者组(Consumer Group) (思路一致,实现不同)

上图就展示了一个典型的 Stream 结构。每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。我们对图中的一些概念做一下解释:

  • Consumer Group:消费者组,可以简单看成记录流状态的一种数据结构。消费者既可以选择使用 XREAD 命令进行 独立消费,也可以多个消费者同时加入一个消费者组进行 组内消费。同一个消费者组内的消费者共享所有的 Stream 信息,同一条消息只会有一个消费者消费到,这样就可以应用在分布式的应用场景中来保证消息的唯一性。
  • last_delivered_id:用来表示消费者组消费在 Stream 上 消费位置 的游标信息。每个消费者组都有一个 Stream 内 唯一的名称,消费者组不会自动创建,需要使用 XGROUP CREATE 指令来显式创建,并且需要指定从哪一个消息 ID 开始消费,用来初始化 last_delivered_id 这个变量。
  • pending_ids:每个消费者内部都有的一个状态变量,用来表示 已经 被客户端 获取,但是 还没有 ack 的消息。记录的目的是为了 保证客户端至少消费了消息一次,而不会在网络传输的中途丢失而没有对消息进行处理。如果客户端没有 ack,那么这个变量里面的消息 ID 就会越来越多,一旦某个消息被 ack,它就会对应开始减少。这个变量也被 Redis 官方称为 PEL (Pending Entries List)

Stream 消息太多怎么办?

很容易想到,要是消息积累太多,Stream 的链表岂不是很长,内容会不会爆掉就是个问题了。xdel 指令又不会删除消息,它只是给消息做了个标志位。

Redis 自然考虑到了这一点,所以它提供了一个定长 Stream 功能。在 xadd 的指令提供一个定长长度 maxlen,就可以将老的消息干掉,确保最多不超过指定长度,使用起来也很简单:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

如果使用 MAXLEN 选项,当 Stream 的达到指定长度后,老的消息会自动被淘汰掉,因此 Stream 的大小是恒定的。目前还没有选项让 Stream 只保留给定数量的条目,因为为了一致地运行,这样的命令必须在很长一段时间内阻塞以淘汰消息。(例如在添加数据的高峰期间,你不得不长暂停来淘汰旧消息和添加新的消息)

另外使用 MAXLEN 选项的花销是很大的,Stream 为了节省内存空间,采用了一种特殊的结构表示,而这种结构的调整是需要额外的花销的。所以我们可以使用一种带有 ~ 的特殊命令:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

它会基于当前的结构合理地对节点执行裁剪,来保证至少会有 1000 条数据,可能是 1010 也可能是 1030

PEL 是如何避免消息丢失的?

在客户端消费者读取 Stream 消息时,Redis 服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是 PEL 里已经保存了发出去的消息 ID,待客户端重新连上之后,可以再次收到 PEL 中的消息 ID 列表。不过此时 xreadgroup 的起始消息 ID 不能为参数 > ,而必须是任意有效的消息 ID,一般将参数设为 0-0,表示读取所有的 PEL 消息以及自 last_delivered_id 之后的新消息。

和 Kafka 对比起来呢?

Redis 基于内存存储,这意味着它会比基于磁盘的 Kafka 快上一些,也意味着使用 Redis 我们 不能长时间存储大量数据。不过如果您想以 最小延迟 实时处理消息的话,您可以考虑 Redis,但是如果 消息很大并且应该重用数据 的话,则应该首先考虑使用 Kafka。

另外从某些角度来说,Redis Stream 也更适用于小型、廉价的应用程序,因为 Kafka 相对来说更难配置一些。

推荐阅读 之前的系列文章,里面 也对 Pub/ Sub 做了详细的描述

三、持久化篇

什么是持久化?

先简单谈一谈是什么

Redis 的数据 全部存储内存 中,如果 突然宕机,数据就会全部丢失,因此必须有一套机制来保证 Redis 的数据不会因为故障而丢失,这种机制就是 Redis 的 持久化机制,它会将内存中的数据库状态 保存到磁盘 中。

解释一下持久化发生了什么

我们来稍微考虑一下 Redis 作为一个 “内存数据库” 要做的关于持久化的事情。通常来说,从客户端发起请求开始,到服务器真实地写入磁盘,需要发生如下几件事情:

详细版 的文字描述大概就是下面这样:

  1. 客户端向数据库 发送写命令 (数据在客户端的内存中)
  2. 数据库 接收 到客户端的 写请求 (数据在服务器的内存中)
  3. 数据库 调用系统 API 将数据写入磁盘 (数据在内核缓冲区中)
  4. 操作系统将 写缓冲区 传输到 磁盘控控制器 (数据在磁盘缓存中)
  5. 操作系统的磁盘控制器将数据 写入实际的物理媒介(数据在磁盘中)

分析如何保证持久化安全

如果我们故障仅仅涉及到 软件层面 (该进程被管理员终止或程序崩溃) 并且没有接触到内核,那么在 上述步骤 3 成功返回之后,我们就认为成功了。即使进程崩溃,操作系统仍然会帮助我们把数据正确地写入磁盘。

如果我们考虑 停电/ 火灾更具灾难性 的事情,那么只有在完成了第 5 步之后,才是安全的。

机房”火了“

所以我们可以总结得出数据安全最重要的阶段是:步骤三、四、五,即:

  • 数据库软件调用写操作将用户空间的缓冲区转移到内核缓冲区的频率是多少?
  • 内核多久从缓冲区取数据刷新到磁盘控制器?
  • 磁盘控制器多久把数据写入物理媒介一次?
  • 注意: 如果真的发生灾难性的事件,我们可以从上图的过程中看到,任何一步都可能被意外打断丢失,所以只能 尽可能地保证 数据的安全,这对于所有数据库来说都是一样的。

我们从 第三步 开始。Linux 系统提供了清晰、易用的用于操作文件的 POSIX file API20 多年过去,仍然还有很多人对于这一套 API 的设计津津乐道,我想其中一个原因就是因为你光从 API 的命名就能够很清晰地知道这一套 API 的用途:

int open(const char *path, int oflag, .../*,mode_t mode */);
int close (int filedes);int remove( const char *fname );
ssize_t write(int fildes, const void *buf, size_t nbyte);
ssize_t read(int fildes, void *buf, size_t nbyte);

所以,我们有很好的可用的 API 来完成 第三步,但是对于成功返回之前,我们对系统调用花费的时间没有太多的控制权。

然后我们来说说 第四步。我们知道,除了早期对电脑特别了解那帮人 (操作系统就这帮人搞的),实际的物理硬件都不是我们能够 直接操作 的,都是通过 操作系统调用 来达到目的的。为了防止过慢的 I/O 操作拖慢整个系统的运行,操作系统层面做了很多的努力,譬如说 上述第四步 提到的 写缓冲区,并不是所有的写操作都会被立即写入磁盘,而是要先经过一个缓冲区,默认情况下,Linux 将在 30 秒 后实际提交写入。

但是很明显,30 秒 并不是 Redis 能够承受的,这意味着,如果发生故障,那么最近 30 秒内写入的所有数据都可能会丢失。幸好 PROSIX API 提供了另一个解决方案:fsync,该命令会 强制 内核将 缓冲区 写入 磁盘,但这是一个非常消耗性能的操作,每次调用都会 阻塞等待 直到设备报告 IO 完成,所以一般在生产环境的服务器中,Redis 通常是每隔 1s 左右执行一次 fsync 操作。

到目前为止,我们了解到了如何控制 第三步第四步,但是对于 第五步,我们 完全无法控制。也许一些内核实现将试图告诉驱动实际提交物理介质上的数据,或者控制器可能会为了提高速度而重新排序写操作,不会尽快将数据真正写到磁盘上,而是会等待几个多毫秒。这完全是我们无法控制的。

普通人简单说一下第一条就过了,如果你详细地对后面两方面 侃侃而谈,那面试官就会对你另眼相看了。

Redis 中的两种持久化方式?

方式一:快照

Redis 快照 是最简单的 Redis 持久性模式。当满足特定条件时,它将生成数据集的时间点快照,例如,如果先前的快照是在 2 分钟前创建的,并且现在已经至少有 100 次新写入,则将创建一个新的快照。此条件可以由用户配置 Redis 实例来控制,也可以在运行时修改而无需重新启动服务器。快照作为包含整个数据集的单个 .rdb 文件生成。

方式二:AOF

快照不是很持久。如果运行 Redis 的计算机停止运行,电源线出现故障或者您 kill -9 的实例意外发生,则写入 Redis 的最新数据将丢失。尽管这对于某些应用程序可能不是什么大问题,但有些使用案例具有充分的耐用性,在这些情况下,快照并不是可行的选择。

AOF(Append Only File - 仅追加文件) 它的工作方式非常简单:每次执行 修改内存 中数据集的写操作时,都会 记录 该操作。假设 AOF 日志记录了自 Redis 实例创建以来 所有的修改性指令序列,那么就可以通过对一个空的 Redis 实例 顺序执行所有的指令,也就是 「重放」,来恢复 Redis 当前实例的内存数据结构的状态。

Redis 4.0 的混合持久化

重启 Redis 时,我们很少使用 rdb 来恢复内存状态,因为会丢失大量数据。我们通常使用 AOF 日志重放,但是重放 AOF 日志性能相对 rdb 来说要慢很多,这样在 Redis 实例很大的情况下,启动需要花费很长的时间。

Redis 4.0 为了解决这个问题,带来了一个新的持久化选项——混合持久化。将 rdb 文件的内容和增量的 AOF 日志文件存在一起。这里的 AOF 日志不再是全量的日志,而是 自持久化开始到持久化结束 的这段时间发生的增量 AOF 日志,通常这部分 AOF 日志很小:

于是在 Redis 重启的时候,可以先加载 rdb 的内容,然后再重放增量 AOF 日志就可以完全替代之前的 AOF 全量文件重放,重启效率因此大幅得到提升。

关于两种持久化方式的更多细节 (原理) 可以参考:

RDB 和 AOF 各自有什么优缺点?

RDB | 优点

  1. 只有一个文件 dump.rdb方便持久化
  2. 容灾性好,一个文件可以保存到安全的磁盘。
  3. 性能最大化fork 子进程来完成写操作,让主进程继续处理命令,所以使 IO 最大化。使用单独子进程来进行持久化,主进程不会进行任何 IO 操作,保证了 Redis 的高性能
  4. 相对于数据集大时,比 AOF 的 启动效率 更高。

RDB | 缺点

  1. 数据安全性低。RDB 是间隔一段时间进行持久化,如果持久化之间 Redis 发生故障,会发生数据丢失。所以这种方式更适合数据要求不严谨的时候;

AOF | 优点

  1. 数据安全,aof 持久化可以配置 appendfsync 属性,有 always,每进行一次命令操作就记录到 aof 文件中一次。
  2. 通过 append 模式写文件,即使中途服务器宕机,可以通过 redis-check-aof 工具解决数据一致性问题。
  3. AOF 机制的 rewrite 模式。AOF 文件没被 rewrite 之前(文件过大时会对命令 进行合并重写),可以删除其中的某些命令(比如误操作的 flushall)

AOF | 缺点

  1. AOF 文件比 RDB 文件大,且 恢复速度慢
  2. 数据集大 的时候,比 rdb 启动效率低

两种方式如何选择?

  • 一般来说, 如果想达到足以媲美 PostgreSQL 的 数据安全性,你应该 同时使用两种持久化功能。在这种情况下,当 Redis 重启的时候会优先载入 AOF 文件来恢复原始的数据,因为在通常情况下 AOF 文件保存的数据集要比 RDB 文件保存的数据集要完整。
  • 如果你非常关心你的数据, 但仍然 可以承受数分钟以内的数据丢失,那么你可以 只使用 RDB 持久化
  • 有很多用户都只使用 AOF 持久化,但并不推荐这种方式,因为定时生成 RDB 快照(snapshot)非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比 AOF 恢复的速度要快,除此之外,使用 RDB 还可以避免 AOF 程序的 bug。
  • 如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化方式。

Redis 的数据恢复

Redis 的数据恢复有着如下的优先级:

  1. 如果只配置 AOF ,重启时加载 AOF 文件恢复数据;
  2. 如果同时配置了 RDB 和 AOF ,启动只加载 AOF 文件恢复数据;
  3. 如果只配置 RDB,启动将加载 dump 文件恢复数据。

拷贝 AOF 文件到 Redis 的数据目录,启动 redis-server AOF 的数据恢复过程:Redis 虚拟一个客户端,读取 AOF 文件恢复 Redis 命令和参数,然后执行命令从而恢复数据,这些过程主要在 loadAppendOnlyFile() 中实现。

拷贝 RDB 文件到 Redis 的数据目录,启动 redis-server 即可,因为 RDB 文件和重启前保存的是真实数据而不是命令状态和参数。

四、集群篇

主从同步了解吗?

主从复制,是指将一台 Redis 服务器的数据,复制到其他的 Redis 服务器。前者称为 主节点(master),后者称为 从节点(slave)。且数据的复制是 单向 的,只能由主节点到从节点。Redis 主从复制支持 主从同步从从同步 两种,后者是 Redis 后续版本新增的功能,以减轻主节点的同步负担。

主从复制主要的作用

  • 数据冗余: 主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
  • 故障恢复: 当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复 (实际上是一种服务的冗余)
  • 负载均衡: 在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务 (即写 Redis 数据时应用连接主节点,读 Redis 数据时应用连接从节点),分担服务器负载。尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高 Redis 服务器的并发量。
  • 高可用基石: 除了上述作用以外,主从复制还是哨兵和集群能够实施的 基础,因此说主从复制是 Redis 高可用的基础。

实现原理

为了节省篇幅,我把主要的步骤都 浓缩 在了上图中,其实也可以 简化成三个阶段:准备阶段-数据同步阶段-命令传播阶段

更多细节 推荐阅读 之前的系列文章,不仅有原理讲解,还有实战环节:

哨兵模式了解吗?

上图 展示了一个典型的哨兵架构图,它由两部分组成,哨兵节点和数据节点:

  • 哨兵节点: 哨兵系统由一个或多个哨兵节点组成,哨兵节点是特殊的 Redis 节点,不存储数据;
  • 数据节点: 主节点和从节点都是数据节点;

在复制的基础上,哨兵实现了 自动化的故障恢复 功能,下方是官方对于哨兵功能的描述:

  • 监控(Monitoring): 哨兵会不断地检查主节点和从节点是否运作正常。
  • 自动故障转移(Automatic failover):主节点 不能正常工作时,哨兵会开始 自动故障转移操作,它会将失效主节点的其中一个 从节点升级为新的主节点,并让其他从节点改为复制新的主节点。
  • 配置提供者(Configuration provider): 客户端在初始化时,通过连接哨兵来获得当前 Redis 服务的主节点地址。
  • 通知(Notification): 哨兵可以将故障转移的结果发送给客户端。

其中,监控和自动故障转移功能,使得哨兵可以及时发现主节点故障并完成转移。而配置提供者和通知功能,则需要在与客户端的交互中才能体现。

新的主服务器是怎样被挑选出来的?

故障转移操作的第一步 要做的就是在已下线主服务器属下的所有从服务器中,挑选出一个状态良好、数据完整的从服务器,然后向这个从服务器发送 slaveof no one 命令,将这个从服务器转换为主服务器。但是这个从服务器是怎么样被挑选出来的呢?

简单来说 Sentinel 使用以下规则来选择新的主服务器:

  1. 在失效主服务器属下的从服务器当中, 那些被标记为主观下线、已断线、或者最后一次回复 PING 命令的时间大于五秒钟的从服务器都会被 淘汰
  2. 在失效主服务器属下的从服务器当中, 那些与失效主服务器连接断开的时长超过 down-after 选项指定的时长十倍的从服务器都会被 淘汰
  3. 经历了以上两轮淘汰之后 剩下来的从服务器中, 我们选出 复制偏移量(replication offset)最大 的那个 从服务器 作为新的主服务器;如果复制偏移量不可用,或者从服务器的复制偏移量相同,那么 带有最小运行 ID 的那个从服务器成为新的主服务器。

更多细节 推荐阅读 之前的系列文章,不仅有原理讲解,还有实战环节:

Redis 集群使用过吗?原理?

上图 展示了 Redis Cluster 典型的架构图,集群中的每一个 Redis 节点都 互相两两相连,客户端任意 直连 到集群中的 任意一台,就可以对其他 Redis 节点进行 读写 的操作。

基本原理

Redis 集群中内置了 16384 个哈希槽。当客户端连接到 Redis 集群之后,会同时得到一份关于这个 集群的配置信息,当客户端具体对某一个 key 值进行操作时,会计算出它的一个 Hash 值,然后把结果对 16384 求余数,这样每个 key 都会对应一个编号在 0-16383 之间的哈希槽,Redis 会根据节点数量 大致均等 的将哈希槽映射到不同的节点。

再结合集群的配置信息就能够知道这个 key 值应该存储在哪一个具体的 Redis 节点中,如果不属于自己管,那么就会使用一个特殊的 MOVED 命令来进行一个跳转,告诉客户端去连接这个节点以获取数据:

GET x
-MOVED 3999 127.0.0.1:6381

MOVED 指令第一个参数 3999key 对应的槽位编号,后面是目标节点地址,MOVED 命令前面有一个减号,表示这是一个错误的消息。客户端在收到 MOVED 指令后,就立即纠正本地的 槽位映射表,那么下一次再访问 key 时就能够到正确的地方去获取了。

集群的主要作用

  1. 数据分区: 数据分区 (或称数据分片) 是集群最核心的功能。集群将数据分散到多个节点,一方面 突破了 Redis 单机内存大小的限制,存储容量大大增加另一方面 每个主节点都可以对外提供读服务和写服务,大大提高了集群的响应能力。Redis 单机内存大小受限问题,在介绍持久化和主从复制时都有提及,例如,如果单机内存太大,bgsavebgrewriteaoffork 操作可能导致主进程阻塞,主从环境下主机切换时可能导致从节点长时间无法提供服务,全量复制阶段主节点的复制缓冲区可能溢出……
  2. 高可用: 集群支持主从复制和主节点的 自动故障转移 (与哨兵类似),当任一节点发生故障时,集群仍然可以对外提供服务。

集群中数据如何分区?

Redis 采用方案三。

方案一:哈希值 % 节点数

哈希取余分区思路非常简单:计算 key 的 hash 值,然后对节点数量进行取余,从而决定数据映射到哪个节点上。

不过该方案最大的问题是,当新增或删减节点时,节点数量发生变化,系统中所有的数据都需要 重新计算映射关系,引发大规模数据迁移。

方案二:一致性哈希分区

一致性哈希算法将 整个哈希值空间 组织成一个虚拟的圆环,范围是 *[0 - 232 - 1]*,对于每一个数据,根据 key 计算 hash 值,确数据在环上的位置,然后从此位置沿顺时针行走,找到的第一台服务器就是其应该映射到的服务器:

与哈希取余分区相比,一致性哈希分区将 增减节点的影响限制在相邻节点。以上图为例,如果在 node1node2 之间增加 node5,则只有 node2 中的一部分数据会迁移到 node5;如果去掉 node2,则原 node2 中的数据只会迁移到 node4 中,只有 node4 会受影响。

一致性哈希分区的主要问题在于,当 节点数量较少 时,增加或删减节点,对单个节点的影响可能很大,造成数据的严重不平衡。还是以上图为例,如果去掉 node2node4 中的数据由总数据的 1/4 左右变为 1/2 左右,与其他节点相比负载过高。

方案三:带有虚拟节点的一致性哈希分区

该方案在 一致性哈希分区的基础上,引入了 虚拟节点 的概念。Redis 集群使用的便是该方案,其中的虚拟节点称为 槽(slot)。槽是介于数据和实际节点之间的虚拟概念,每个实际节点包含一定数量的槽,每个槽包含哈希值在一定范围内的数据。

在使用了槽的一致性哈希分区中,槽是数据管理和迁移的基本单位。槽 解耦数据和实际节点 之间的关系,增加或删除节点对系统的影响很小。仍以上图为例,系统中有 4 个实际节点,假设为其分配 16 个槽(0-15);

  • 槽 0-3 位于 node1;4-7 位于 node2;以此类推….

如果此时删除 node2,只需要将槽 4-7 重新分配即可,例如槽 4-5 分配给 node1,槽 6 分配给 node3,槽 7 分配给 node4;可以看出删除 node2 后,数据在其他节点的分布仍然较为均衡。

节点之间的通信机制了解吗?

集群的建立离不开节点之间的通信,例如我们在 快速体验 中刚启动六个集群节点之后通过 redis-cli 命令帮助我们搭建起来了集群,实际上背后每个集群之间的两两连接是通过了 CLUSTER MEET <ip> <port> 命令发送 MEET 消息完成的,下面我们展开详细说说。

两个端口

哨兵系统 中,节点分为 数据节点哨兵节点:前者存储数据,后者实现额外的控制功能。在 集群 中,没有数据节点与非数据节点之分:所有的节点都存储数据,也都参与集群状态的维护。为此,集群中的每个节点,都提供了两个 TCP 端口:

  • 普通端口: 即我们在前面指定的端口 (7000等)。普通端口主要用于为客户端提供服务 (与单机节点类似);但在节点间数据迁移时也会使用。
  • 集群端口: 端口号是普通端口 + 10000 (10000是固定值,无法改变),如 7000 节点的集群端口为 17000集群端口只用于节点之间的通信,如搭建集群、增减节点、故障转移等操作时节点间的通信;不要使用客户端连接集群接口。为了保证集群可以正常工作,在配置防火墙时,要同时开启普通端口和集群端口。

Gossip 协议

节点间通信,按照通信协议可以分为几种类型:单对单、广播、Gossip 协议等。重点是广播和 Gossip 的对比。

  • 广播是指向集群内所有节点发送消息。优点 是集群的收敛速度快(集群收敛是指集群内所有节点获得的集群信息是一致的),缺点 是每条消息都要发送给所有节点,CPU、带宽等消耗较大。
  • Gossip 协议的特点是:在节点数量有限的网络中,每个节点都 “随机” 的与部分节点通信 (并不是真正的随机,而是根据特定的规则选择通信的节点),经过一番杂乱无章的通信,每个节点的状态很快会达到一致。Gossip 协议的 *优点** 有负载 *(比广播) 低、去中心化、容错性高 (因为通信有冗余) 等;缺点 主要是集群的收敛速度慢。

消息类型

集群中的节点采用 固定频率(每秒10次)定时任务 进行通信相关的工作:判断是否需要发送消息及消息类型、确定接收节点、发送消息等。如果集群状态发生了变化,如增减节点、槽状态变更,通过节点间的通信,所有节点会很快得知整个集群的状态,使集群收敛。

节点间发送的消息主要分为 5 种:meet 消息ping 消息pong 消息fail 消息publish 消息。不同的消息类型,通信协议、发送的频率和时机、接收节点的选择等是不同的:

  • MEET 消息: 在节点握手阶段,当节点收到客户端的 CLUSTER MEET 命令时,会向新加入的节点发送 MEET 消息,请求新节点加入到当前集群;新节点收到 MEET 消息后会回复一个 PONG 消息。
  • PING 消息: 集群里每个节点每秒钟会选择部分节点发送 PING 消息,接收者收到消息后会回复一个 PONG 消息。PING 消息的内容是自身节点和部分其他节点的状态信息,作用是彼此交换信息,以及检测节点是否在线。PING 消息使用 Gossip 协议发送,接收节点的选择兼顾了收敛速度和带宽成本,具体规则如下:(1)随机找 5 个节点,在其中选择最久没有通信的 1 个节点;(2)扫描节点列表,选择最近一次收到 PONG 消息时间大于 cluster_node_timeout / 2 的所有节点,防止这些节点长时间未更新。
  • PONG消息: PONG 消息封装了自身状态数据。可以分为两种:第一种 是在接到 MEET/PING 消息后回复的 PONG 消息;第二种 是指节点向集群广播 PONG 消息,这样其他节点可以获知该节点的最新信息,例如故障恢复后新的主节点会广播 PONG 消息。
  • FAIL 消息: 当一个主节点判断另一个主节点进入 FAIL 状态时,会向集群广播这一 FAIL 消息;接收节点会将这一 FAIL 消息保存起来,便于后续的判断。
  • PUBLISH 消息: 节点收到 PUBLISH 命令后,会先执行该命令,然后向集群广播这一消息,接收节点也会执行该 PUBLISH 命令。

集群数据如何存储的有了解吗?

节点需要专门的数据结构来存储集群的状态。所谓集群的状态,是一个比较大的概念,包括:集群是否处于上线状态、集群中有哪些节点、节点是否可达、节点的主从状态、槽的分布……

节点为了存储集群状态而提供的数据结构中,最关键的是 clusterNodeclusterState 结构:前者记录了一个节点的状态,后者记录了集群作为一个整体的状态。

clusterNode 结构

clusterNode 结构保存了 一个节点的当前状态,包括创建时间、节点 id、ip 和端口号等。每个节点都会用一个 clusterNode 结构记录自己的状态,并为集群内所有其他节点都创建一个 clusterNode 结构来记录节点状态。

下面列举了 clusterNode 的部分字段,并说明了字段的含义和作用:

typedef struct clusterNode {
    //节点创建时间
    mstime_t ctime;
    //节点id
    char name[REDIS_CLUSTER_NAMELEN];
    //节点的ip和端口号
    char ip[REDIS_IP_STR_LEN];
    int port;
    //节点标识:整型,每个bit都代表了不同状态,如节点的主从状态、是否在线、是否在握手等
    int flags;
    //配置纪元:故障转移时起作用,类似于哨兵的配置纪元
    uint64_t configEpoch;
    //槽在该节点中的分布:占用16384/8个字节,16384个比特;每个比特对应一个槽:比特值为1,则该比特对应的槽在节点中;比特值为0,则该比特对应的槽不在节点中
    unsigned char slots[16384/8];
    //节点中槽的数量
    int numslots;
    …………
} clusterNode;

除了上述字段,clusterNode 还包含节点连接、主从复制、故障发现和转移需要的信息等。

clusterState 结构

clusterState 结构保存了在当前节点视角下,集群所处的状态。主要字段包括:

typedef struct clusterState {
    //自身节点
    clusterNode *myself;
    //配置纪元
    uint64_t currentEpoch;
    //集群状态:在线还是下线
    int state;
    //集群中至少包含一个槽的节点数量
    int size;
    //哈希表,节点名称->clusterNode节点指针
    dict *nodes;
    //槽分布信息:数组的每个元素都是一个指向clusterNode结构的指针;如果槽还没有分配给任何节点,则为NULL
    clusterNode *slots[16384];
    …………
} clusterState;

除此之外,clusterState 还包括故障转移、槽迁移等需要的信息。

五、其他问题

Redis 如何实现分布式锁?

推荐阅读 之前的系列文章:
Redis(3)——分布式锁深入探究 - https://www.wmyskxz.com/2020/03/01/redis-3/

Redis 过期键的删除策略?

简单描述

先抛开 Redis 想一下几种可能的删除策略:

  1. 定时删除:在设置键的过期时间的同时,创建一个定时器 timer). 让定时器在键的过期时间来临时,立即执行对键的删除操作。
  2. 惰性删除:放任键过期不管,但是每次从键空间中获取键时,都检查取得的键是否过期,如果过期的话,就删除该键;如果没有过期,就返回该键。
  3. 定期删除:每隔一段时间程序就对数据库进行一次检查,删除里面的过期键。至于要删除多少过期键,以及要检查多少个数据库,则由算法决定。

在上述的三种策略中定时删除和定期删除属于不同时间粒度的 主动删除,惰性删除属于 被动删除

三种策略都有各自的优缺点

  1. 定时删除对内存使用率有优势,但是对 CPU 不友好;
  2. 惰性删除对内存不友好,如果某些键值对一直不被使用,那么会造成一定量的内存浪费;
  3. 定期删除是定时删除和惰性删除的折中。

Redis 中的实现

Reids 采用的是 惰性删除和定时删除 的结合,一般来说可以借助最小堆来实现定时器,不过 Redis 的设计考虑到时间事件的有限种类和数量,使用了无序链表存储时间事件,这样如果在此基础上实现定时删除,就意味着 O(N) 遍历获取最近需要删除的数据。

Redis 的淘汰策略有哪些?

Redis 有六种淘汰策略

策略 描述
volatile-lru 从已设置过期时间的 KV 集中优先对最近最少使用(less recently used)的数据淘汰
volitile-ttl 从已设置过期时间的 KV 集中优先对剩余时间短(time to live)的数据淘汰
volitile-random 从已设置过期时间的 KV 集中随机选择数据淘汰
allkeys-lru 从所有 KV 集中优先对最近最少使用(less recently used)的数据淘汰
allKeys-random 从所有 KV 集中随机选择数据淘汰
noeviction 不淘汰策略,若超过最大内存,返回错误信息

4.0 版本后增加以下两种

  • volatile-lfu:从已设置过期时间的数据集(server.db[i].expires)中挑选最不经常使用的数据淘汰
  • allkeys-lfu:当内存不足以容纳新写入数据时,在键空间中,移除最不经常使用的 key

Redis常见性能问题和解决方案?

  1. Master 最好不要做任何持久化工作,包括内存快照和 AOF 日志文件,特别是不要启用内存快照做持久化。
  2. 如果数据比较关键,某个 Slave 开启 AOF 备份数据,策略为每秒同步一次。
  3. 为了主从复制的速度和连接的稳定性,Slave 和 Master 最好在同一个局域网内。
  4. 尽量避免在压力较大的主库上增加从库。
  5. Master 调用 BGREWRITEAOF 重写 AOF 文件,AOF 在重写的时候会占大量的 CPU 和内存资源,导致服务 load 过高,出现短暂服务暂停现象。
  6. 为了 Master 的稳定性,主从复制不要用图状结构,用单向链表结构更稳定,即主从关系为:Master<–Slave1<–Slave2<–Slave3…,这样的结构也方便解决单点故障问题,实现 Slave 对 Master 的替换,也即,如果 Master 挂了,可以立马启用 Slave1 做 Master,其他不变。

假如Redis里面有1亿个key,其中有10w个key是以某个固定的已知的前缀开头的,如何将它们全部找出来?

使用 keys 指令可以扫出指定模式的 key 列表。但是要注意 keys 指令会导致线程阻塞一段时间,线上服务会停顿,直到指令执行完毕,服务才能恢复。这个时候可以使用 scan 指令,scan 指令可以无阻塞的提取出指定模式的 key 列表,但是会有一定的重复概率,在客户端做一次去重就可以了,但是整体所花费的时间会比直接用 keys 指令长。

More..

参考资料

  1. 3w字深度好文|Redis面试全攻略,读完这个就可以和面试官大战几个回合了 - https://mp.weixin.qq.com/s/f9N13fnyTtnu2D5sKZiu9w
  2. 大厂面试!我和面试官之间关于Redis的一场对弈! - https://mp.weixin.qq.com/s/DHTPSfmWTZpdTmlytzLz1g
  3. Redis面试题(2020最新版) - https://blog.csdn.net/ThinkWon/article/details/103522351
  • 本文已收录至我的 Github 程序员成长系列 【More Than Java】,学习,不止 Code,欢迎 star:https://github.com/wmyskxz/MoreThanJava
  • 个人公众号 :wmyskxz,个人独立域名博客:wmyskxz.com,坚持原创输出,下方扫码关注,2020,与您共同成长!

非常感谢各位人才能 看到这里,如果觉得本篇文章写得不错,觉得 「我没有三颗心脏」有点东西 的话,求点赞,求关注,求分享,求留言!

创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见!

前言:不管是远程的视频面试,还是现场的面试,都有可能会有手撕代码的环节,这也是很多童鞋包括我(虽然还没遇到过..)都很头疼的东西,可能是因为 IDE 自动提示功能用惯了或是其他一些原因,总之让我手写代码就是感觉很奇怪..但是我想的话,这应该侧重考察的是一些细节或者是习惯方面的一些东西,所以还是防患于未然吧,把一些可能手撕的代码给准备准备,分享分享,希望可以得到各位的指正,然后能有一些讨论,由于我字太丑就不上传自己默写的代码了,但还是希望各位潦草写一遍加深一下印象吧,以上;


Part 1.生产者-消费者问题

这绝对是属于重点了,不管是考察对于该重要模型的理解还是考察代码能力,这都是一道很好的考题,所以很有必要的,我们先来回顾一下什么是生产者-消费者问题;

问题简单回顾

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。(摘自维基百科:生产者消费者问题)

  • 注意: 生产者-消费者模式中的内存缓存区的主要功能是数据在多线程间的共享,此外,通过该缓冲区,可以缓解生产者和消费者的性能差;

几种实现方式

上面说到该问题的关键是:如何保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区空时消耗数据;解决思路可以简单概括为:

  • 生产者持续生产,直到缓冲区满,满时阻塞;缓冲区不满后,继续生产;
  • 消费者持续消费,直到缓冲区空,空时阻塞;缓冲区不空后,继续消费;
  • 生产者和消费者都可以有多个;

那么在 Java 语言中,能达到上述要求的,自然而然的就会有如下的几种写法,但是问题的核心都是能够让消费者和生产者在各自满足条件需要阻塞时能够起到正确的作用:

  • wait()/notify()方式;
  • await()/signal()方式;
  • BlockingQueue阻塞队列方式;
  • PipedInputStream/PipedOutputStream方式;

手写代码,我们着重掌握上面对应的第一种和第三种写法就足够了;

wait()/notify()方式实现

在手写代码之前,我们需要现在 IDE 上实现一遍,理解其中的过程并且找到一些重点/细节,我们先来代码走一遍,然后我把我理解的重点给圈儿出来:

生产者代码

public class Producer implements Runnable {
    private volatile boolean isRunning = true;
    private final Vector sharedQueue;                            // 内存缓冲区
    private final int SIZE;                                      // 缓冲区大小
    private static AtomicInteger count = new AtomicInteger();    // 总数,原子操作
    private static final int SLEEPTIME = 1000;

    public Producer(Vector sharedQueue, int SIZE) {
        this.sharedQueue = sharedQueue;
        this.SIZE = SIZE;
    }

    @Override
    public void run() {
        int data;
        Random r = new Random();

        System.out.println("start producer id = " + Thread.currentThread().getId());
        try {
            while (isRunning) {
                // 模拟延迟
                Thread.sleep(r.nextInt(SLEEPTIME));

                // 当队列满时阻塞等待
                while (sharedQueue.size() == SIZE) {
                    synchronized (sharedQueue) {
                        System.out.println("Queue is full, producer " + Thread.currentThread().getId()
                                + " is waiting, size:" + sharedQueue.size());
                        sharedQueue.wait();
                    }
                }

                // 队列不满时持续创造新元素
                synchronized (sharedQueue) {
                    data = count.incrementAndGet();                    // 构造任务数据
                    sharedQueue.add(data);
                    System.out.println("producer create data:" + data + ", size:" + sharedQueue.size());
                    sharedQueue.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupted();
        }
    }

    public void stop() {
        isRunning = false;
    }
}

有了上面的提到的解决思路,应该很容易实现,但是这里主要提一下一些细节和重点:

  • 创造数据:生产者-消费者解决的问题就是数据在多线程间的共享,所以我们首要关心的问题就应该是数据,我们这里采用的是使用一个AtomicInteger类来为我们创造数据,使用它的好处是该类是一个保证原子操作的类,我们使用其中的incrementAndGet()方法不仅能够保证线程安全,还可以达到一个计数的效果,所以是一个既简单又实用的选择,当然也可以使用其他的数据来代替,这里注意的是要保证该类在内存中只存在一份,使用static修饰
  • 内存缓冲区:要保证在多线程环境下内存缓冲区的安全,所以我们考虑使用简单的Vector类来作为我们的内存缓冲区,并且使用final修饰保证内存缓冲区的唯一,然后的话我们需要判断队列是否满,需要手动添加一个标识缓冲区大小的变量SIZE,注意也是final修饰;
  • 模拟延迟:这里主要模拟的是一个网络延迟,我们首先定义了一个SLEEPTIME的延迟范围,注意使用的是static final修饰,然后使用Random()类的nextInt()方法来随机选取一个该范围内的值来模拟网络环境中的延迟;
  • 停止方法:首先需要知道在Thread类中有一个弃用的stop()方法,我们自己增加一个标志位isRunning来完成我们自己的stop()功能,需要注意的是使用volatile来修饰,保证该标志位的可见性;
  • 错误处理:当捕获到错误时,我们应该使用Thread类中的interrupted()方法来终止当前的进程;
  • 消息提示:我们主要是要在控制台输出该生产者的信息,包括当前队列的状态,大小,当前线程的生产者信息等,注意的是信息格式的统一(后面的消费者同样的)

消费者代码

public class Consumer implements Runnable {

    private final Vector sharedQueue;                            // 内存缓冲区
    private final int SIZE;                                      // 缓冲区大小
    private static final int SLEEPTIME = 1000;

    public Consumer(Vector sharedQueue, int SIZE) {
        this.sharedQueue = sharedQueue;
        this.SIZE = SIZE;
    }

    @Override
    public void run() {

        Random r = new Random();

        System.out.println("start consumer id = " + Thread.currentThread().getId());
        try {
            while (true) {
                // 模拟延迟
                Thread.sleep(r.nextInt(SLEEPTIME));

                // 当队列空时阻塞等待
                while (sharedQueue.isEmpty()) {
                    synchronized (sharedQueue) {
                        System.out.println("Queue is empty, consumer " + Thread.currentThread().getId()
                                + " is waiting, size:" + sharedQueue.size());
                        sharedQueue.wait();
                    }
                }

                // 队列不空时持续消费元素
                synchronized (sharedQueue) {
                    System.out.println("consumer consume data:" + sharedQueue.remove(0) + ", size:" + sharedQueue.size());
                    sharedQueue.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}

跟生产者相同的,你需要注意内存缓冲区/ 模拟延迟/ 错误处理/ 消息提示这些方面的细节问题,总体来说消费者就是持续不断的消费,也比较容易实现;

主线程代码

有了我们的消费者和生产者代码,我们需要来验证一下它们的正确性,照常理来说我们直接创建一些消费者和生产者的线程让它们执行就可以了啊,但是为了“加分”考虑呢,我们还是使用线程池吧..也不是特别复杂:

public static void main(String args[]) throws InterruptedException {
    // 1.构建内存缓冲区
    Vector sharedQueue = new Vector();
    int size = 4;

    // 2.建立线程池和线程
    ExecutorService service = Executors.newCachedThreadPool();
    Producer prodThread1 = new Producer(sharedQueue, size);
    Producer prodThread2 = new Producer(sharedQueue, size);
    Producer prodThread3 = new Producer(sharedQueue, size);
    Consumer consThread1 = new Consumer(sharedQueue, size);
    Consumer consThread2 = new Consumer(sharedQueue, size);
    Consumer consThread3 = new Consumer(sharedQueue, size);
    service.execute(prodThread1);
    service.execute(prodThread2);
    service.execute(prodThread3);
    service.execute(consThread1);
    service.execute(consThread2);
    service.execute(consThread3);

    // 3.睡一会儿然后尝试停止生产者
    Thread.sleep(10 * 1000);
    prodThread1.stop();
    prodThread2.stop();
    prodThread3.stop();

    // 4.再睡一会儿关闭线程池
    Thread.sleep(3000);
    service.shutdown();
}

大家可以自行去看看运行的结果,是没有问题的,不会出现多生产或者多消费之类的多线程问题,运行一段时间等生产者都停止之后,我们可以观察到控制台三个消费者都在等待数据的情况:

Queue is empty, consumer 17 is waiting, size:0
Queue is empty, consumer 15 is waiting, size:0
Queue is empty, consumer 16 is waiting, size:0

BlockingQueue阻塞队列方式实现

该方式对比起上面一种方式实现起来要简单一些,因为不需要手动的去通知其他线程了,生产者直接往队列中放数据直到队列满,消费者直接从队列中获取数据直到队列空,BlockingQueue会自动帮我们完成阻塞这个动作,还是先来看看代码

生产者代码

public class Producer implements Runnable {
    private volatile boolean isRunning = true;
    private BlockingQueue<Integer> queue;                        // 内存缓冲区
    private static AtomicInteger count = new AtomicInteger();    // 总数,原子操作
    private static final int SLEEPTIME = 1000;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        int data;
        Random r = new Random();

        System.out.println("start producer id = " + Thread.currentThread().getId());
        try {
            while (isRunning) {
                // 模拟延迟
                Thread.sleep(r.nextInt(SLEEPTIME));

                // 往阻塞队列中添加数据
                data = count.incrementAndGet();                   // 构造任务数据
                System.out.println("producer " + Thread.currentThread().getId() + " create data:" + data
                        + ", size:" + queue.size());
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.err.println("failed to put data:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupted();
        }
    }

    public void stop() {
        isRunning = false;
    }
}

跟上面一种方式没有很大的差别,倒是代码更加简单通透,不过需要注意的是对阻塞队列添加失败的错误处理

消费者代码

public class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;                            // 内存缓冲区
    private static final int SLEEPTIME = 1000;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        int data;
        Random r = new Random();

        System.out.println("start consumer id = " + Thread.currentThread().getId());
        try {
            while (true) {
                // 模拟延迟
                Thread.sleep(r.nextInt(SLEEPTIME));

                // 从阻塞队列中获取数据
                if (!queue.isEmpty()) {
                    data = queue.take();
                    System.out.println("consumer " + Thread.currentThread().getId() + " consume data:" + data
                            + ", size:" + queue.size());
                } else {
                    System.out.println("Queue is empty, consumer " + Thread.currentThread().getId()
                            + " is waiting, size:" + queue.size());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}

主线程代码

public static void main(String args[]) throws InterruptedException {
    // 1.构建内存缓冲区
    BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();

    // 2.建立线程池和线程
    ExecutorService service = Executors.newCachedThreadPool();
    Producer prodThread1 = new Producer(queue);
    Producer prodThread2 = new Producer(queue);
    Producer prodThread3 = new Producer(queue);
    Consumer consThread1 = new Consumer(queue);
    Consumer consThread2 = new Consumer(queue);
    Consumer consThread3 = new Consumer(queue);
    service.execute(prodThread1);
    service.execute(prodThread2);
    service.execute(prodThread3);
    service.execute(consThread1);
    service.execute(consThread2);
    service.execute(consThread3);

    // 3.睡一会儿然后尝试停止生产者
    Thread.sleep(10 * 1000);
    prodThread1.stop();
    prodThread2.stop();
    prodThread3.stop();

    // 4.再睡一会儿关闭线程池
    Thread.sleep(3000);
    service.shutdown();
}

因为队列中添加和删除的操作比较频繁,所以这里使用LinkedBlockingQueue来作为阻塞队列,所以这里除了内存缓冲区有所不同以外,其他的都差不多…当然你也可以指定一个队列的大小;

总结以及改进

生产者-消费者模式很好地对生产者线程和消费者线程进行解耦,优化了系统整体的结构,同时由于缓冲区的作用,允许生产者线程和消费者线程存在执行上的性能差异,从一定程度上缓解了性能瓶颈对系统性能的影响;上面两种写法都是非常常规的写法,只能说能起码能在及格的基础上加个那么点儿分数,如果想要得高分可以去搜索搜搜 Disruptor 来实现一个无锁的生产者-消费者模型….这里就不提及了..

改进:上面的线程输出可能会有点儿不友好(不直观),因为我们这里是直接使用的线程的 ID 来作为输出,我们也可以给线程弄一个名字来作为输出,以上;


Part 2.排序算法

排序算法当然也算是重点考察的对象之一了,毕竟基础且偏算法,当然我们有必要去了解常见的排序算法以及它们采取了怎样的思想又是如何实现的还有复杂度的问题,但是这里的话,主要就提及两种考的比较常见的排序算法:冒泡快排,以及分别对它们进行的一些优化;

冒泡排序

冒泡应该是比较基础的一种算法,我们以从小到大排序为例,它的基础思想是:从第一个数开始直到数组倒数第二个数,每一轮都去比较数组中剩下的数,如果后面的数据更小则两数交换,这样一轮一轮的比较交换下来,最大的那个数也就“沉到”了数组的最后,最小的“冒”到了数组的最前面,这样就完成了排序工作;

基础算法代码(未优化)

很简单,直接上代码:

/**
 * 冒泡排序
 *
 * @param nums 待排序的数组
 */
public void bubbleSort(int[] nums) {
    // 正确性判断
    if (null == nums || nums.length <= 1) {
        return;
    }

    // 从小到大排序
    for (int i = 0; i < nums.length - 1; i++) {
        for (int j = i + 1; j < nums.length; j++) {
            if (nums[i] > nums[j]) {
                nums[i] = nums[i] + nums[j];
                nums[j] = nums[i] - nums[j];
                nums[i] = nums[i] - nums[j];
            }
        }
    }
}

这里需要注意:加上正确性判断;(讨论:其实我看大多数地方都是没有这个的,不知道有没有加上的必要…求讨论)

另外光写完实现冒泡排序的算法是不算完的,还要养成良好的习惯去写测试单元用例,而且尽可能要考虑到多的点,例如这里的负数、多个相同的数之类的特殊情况,我就大概写一个吧,也欢迎指正:

@Test
public void bubbleSortTester() {

    // 测试用例1:验证负数是否满足要求
    int[] nums = {1, 4, 2, -2, 5, 11, -7, 0};
    bubbleSort(nums);
    // 输出测试结果
    for (int i = 0; i < nums.length; i++) {
        System.out.print(nums[i] + ", ");
    }
    System.out.println();

    // 测试用例2:验证多个相同的数是否满足要求
    nums = new int[]{1, 1, 5, 7, 7, 3, 1};
    bubbleSort(nums);
    // 输出测试结果
    for (int i = 0; i < nums.length; i++) {
        System.out.print(nums[i] + ", ");
    }
}

冒泡排序优化

想象一个这样的场景:如果该数组基本有序,或者在数组的后半段基本有序,上面的算法就会浪费许多的时间开销,所以我们不再使用双重嵌套去比较每两个元素的值,而只是不断比较数组每前后两个数值,让大的那个数不断“冒”到数组的最后,然后缩小尾边界的范围,并且增加一个标志位,表示这一趟是否发生了交换,如果没有那么证明该数组已经有序则完成了排序了:

/**
 * 改进的冒泡排序
 *
 * @param nums 待排序的数组
 */
public void bubbleSort2(int[] nums) {
    // 正确性判断
    if (null == nums || nums.length <= 1) {
        return;
    }

    // 使用一个数来记录尾边界
    int length = nums.length;
    boolean flag = true;// 发生了交换就为true, 没发生就为false,第一次判断时必须标志位true。
    while (flag) {
        flag = false;// 每次开始排序前,都设置flag为未排序过
        for (int i = 1; i < length; i++) {
            if (nums[i - 1] > nums[i]) {// 前面的数字大于后面的数字就交换
                int temp;
                temp = nums[i - 1];
                nums[i - 1] = nums[i];
                nums[i] = temp;

                // 表示交换过数据;
                flag = true;
            }
        }
        length--; // 减小一次排序的尾边界
    }
}

同样的记得写单元测试函数;

冒泡排序进一步优化

顺着这个思路,我们进一步想象一个场景:现在有一个包含 1000 个数的数组,仅有前面 100 个数无序,后面的 900 个数都比前面的 100 个数更大并且已经排好序,那么上面优化的方法又会造成一定的时间浪费,所以我们进一步增加一个变量记录最后发生交换的元素的位置,也就是排序的尾边界了:

/**
 * 冒泡算法最优解
 *
 * @param nums 待排序的数组
 */
public static void bubbleSort3(int[] nums) {
    int j, k;
    int flag = nums.length;// flag来记录最后交换的位置,也就是排序的尾边界

    while (flag > 0) {// 排序未结束标志
        k = flag;// k 来记录遍历的尾边界
        flag = 0;

        for (j = 1; j < k; j++) {
            if (nums[j - 1] > nums[j]) {// 前面的数字大于后面的数字就交换
                // 交换a[j-1]和a[j]
                int temp;
                temp = nums[j - 1];
                nums[j - 1] = nums[j];
                nums[j] = temp;

                // 表示交换过数据;
                flag = j;// 记录最新的尾边界.
            }
        }
    }
}

这应该是最优的冒泡排序了,同时也别忘记了最后要写测试单元用例代码;

快速排序

快排也是一种很经典的算法,它使用了一种分治的思想,基本思想是:通过一趟排序将待排序的数组分成两个部分,其中一部分记录的是比关键字更小的,另一部分是比关键字更大的,然后再分别对着两部分继续进行排序,直到整个序列有序;

基础实现

非常经典的代码,直接上吧:

public static void quickSort(int[] arr) {
    qsort(arr, 0, arr.length - 1);
}

private static void qsort(int[] arr, int low, int high) {
    if (low < high) {
        int pivot = partition(arr, low, high);        // 将数组分为两部分
        qsort(arr, low, pivot - 1);                   // 递归排序左子数组
        qsort(arr, pivot + 1, high);                  // 递归排序右子数组
    }
}

private static int partition(int[] arr, int low, int high) {
    int pivot = arr[low];               // 枢轴记录
    while (low < high) {
        while (low < high && arr[high] >= pivot) --high;
        arr[low] = arr[high];           // 交换比枢轴小的记录到左端
        while (low < high && arr[low] <= pivot) ++low;
        arr[high] = arr[low];           // 交换比枢轴小的记录到右端
    }
    // 扫描完成,枢轴到位
    arr[low] = pivot;
    // 返回的是枢轴的位置
    return low;
}

当然,在手撕的时候需要注意函数上的 Java Doc 格式的注释,这里省略掉是为了节省篇幅,另外别忘了测试单元用例代码

上面的代码也很容易理解,其实就是一个“填坑”的过程,第一个“坑”挖在每次排序的第一个位置arr[low],从序列后面往前找第一个比pivot小的数来把这个“坑”填上,这时候的“坑”就变成了当前的arr[high],然后再从序列前面往后用第一个比pivot大的数把刚才的“坑”填上,如此往复,始终有一个“坑”需要我们填上,直到最后一个“坑”出现,这个“坑”使用一开始的pivot填上就可以了,而这个“坑”的位置也就是pivot该填上的正确位置,我们再把这个位置返回,就可以把当前序列分成两个部分再依次这样操作最终就达到排序的目的了,不得不说这样的思想挺神奇的;

算法优化

上面这个快速排序算法可以说是最基本的快速排序,因为它并没有考虑任何输入数据。但是,我们很容易发现这个算法的缺陷:这就是在我们输入数据基本有序甚至完全有序的时候,这算法退化为冒泡排序,不再是O(n㏒n),而是O(n^2)了。

究其根源,在于我们的代码实现中,每次只从数组第一个开始取。如果我们采用“三者取中”,即 arr[low], arr[high], arr[(low+high)/2] 三者的中值作为枢轴记录,则可以大大提高快速排序在最坏情况下的性能。但是,我们仍然无法将它在数组有序情形下的性能提高到O(n)。还有一些方法可以不同程度地提高快速排序在最坏情况下的时间性能。

此外,快速排序需要一个递归栈,通常情况下这个栈不会很深,为log(n)级别。但是,如果每次划分的两个数组长度严重失衡,则为最坏情况,栈的深度将增加到O(n)。此时,由栈空间带来的空间复杂度不可忽略。如果加上额外变量的开销,这里甚至可能达到恐怖的O(n^2)空间复杂度。所以,快速排序的最差空间复杂度不是一个定值,甚至可能不在一个级别。

为了解决这个问题,我们可以在每次划分后比较两端的长度,并先对短的序列进行排序(目的是先结束这些栈以释放空间),可以将最大深度降回到O(㏒n)级别。

关于优化的话,了解一个大概的思路就可以了,那在这里稍微总结一下:

  • ①三数取中作为枢轴记录;
  • ②当待排序序列的长度分割到一定大小之后,使用插入排序;
  • ③在一次分割结束后,可以把与pivot相等的元素聚在一起,继续下次分割时,不用再对与pivot相等的元素分割;
  • ④优化递归操作;

参考文章:http://blog.51cto.com/flyingcat2013/1281614
想要了解的更多的童鞋可以戳这里:https://blog.csdn.net/insistGoGo/article/details/7785038


Part 3.二叉树相关算法

二叉树也是一个容易提及的概念和写算法的问题,特别是它的几种递归遍历和非递归遍历,都是基础且常考的点,那在这里就稍微整理整理吧…

二叉树的几种递归遍历

前序、中序、后序遍历都是非常基础且容易的遍历方法,重点还是在后面的中序和后续的非递归遍历上,当然还有层序遍历,所以这里不多说,直接给代码;

前序遍历递归实现

public void preOrderTraverse1(TreeNode root) {
    if (root != null) {
        System.out.print(root.val + "  ");
        preOrderTraverse1(root.left);
        preOrderTraverse1(root.right);
    }
}

中序遍历递归实现

public void inOrderTraverse1(TreeNode root) {
    if (root != null) {
        preOrderTraverse1(root.left);
        System.out.print(root.val + "  ");
        preOrderTraverse1(root.right);
    }
}

后序遍历递归实现

public void postOrderTraverse1(TreeNode root) {
    if (root != null) {
        preOrderTraverse1(root.left);
        preOrderTraverse1(root.right);
        System.out.print(root.val + "  ");
    }
}

前面三种遍历,也就是输出结点数据的位置不同而已,所以很容易,但是如果手写,建议问清楚面试官要求,是在遍历时直接输出还是需要函数返回一个List集合,然后注意写测试用例代码!

二叉树的几种非递归遍历

★★层序遍历★★

层序遍历我们只需要增加使用一个队列即可,看代码很容易理解:

public void levelTraverse(TreeNode root) {
    if (root == null) {
        return;
    }
    LinkedList<TreeNode> queue = new LinkedList<>();
    queue.offer(root);
    while (!queue.isEmpty()) {
        TreeNode node = queue.poll();
        System.out.print(node.val + "  ");
        if (node.left != null) {
            queue.offer(node.left);
        }
        if (node.right != null) {
            queue.offer(node.right);
        }
    }
}

前序遍历非递归实现

public void preOrderTraverse2(TreeNode root) {
    if (root == null) {
        return;
    }
    LinkedList<TreeNode> stack = new LinkedList<>();
    TreeNode pNode = root;
    while (pNode != null || !stack.isEmpty()) {
        if (pNode != null) {
            System.out.print(pNode.val + "  ");
            stack.push(pNode);
            pNode = pNode.left;
        } else { //pNode == null && !stack.isEmpty()
            TreeNode node = stack.pop();
            pNode = node.right;
        }
    }
}

★★中序遍历非递归实现★★

/**
 * 非递归中序遍历二叉树
 *
 * @param root 二叉树根节点
 * @return 中序遍历结果集
 */
public List<Integer> inorderTraversal(TreeNode root) {

    List<Integer> list = new ArrayList<>();
    ArrayDeque<TreeNode> stack = new ArrayDeque<>();

    while (root != null || !stack.isEmpty()) {
        while (root != null) {
            stack.addFirst(root);
            root = root.left;
        }
        root = stack.removeFirst();
        list.add(root.val);
        root = root.right;
    }
    return list;
}

★★后续遍历非递归实现★★

/**
 * 二叉树的后序遍历
 *
 * @param root 二叉树根节点
 * @return 后序遍历结果集
 */
public List<Integer> postorderTraversal(TreeNode root) {
    List<Integer> list = new ArrayList<>();
    Deque<TreeNode> stack = new ArrayDeque<>();
    TreeNode pre = null;
    while (!stack.isEmpty() || root != null) {

        while (root != null) {
            stack.push(root);
            root = root.left;
        }

        root = stack.peek();
        // i :判断如果右子树不为空且不为
        if (root.right != null && root.right != pre) {
            root = root.right;
        } else {
            root = stack.pop();
            list.add(root.val);
            pre = root;
            root = null;
        }
    }
    return list;
}

如果比较难以理解的话,可以自己尝试着跟跟 Debug 然后看看过程;

二叉树相关其他算法题

另外的话还有一些比较常见的关于树的算法,在文章的末尾,这里就不再赘述了:

链接:https://www.jianshu.com/p/4ef1f50d45b5


Part 4.其他重要算法

除了上面 3 Part 比较重要的点之外,还有一些其他的算法也是经常考到的,下面我们来说;

1.反转链表

这是一道很经典的题,不仅考你对链表的理解,而且还有一些细节(例如正确性判断/ 测试用例)需要你从代码层面去展现,下面我们给出两段代码,读者可以自行去比较,我只是提供一个思路;

思路一:使用一个 Node 不断链接

这是最经典的算法,也是需要我们牢牢掌握的方法,最重要的还是理解 while() 循环中的过程:


public ListNode reverseList(ListNode head) {

    // 正确性判断
    if (null == head || null == head.next) {
        return head;
    }

    ListNode pre = null;
    while (null != head) {
        ListNode temp = head;
        head = head.next;
        temp.next = pre;
        pre = temp;
    }

    return pre;
}

思路二:反转元素值然后重新赋给 Node

这是一个很简单的思路,比上个思路要多遍历一遍链表,但是好处是简单,思路清晰,实现起来容易,emm,像这一类问题我觉得另一个比较重要的就是举一反三的能力吧,在这里我只提供两个思路,其实还有很多种实现方法,当然也别忘了细节的东西~

public ListNode reverseList(ListNode head) {
    // 1.正确性判断
    if (null == head || null == head.next) {
        return head;
    }

    // 2.遍历链表head并将结果保存在List集合中
    List<ListNode> list = new LinkedList();
    ListNode tempNode = head;
    while (null != tempNode) {
        list.insert(tempNode.val);
        tempNode = tempNode.next;
    }   // end while:遍历完了链表并将结果保存在了List集合中

    // 3.反转集合,并将值依次复制给链表
    Collections.reverse(list);
    tempNode = head;
    while (null != tempNode) {
        tempNode.val = list.remove(0);
        tempNode = tempNode.next;
    }

    return head;
}

2.合并两个有序链表

问题描述:将两个有序链表合并为一个新的有序链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的;

同样的经典算法,需要掌握:

public ListNode mergeTwoLists(ListNode l1, ListNode l2) {
    if (l1 == null) {
        return l2;
    }
    if (l2 == null) {
        return l1;
    }
    ListNode head = null;
    if (l1.val < l2.val) {
        head = l1;
        head.next = mergeTwoLists(l1.next, l2);
    } else {
        head = l2;
        head.next = mergeTwoLists(l1, l2.next);
    }
    return head;
}

这道题也是 LeetCode 上的一道题,我当时的做法是下面这样的,虽然看起来代码量多了不少而且看起来蠢蠢的..但是经过 LeetCode 测试,甚至比上面的实现要快上那么 2ms,特别提醒:下面的代码只是用作一个思路的参考,着重掌握上面的代码

public ListNode mergeTwoLists(ListNode l1, ListNode l2) {

    // 定义一个虚拟头结点方便遍历
    ListNode dummyHead = new ListNode(-1);
    dummyHead.next = l1;
    ListNode pre = dummyHead;

    // 遍历l1链表
    int len1 = 0;
    while (null != pre.next) {
        len1++;
        pre = pre.next;
    }

    int[] nums1 = new int[len1];

    // 保存l1链表的数据
    pre = dummyHead;
    for (int i = 0; i < len1; i++) {
        nums1[i] = pre.next.val;
        pre = pre.next;
    }

    // 遍历l2链表
    int len2 = 0;
    dummyHead.next = l2;
    pre = dummyHead;
    while (null != pre.next) {
        len2++;
        pre = pre.next;
    }

    int[] nums2 = new int[len2];

    // 保存l2链表的数据
    pre = dummyHead;
    for (int i = 0; i < len2; i++) {
        nums2[i] = pre.next.val;
        pre = pre.next;
    }

    int[] nums = new int[len1 + len2];
    // 将两个链表的数据整合并排序
    System.arraycopy(nums1, 0, nums, 0, len1);
    System.arraycopy(nums2, 0, nums, len1, len2);
    Arrays.sort(nums);

    // 拼接一个链表
    ListNode dummy = new ListNode(-1);
    pre = dummy;
    for (int i = 0; i < nums.length; i++) {
        ListNode node = new ListNode(nums[i]);
        pre.next = node;
        pre = pre.next;
    }

    return dummy.next;
}

3.两个链表的第一个公共结点

题目描述:找出两个链表的第一个公共结点;

/**
 * 求两个链表中第一个公共结点
 *
 * @param pHead1 链表1头结点
 * @param pHead2 链表2头结点
 * @return 链表第一个公共结点
 */
public ListNode FindFirstCommonNode(ListNode pHead1, ListNode pHead2) {
    // 1.正确性判断
    if (null == pHead1 || null == pHead2) {
        return null;
    }

    // 2.遍历链表1把所有结点保存在列表中中
    Vector<ListNode> nodeList1 = new Vector<>();
    while (null != pHead1) {
        nodeList1.add(pHead1);
        pHead1 = pHead1.next;
        // 判断是否成环,成环则退出循环
        if (nodeList1.contains(pHead1)) {
            break;
        }
    }   // end while:链表1中的所有结点都存入了nodeList1中

    // 3.遍历链表2比较列表中的数据
    Vector<ListNode> nodeList2 = new Vector<>();
    while (null != pHead2) {
        // 先判断nodeList1中是否存在相同结点,存在则返回
        if (nodeList1.contains(pHead2)) {
            return pHead2;
        }
        // 如果不存在则继续遍历,并判断是否成环
        nodeList2.add(pHead2);
        pHead2 = pHead2.next;
        if (nodeList2.contains(pHead2)) {
            break;
        }
    }   // end while:遍历完了链表2并将所有结点保存在了nodeList2中

    // 如果遍历完链表2则返回null
    return null;
}

需要注意的细节是:①正确性判断;②判断链表是否自己成环;③注释;④注意要自己写测试用例啊

另外还有一些类似的题目像是:①链表入环结点;②链表倒数第k个结点;之类的都是需要掌握的…

4.二分查找算法

二分查找也是一类比较常考的题目,其实代码也比较容易理解,直接上吧,再再再提醒一下:注意正确性判断还有测试用例…

普通实现

/**
 * 二分查找普通实现。
 *
 * @param srcArray 有序数组
 * @param key      查找元素
 * @return 不存在返回-1
 */
public static int binSearch(int srcArray[], int key) {
    int mid;
    int start = 0;
    int end = srcArray.length - 1;
    while (start <= end) {
        mid = (end - start) / 2 + start;
        if (key < srcArray[mid]) {
            end = mid - 1;
        } else if (key > srcArray[mid]) {
            start = mid + 1;
        } else {
            return mid;
        }
    }
    return -1;
}

递归实现

/**
 * 二分查找递归实现。
 *
 * @param srcArray 有序数组
 * @param start    数组低地址下标
 * @param end      数组高地址下标
 * @param key      查找元素
 * @return 查找元素不存在返回-1
 */
public static int binSearch(int srcArray[], int start, int end, int key) {
    int mid = (end - start) / 2 + start;
    if (srcArray[mid] == key) {
        return mid;
    }
    if (start >= end) {
        return -1;
    } else if (key > srcArray[mid]) {
        return binSearch(srcArray, mid + 1, end, key);
    } else if (key < srcArray[mid]) {
        return binSearch(srcArray, start, mid - 1, key);
    }
    return -1;
}

5.斐波那契数列

这也是一道很经典的题,通常是要要求 N 值的范围的,常规写法应该很简单,所以需要掌握的是优化之后的算法:

public int Fibonacci(int n) {
    // 正确性判断
    if (0 == n || 1 == n) {
        return n;
    }

    int nums1 = 0, nums2 = 1;
    int res = 0;
    for (int i = 2; i <= n; i++) {
        res = nums1 + nums2;
        nums1 = nums2;
        nums2 = res;
    }

    return res;
}

还是注意正确性判断然后写测试用例…


手撕代码总结

如果用手写代码的话,确实是个挺麻烦的事儿,首先需要对代码有相当的熟悉程度,然后其次的话考察的都是一些细节的东西,例如:

  • 编码规范:包括一些命名的规范/ 注释的规范等等;
  • 缩进:这个我自己倒是挺在意的..关于这个可以去参考参考阿里出的那个规范手册;
  • 注释:如果命名规范做得好的话其实是可以达到代码即注释的,但是仍然有一些需要标注的地方例如函数头之类的,最好还是做好注释;
  • 代码的完整性:我觉得这个包括对于错误的处理/ 正确性判断这样一类的东西;
  • 测试用例:每个函数都需要一定的测试来保证其正确性,所以这个还是挺有必要的,特别是一些边界情况,null 值判断之类的;
  • 想好再下笔:这一点其实也蛮重要的,不管是在纸上还是在我们平时的编程中,思路永远都是更重要的;

说来说去还是关于代码的事,我觉得还是理清思路最重要,所以我们需要在一遍一遍熟悉代码的过程中,熟知这些代码的思路,只有搞清楚这些代码背后的原理了,我们才能正确且快速的写出我们心中想要的代码,而不是简单的去背诵,这样是没有很大效果的,以上…


欢迎转载,转载请注明出处!
简书ID:@我没有三颗心脏
github:wmyskxz
欢迎关注公众微信号:wmyskxz_javaweb
分享自己的Java Web学习之路以及各种Java学习资料
想要交流的朋友也可以加qq群:3382693

前言:在之前的面试中,每每问到关于Java I/O 方面的东西都感觉自己吃了大亏..所以这里抢救一下..来深入的了解一下在Java之中的 I/O 到底是怎么回事..文章可能说明类的文字有点儿多,希望能耐心读完..

什么是 I/O?

学习过计算机相关课程的童鞋应该都知道,I/O 即输入Input/ 输出Output的缩写,最容易让人联想到的就是屏幕这样的输出设备以及键盘鼠标这一类的输入设备,其广义上的定义就是:数据在内部存储器和外部存储器或其他周边设备之间的输入和输出;

我们可以从定义上看到问题的核心就是:数据/ 输入/ 输出,在Java中,主要就是涉及到磁盘 I/O 和网络 I/O 两种了;

简单理解Java 流(Stream)

通常我们说 I/O 都会涉及到诸如输入流、输出流这样的概念,那么什么是流呢?流是一个抽象但形象的概念,你可以简单理解成一个数据的序列,输入流表示从一个源读取数据,输出流则表示向一个目标写数据,在Java程序中,对于数据的输入和输出都是采用 “流” 这样的方式进行的,其设备可以是文件、网络、内存等;

流具有方向性,至于是输入流还是输出流则是一个相对的概念,一般以程序为参考,如果数据的流向是程序至设备,我们成为输出流,反之我们称为输入流。

可以将流想象成一个“水流管道”,水流就在这管道中形成了,自然就出现了方向的概念。

“流”,代表了任何有能力产出数据的数据源对象或有能力接受数据的接收端对象,它屏蔽了实际的 I/O 设备中处理数据的细节——摘自《Think in Java》

参考资料:深入理解 Java中的 流 (Stream)https://www.cnblogs.com/shitouer/archive/2012/12/19/2823641.html


Java中的 I/O 类库的基本架构

I/O 问题是任何编程语言都无法回避的问题,因为 I/O 操作是人机交互的核心,是机器获取和交换信息的主要渠道,所以如何设计 I/O 系统变成了一大难题,特别是在当今大流量大数据的时代,I/O 问题尤其突出,很容易称为一个性能的瓶颈,也正因为如此,在 I/O 库上也一直在做持续的优化,例如JDK1.4引入的 NIO,JDK1.7引入的 NIO 2.0,都一定程度上的提升了 I/O 的性能;

Java的 I/O 操作类在包 java.io下,有将近80个类,这些类大概可以分成如下 4 组:

  • 基于字节操作的 I/O 接口:InputStream 和 OutputStream;
  • 基于字符操作的 I/O 接口:Writer 和 Reader;
  • 基于磁盘操作的 I/O 接口:File;
  • 基于网络操作的 I/O 接口:Socket;

前两组主要是传输数据的数据格式,后两组主要是传输数据的方式,虽然Socket类并不在java.io包下,但这里仍然把它们划分在了一起;I/O 只是人机交互的一种手段,除了它们能够完成这个交互功能外,我们更多的应该是关注如何提高它的运行效率;

00.基于字节的 I/O 操作接口

基于字节的 I/O 操作的接口输入和输出分别对应是 InputStream 和 OutputStream,InputStream 的类层次结构如下图:

输入流根据数据类型和操作方式又被划分成若干个子类,每个子类分别处理不同操作类型,OutputStream 输出流的类层次结构也是类似,如下图所示:

这里就不详细解释每个子类如何使用了,如果感兴趣可以自己去看一下JDK的源码,而且的话从类名也能大致看出一二该类是在处理怎样的一些东西..这里需要说明两点:

1)操作数据的方式是可以组合使用的:

例如:

OutputStream out = new BufferedOutputStream(new ObjectOutputStream(new FileOutputStream("fileName"))

2)必须要指定流最终写到什么地方:

要么是写到磁盘,要么是写到网络中,但重点是你必须说明这一点,而且你会发现其实SocketOutputStream是属于FileOutputStream下的,也就是说写网络实际上也是写文件,只不过写网络还有一步需要处理,就是让底层的操作系统知道我这个数据是需要传送到其他地方而不是本地磁盘上的;

01.基于字符的 I/O 操作接口

不管是磁盘还是网络传输,最小的存储单元都是字节,而不是字符,所以 I/O 操作的都是字节而不是字符,但是在我们日常的程序中操作的数据几乎都是字符,所以为了操作方便当然要提供一个可以直接写字符的 I/O 接口。而且从字符到字节必须经过编码转换,而这个编码又非常耗时,还经常出现乱码的问题,所以 I/O 的编码问题经常是让人头疼的问题,关于这个问题有一篇深度好文推荐一下:《深入分析 Java 中的中文编码问题》

下图是写字符的 I/O 操作接口涉及到的类,Writer 类提供了一个抽象方法 write(char cbuf[], int off, int len) 由子类去实现:

读字符的操作接口也有类似的类结构,如下图所示:

读字符的操作接口中也是 int read(char cbuf[], int off, int len),返回读到的 n 个字节数,不管是 Writer 还是 Reader 类它们都只定义了读取或写入的数据字符的方式,也就是怎么写或读,但是并没有规定数据要写到哪去,写到哪去就是我们后面要讨论的基于磁盘和网络的工作机制。

01.字节与字符的转化接口

另外数据持久化或网络传输都是以字节进行的,所以必须要有字符到字节或字节到字符的转化。字符到字节需要转化,其中读的转化过程如下图所示:

InputStreamReader 类是字节到字符的转化桥梁,InputStream 到 Reader 的过程要指定编码字符集,否则将采用操作系统默认字符集,很可能会出现乱码问题。StreamDecoder 正是完成字节到字符的解码的实现类。也就是当你用如下方式读取一个文件时:

try { 
       StringBuffer str = new StringBuffer(); 
       char[] buf = new char[1024]; 
       FileReader f = new FileReader("file"); 
       while(f.read(buf)>0){ 
           str.append(buf); 
       } 
       str.toString(); 
} catch (IOException e) {}

FileReader 类就是按照上面的工作方式读取文件的,FileReader 是继承了 InputStreamReader 类,实际上是读取文件流,然后通过 StreamDecoder 解码成 char,只不过这里的解码字符集是默认字符集。

写入也是类似的过程如下图所示:

通过 OutputStreamWriter 类完成,字符到字节的编码过程,由 StreamEncoder 完成编码过程。


磁盘 I/O 的工作机制

在介绍 Java 读取和写入磁盘文件之前,先来看看应用程序访问文件有哪几种方式;

几种访问文件的方式

我们知道,读取和写入文件 I/O 操作都调用的是操作系统提供给我们的接口,因为磁盘设备是归操作系统管的,而只要是系统调用都可能存在内核空间地址和用户空间地址切换的问题,这是为了保证用户进程不能直接操作内核,保证内核的安全而设计的,现代的操作系统将虚拟空间划分成了内核空间和用户空间两部分并实现了隔离,但是这样虽然保证了内核程序运行的安全性,但是也必然存在数据可能需要从内核空间向用户用户空间复制的问题;

如果遇到非常耗时的操作,如磁盘 I/O,数据从磁盘复制到内核空间,然后又从内核空间复制到用户空间,将会非常耗时,这时操作系统为了加速 I/O 访问,在内核空间使用缓存机制,也就是将从磁盘读取的文件按照一定的组织方式进行缓存,入股用户程序访问的是同一段磁盘地址的空间数据,那么操作系统将从内核缓存中直接取出返回给用户程序,这样就可以减少 I/O 的响应时间;

00. 标准访问文件的方式

读取的方式是,当应用程序调用read()接口时:

  • ①操作系统首先检查在内核的高速缓存中是否存在需要的数据,如果有,那么直接从缓存中返回;
  • ②如果没有,则从磁盘中读取,然后缓存在操作系统的缓存中;

写入的方式是,当应用程序调用write()接口时:

  • 从用户地址空间复制到内核地址空间的缓存中,这时对用户程序来说写操作就已经完成了,至于什么时候在写到磁盘中由操作系统决定,除非显示地调用了 sync 同步命令;

01.直接 I/O 方式

所谓的直接 I/O 的方式就是应用程序直接访问磁盘数据,而不经过操作系统内核数据缓冲区,这样做的目的是减少一次从内核缓冲区到用户程序缓存的数据复制;

这种访问文件的方式通常是在对数据的缓存管理由应用程序实现的数据库管理系统中,如在数据库管理系统中,系统明确地知道应该缓存哪些数据,应该失效哪些数据,还可以对一些热点数据做预加载,提前将热点数据加载到内存,可以加速数据的访问效率,而这些情况如果是交给操作系统进行缓存,那么操作系统将不知道哪些数据是热点数据,哪些是只会访问一次的数据,因为它只是简单的缓存最近一次从磁盘读取的数据而已;

但是直接 I/O 也有负面影响,如果访问的数据不再应用程序缓存之中,那么每次数据都会直接从磁盘进行加载,这种直接加载会非常缓慢,因此直接 I/O 通常与 异步 I/O 进行结合以达到更好的性能;

10.内存映射的方式

内存映射是指将硬盘上文件的位置与进程逻辑地址空间中一块大小相同的区域一一对应,当要访问内存中一段数据时,转换为访问文件的某一段数据。这种方式的目的同样是减少数据在用户空间和内核空间之间的拷贝操作。当大量数据需要传输的时候,采用内存映射方式去访问文件会获得比较好的效率。

同步和异步访问文件的方式

另外还有两种方式,一种是数据的读取和写入都是同步操作的同步方式,另一种是是当访问数据的线程发出请求之后,线程会接着去处理其他事情,而不是阻塞等待的异步访问方式,但从笔者就《深入分析 Java Web技术内幕》一书中的内容来看,这两种方式更像是对标准访问方式的一个具体说明,是标准访问方式对应的两种不同处理方法,知道就好了…


Java 访问磁盘文件

我们知道数据在磁盘的唯一最小描述就是文件,也就是说上层应用程序只能通过文件来操作磁盘上的数据,文件也是操作系统和磁盘驱动器交互的一个最小单元。值得注意的是 Java 中通常的 File 并不代表一个真实存在的文件对象,当你通过指定一个路径描述符时,它就会返回一个代表这个路径相关联的一个虚拟对象,这个可能是一个真实存在的文件或者是一个包含多个文件的目录。为何要这样设计?因为大部分情况下,我们并不关心这个文件是否真的存在,而是关心这个文件到底如何操作。例如我们手机里通常存了几百个朋友的电话号码,但是我们通常关心的是我有没有这个朋友的电话号码,或者这个电话号码是什么,但是这个电话号码到底能不能打通,我们并不是时时刻刻都去检查,而只有在真正要给他打电话时才会看这个电话能不能用。也就是使用这个电话记录要比打这个电话的次数多很多。

何时真正会要检查一个文件存不存?就是在真正要读取这个文件时,例如 FileInputStream 类都是操作一个文件的接口,注意到在创建一个 FileInputStream 对象时,会创建一个 FileDescriptor 对象,其实这个对象就是真正代表一个存在的文件对象的描述,当我们在操作一个文件对象时可以通过 getFD() 方法获取真正操作的与底层操作系统关联的文件描述。例如可以调用 FileDescriptor.sync() 方法将操作系统缓存中的数据强制刷新到物理磁盘中。

下面以上文读取文件的程序为例,介绍下如何从磁盘读取一段文本字符。如下图所示:

当传入一个文件路径,将会根据这个路径创建一个 File 对象来标识这个文件,然后将会根据这个 File 对象创建真正读取文件的操作对象,这时将会真正创建一个关联真实存在的磁盘文件的文件描述符 FileDescriptor,通过这个对象可以直接控制这个磁盘文件。由于我们需要读取的是字符格式,所以需要 StreamDecoder 类将 byte 解码为 char 格式,至于如何从磁盘驱动器上读取一段数据,由操作系统帮我们完成。至于操作系统是如何将数据持久化到磁盘以及如何建立数据结构需要根据当前操作系统使用何种文件系统来回答,至于文件系统的相关细节可以参考另外的文章。

参考文章:深入分析 Java I/O 的工作机制
关于这一part,我们只需要了解一下就可以,我也是直接复制就完事儿…

Java 序列化技术

Java序列化就是将一个对象转化成一串二进制表示的字节数组,通过保存或转移这些字节数据来达到持久化的目的。需要持久化,对象必须继承 java.io.Serializable 接口,或者将其转为字节数组,用于网络传输;

一个实际的序列化例子

第一步:创建一个用于序列化的对象

为了具体说明序列化在Java中是如何运作的,我们来写一个实际的例子,首先我们来写一个用于序列化的对象,然后实现上述的接口:

/**
 * 用于演示Java中序列化的工作流程...
 *
 * @author: @我没有三颗心脏
 * @create: 2018-08-15-下午 14:37
 */
public class People implements Serializable{

    public String name;
    public transient int age;

    public void sayHello() {
        System.out.println("Hello,My Name is " + name);
    }
}

注意:一个类的对象想要序列化成功,必须满足两个条件

  • ①实现上述的接口;
  • ②保证该类的所有属性必须都是可序列化的,如果不希望某个属性序列化(例如一些敏感信息),可以加上transient关键字;

第二步:序列化对象

如下的代码完成了实例化一个 People 对象并其序列化到D盘的根目录下的一个操作,这里呢按照 Java 的标准约定将文件的后缀写成 .ser 的样子,你也可以写成其他的…

People people = new People();
people.name = "我没有三颗心脏";
people.age = 21;

try {
    FileOutputStream fileOutputStream = new FileOutputStream("D:/people.ser");
    ObjectOutputStream out = new ObjectOutputStream(fileOutputStream);
    out.writeObject(people);
    out.close();
    fileOutputStream.close();
    System.out.println("Serialized data is saved in D:/");
} catch (IOException e) {
    e.printStackTrace();
}

第三步:反序列化对象

下面的程序完成了对刚才我们序列化的文件还原成一个People对象的过程,并获取了其中的参数,但是注意,由于我们希望 age 属性是短暂的加入了transient关键字, 所以我们无法获取到序列化时 People 的 age 属性:

People people = null;
try {
    FileInputStream fileIn = new FileInputStream("D:/people.ser");
    ObjectInputStream in = new ObjectInputStream(fileIn);
    people = (People) in.readObject();
    in.close();
    fileIn.close();
} catch (IOException i) {
    i.printStackTrace();
    return;
} catch (ClassNotFoundException c) {
    System.out.println("People class not found");
    c.printStackTrace();
    return;
}
System.out.println("Deserialized People...");
System.out.println("Name: " + people.name);
System.out.println("Age: " + people.age);

输出结果如下:

Deserialized People...
Name: 我没有三颗心脏
Age: 0

serialVersionUID的作用

上述的例子中我们完成了对一个 People 对象序列化和反序列化的过程,我们现在来做一点简单的修改,例如把age字段的transient关键字去掉:

public class People implements Serializable {

    public String name;
    public int age;

    public void sayHello() {
        System.out.println("Hello,My Name is " + name);
    }
}

然后我们再运行我们刚才反序列化的代码,会发现,这个时候程序竟然报错了,说是serialVersionUID不一致:

事实上,如果你经常看别人的代码的话,或许会有留意到诸如这样的代码:

private static final long serialVersionUID = 876323262645176354L;

就这一长串的东西也不知道是在干嘛的,但这其实是为了保证序列化版本的兼容性,即在版本升级后序列化仍保持对象的唯一性;我们通过上述的修改也感受到了其中的一二,但是问题是:我们并没有在需要序列化的对象中写任何关于这个UID的代码呀?

这是个有趣的问题,通常情况下,如果我们实现了序列化接口,但是没有自己显式的声明这个UID的话,那么JVM就会根据该类的类名、属性名、方法名等自己计算出一个独一无二的变量值,然后将这个变量值一同序列化到文件之中,而在反序列化的时候同样,会根据该类计算出一个独一无二的变量然后进行比较,不一致就会报错,但是我怀着强烈的好奇心去反编译了一下.class文件,并没有发现编译器写了UDI这一类的东西,我看《深入分析 Java Web 技术内幕》中说,实际上是写到了二进制文件里面了;

  • 不显式声明的缺点:一旦写好了某一个类,那么想要修改就不行了,所以我们最好自己显式的去声明;
  • 显式声明的方式:①使用默认的1L作用UID;②根据类名、接口名等生成一个64位的哈希字段,现在的编译器如IDEA、Eclipse都有这样的功能,大家感兴趣去了解下;

序列化用来干什么?

虽然我们上面的程序成功将一个对象序列化保存到磁盘,然后从磁盘还原,但是这样的功能到底可以应用在哪些场景?到底可以干一些什么样的事情呢?下面举一些在实际应用中的例子:

  • Web服务器中保存Session对象,如Tomcat会在服务器关闭时把session序列化存储到一个名为session.ser的文件之中,这个过程称为session的钝化;
  • 网络上传输对象,如分布式应用等;

关于序列化的一些细节

1.如果一个类没有实现Serializable接口,但是它的基类实现了,那么这个类也是可以序列化的;

2.相反,如果一个类实现了Serializable接口,但是它的父类没有实现,那么这个类还是可以序列化(Object是所有类的父类),但是序列化该子类对象,然后反序列化后输出父类定义的某变量的数值,会发现该变量数值与序列化时的数值不同(一般为null或者其他默认值),而且这个父类里面必须有无参的构造方法,不然子类反序列化的时候会报错。

了解到这里就可以了,更多的细节感兴趣的童鞋可以自行去搜索引擎搜索..


网络 I/O 工作机制

数据从一台主机发送到网络中的另一台主机需要经过很多步骤,首先双方需要有沟通的意向,然后要有能够沟通的物理渠道(物理链路),其次,还要保障双方能够正常的进行交流,例如语言一致的问题、说话顺序的问题等等等;

Java Socket 的工作机制

看到有地方说:网络 I/O 的实质其实就是对 Socket 的读取;那Socket 这个概念没有对应到一个具体的实体,它是描述计算机之间完成相互通信一种抽象功能。打个比方,可以把 Socket 比作为两个城市之间的交通工具,有了它,就可以在城市之间来回穿梭了。交通工具有多种,每种交通工具也有相应的交通规则。Socket 也一样,也有多种。大部分情况下我们使用的都是基于 TCP/IP 的流套接字,它是一种稳定的通信协议。

下图是典型的基于 Socket 的通信的场景:

主机 A 的应用程序要能和主机 B 的应用程序通信,必须通过 Socket 建立连接,而建立 Socket 连接必须需要底层 TCP/IP 协议来建立 TCP 连接。建立 TCP 连接需要底层 IP 协议来寻址网络中的主机。我们知道网络层使用的 IP 协议可以帮助我们根据 IP 地址来找到目标主机,但是一台主机上可能运行着多个应用程序,如何才能与指定的应用程序通信就要通过 TCP 或 UPD 的地址也就是端口号来指定。这样就可以通过一个 Socket 实例唯一代表一个主机上的一个应用程序的通信链路了。

建立通信链路

当客户端要与服务端通信,客户端首先要创建一个 Socket 实例,操作系统将为这个 Socket 实例分配一个没有被使用的本地端口号,并创建一个包含本地和远程地址和端口号的套接字数据结构,这个数据结构将一直保存在系统中直到这个连接关闭。在创建 Socket 实例的构造函数正确返回之前,将要进行 TCP 的三次握手协议,TCP 握手协议完成后,Socket 实例对象将创建完成,否则将抛出 IOException 错误。

与之对应的服务端将创建一个 ServerSocket 实例,ServerSocket 创建比较简单只要指定的端口号没有被占用,一般实例创建都会成功,同时操作系统也会为 ServerSocket 实例创建一个底层数据结构,这个数据结构中包含指定监听的端口号和包含监听地址的通配符,通常情况下都是“*”即监听所有地址。之后当调用 accept() 方法时,将进入阻塞状态,等待客户端的请求。当一个新的请求到来时,将为这个连接创建一个新的套接字数据结构,该套接字数据的信息包含的地址和端口信息正是请求源地址和端口。这个新创建的数据结构将会关联到 ServerSocket 实例的一个未完成的连接数据结构列表中,注意这时服务端与之对应的 Socket 实例并没有完成创建,而要等到与客户端的三次握手完成后,这个服务端的 Socket 实例才会返回,并将这个 Socket 实例对应的数据结构从未完成列表中移到已完成列表中。所以 ServerSocket 所关联的列表中每个数据结构,都代表与一个客户端的建立的 TCP 连接。

数据传输

传输数据是我们建立连接的主要目的,如何通过 Socket 传输数据,下面将详细介绍。

当连接已经建立成功,服务端和客户端都会拥有一个 Socket 实例,每个 Socket 实例都有一个 InputStream 和 OutputStream,正是通过这两个对象来交换数据。同时我们也知道网络 I/O 都是以字节流传输的。当 Socket 对象创建时,操作系统将会为 InputStream 和 OutputStream 分别分配一定大小的缓冲区,数据的写入和读取都是通过这个缓存区完成的。写入端将数据写到 OutputStream 对应的 SendQ 队列中,当队列填满时,数据将被发送到另一端 InputStream 的 RecvQ 队列中,如果这时 RecvQ 已经满了,那么 OutputStream 的 write 方法将会阻塞直到 RecvQ 队列有足够的空间容纳 SendQ 发送的数据。值得特别注意的是,这个缓存区的大小以及写入端的速度和读取端的速度非常影响这个连接的数据传输效率,由于可能会发生阻塞,所以网络 I/O 与磁盘 I/O 在数据的写入和读取还要有一个协调的过程,如果两边同时传送数据时可能会产生死锁,在后面 NIO 部分将介绍避免这种情况。

NIO 的工作方式

BIO 带来的挑战

BIO 即阻塞 I/O,不管是磁盘 I/O 还是网络 I/O,数据在写入 OutputStream 或者从 InputStream 读取时都有可能会阻塞。一旦有线程阻塞将会失去 CPU 的使用权,这在当前的大规模访问量和有性能要求情况下是不能接受的。虽然当前的网络 I/O 有一些解决办法,如一个客户端一个处理线程,出现阻塞时只是一个线程阻塞而不会影响其它线程工作,还有为了减少系统线程的开销,采用线程池的办法来减少线程创建和回收的成本,但是有一些使用场景仍然是无法解决的。如当前一些需要大量 HTTP 长连接的情况,像淘宝现在使用的 Web 旺旺项目,服务端需要同时保持几百万的 HTTP 连接,但是并不是每时每刻这些连接都在传输数据,这种情况下不可能同时创建这么多线程来保持连接。即使线程的数量不是问题,仍然有一些问题还是无法避免的。如这种情况,我们想给某些客户端更高的服务优先级,很难通过设计线程的优先级来完成,另外一种情况是,我们需要让每个客户端的请求在服务端可能需要访问一些竞争资源,由于这些客户端是在不同线程中,因此需要同步,而往往要实现这些同步操作要远远比用单线程复杂很多。以上这些情况都说明,我们需要另外一种新的 I/O 操作方式。

NIO 的工作机制

很多人都把NIO翻译成New IO,但我更觉得No-Block IO更接近它的本意,也就是非阻塞式IO,它虽然是非阻塞式的,但它是同步的,我们先看一下 NIO 涉及到的关联类图,如下:

上图中有两个关键类:Channel 和 Selector,它们是 NIO 中两个核心概念。我们还用前面的城市交通工具来继续比喻 NIO 的工作方式,这里的 Channel 要比 Socket 更加具体,它可以比作为某种具体的交通工具,如汽车或是高铁等,而 Selector 可以比作为一个车站的车辆运行调度系统,它将负责监控每辆车的当前运行状态:是已经出战还是在路上等等,也就是它可以轮询每个 Channel 的状态。这里还有一个 Buffer 类,它也比 Stream 更加具体化,我们可以将它比作为车上的座位,Channel 是汽车的话就是汽车上的座位,高铁上就是高铁上的座位,它始终是一个具体的概念,与 Stream 不同。Stream 只能代表是一个座位,至于是什么座位由你自己去想象,也就是你在去上车之前并不知道,这个车上是否还有没有座位了,也不知道上的是什么车,因为你并不能选择,这些信息都已经被封装在了运输工具(Socket)里面了,对你是透明的。

NIO 引入了 Channel、Buffer 和 Selector 就是想把这些信息具体化,让程序员有机会控制它们,如:当我们调用 write() 往 SendQ 写数据时,当一次写的数据超过 SendQ 长度是需要按照 SendQ 的长度进行分割,这个过程中需要有将用户空间数据和内核地址空间进行切换,而这个切换不是你可以控制的。而在 Buffer 中我们可以控制 Buffer 的 capacity,并且是否扩容以及如何扩容都可以控制。

理解了这些概念后我们看一下,实际上它们是如何工作的,下面是典型的一段 NIO 代码:

public void selector() throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);//设置为非阻塞方式
        ssc.socket().bind(new InetSocketAddress(8080));
        ssc.register(selector, SelectionKey.OP_ACCEPT);//注册监听的事件
        while (true) {
            Set selectedKeys = selector.selectedKeys();//取得所有key集合
            Iterator it = selectedKeys.iterator();
            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();
                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                    ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
                 SocketChannel sc = ssChannel.accept();//接受到服务端的请求
                    sc.configureBlocking(false);
                    sc.register(selector, SelectionKey.OP_READ);
                    it.remove();
                } else if 
                ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                    SocketChannel sc = (SocketChannel) key.channel();
                    while (true) {
                        buffer.clear();
                        int n = sc.read(buffer);//读取数据
                        if (n <= 0) {
                            break;
                        }
                        buffer.flip();
                    }
                    it.remove();
                }
            }
        }
}

调用 Selector 的静态工厂创建一个选择器,创建一个服务端的 Channel 绑定到一个 Socket 对象,并把这个通信信道注册到选择器上,把这个通信信道设置为非阻塞模式。然后就可以调用 Selector 的 selectedKeys 方法来检查已经注册在这个选择器上的所有通信信道是否有需要的事件发生,如果有某个事件发生时,将会返回所有的 SelectionKey,通过这个对象 Channel 方法就可以取得这个通信信道对象从而可以读取通信的数据,而这里读取的数据是 Buffer,这个 Buffer 是我们可以控制的缓冲器。

在上面的这段程序中,是将 Server 端的监听连接请求的事件和处理请求的事件放在一个线程中,但是在实际应用中,我们通常会把它们放在两个线程中,一个线程专门负责监听客户端的连接请求,而且是阻塞方式执行的;另外一个线程专门来处理请求,这个专门处理请求的线程才会真正采用 NIO 的方式,像 Web 服务器 Tomcat 和 Jetty 都是这个处理方式,关于 Tomcat 和 Jetty 的 NIO 处理方式可以参考文章《 Jetty 的工作原理和与 Tomcat 的比较》。

下图是描述了基于 NIO 工作方式的 Socket 请求的处理过程:

上图中的 Selector 可以同时监听一组通信信道(Channel)上的 I/O 状态,前提是这个 Selector 要已经注册到这些通信信道中。选择器 Selector 可以调用 select() 方法检查已经注册的通信信道上的是否有 I/O 已经准备好,如果没有至少一个信道 I/O 状态有变化,那么 select 方法会阻塞等待或在超时时间后会返回 0。上图中如果有多个信道有数据,那么将会将这些数据分配到对应的数据 Buffer 中。所以关键的地方是有一个线程来处理所有连接的数据交互,每个连接的数据交互都不是阻塞方式,所以可以同时处理大量的连接请求。

Buffer 的工作方式

上面介绍了 Selector 将检测到有通信信道 I/O 有数据传输时,通过 selelct() 取得 SocketChannel,将数据读取或写入 Buffer 缓冲区。下面讨论一下 Buffer 如何接受和写出数据?

Buffer 可以简单的理解为一组基本数据类型的元素列表,它通过几个变量来保存这个数据的当前位置状态,也就是有四个索引。如下表所示:

索引 说明
capacity 缓冲区数组的总长度
position 下一个要操作的数据元素的位置
limit 缓冲区数组中不可操作的下一个元素的位置,limit<=capacity
mark 用于记录当前 position 的前一个位置或者默认是 0

在实际操作数据时它们有如下关系图:

我们通过 ByteBuffer.allocate(11) 方法创建一个 11 个 byte 的数组缓冲区,初始状态如上图所示,position 的位置为 0,capacity 和 limit 默认都是数组长度。当我们写入 5 个字节时位置变化如下图所示:

这时底层操作系统就可以从缓冲区中正确读取这 5 个字节数据发送出去了。在下一次写数据之前我们在调一下 clear() 方法。缓冲区的索引状态又回到初始位置。

这里还要说明一下 mark,当我们调用 mark() 时,它将记录当前 position 的前一个位置,当我们调用 reset 时,position 将恢复 mark 记录下来的值。

还有一点需要说明,通过 Channel 获取的 I/O 数据首先要经过操作系统的 Socket 缓冲区再将数据复制到 Buffer 中,这个的操作系统缓冲区就是底层的 TCP 协议关联的 RecvQ 或者 SendQ 队列,从操作系统缓冲区到用户缓冲区复制数据比较耗性能,Buffer 提供了另外一种直接操作操作系统缓冲区的的方式即 ByteBuffer.allocateDirector(size),这个方法返回的 byteBuffer 就是与底层存储空间关联的缓冲区,它的操作方式与 linux2.4 内核的 sendfile 操作方式类似。

Java NIO 实例

上面从 NIO 中引入了一些概念,下面我们对这些概念再来进行简单的复述和补充:

  • 缓冲区Buffer:缓冲区是一个对象,里面存的是数据,NIO进行通讯,传递的数据,都包装到Buffer中,Buffer是一个抽象类。子类有ByteBuffer、CharBuffer等,常用的是字节缓冲区,也就是ByteBuffer;
  • 通道Channel:channel是一个通道,通道就是通流某种物质的管道,在这里就是通流数据,他和流的不同之处就在于,流是单向的,只能向一个方向流动,而通道是一个管道,有两端,是双向的,可以进行读操作,也可以写操作,或者两者同时进行;
  • 多路复用器Selector:多路复用器是一个大管家,他管理着通道,通道把自己注册到Selector上面,Selector会轮询注册到自己的管道,通过判断这个管道的不同的状态,来进行相应的操作;

NIO 工作机制的核心思想就是:客户端和服务器端都是使用的通道,通道具有事件,可以将事件注册到多路复选器上,事件有就绪和非就绪两种状态,就绪的状态会放到多路复选器的就绪键的集合中,起一个线程不断地去轮询就绪的状态,根据不同的状态做不同的处理

参考资料:https://wangjingxin.top/2017/01/17/io/

NIO 和 IO 的主要区别

  1. 面向流与面向缓冲.
    Java NIO和IO之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。 Java NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。
  2. 阻塞与非阻塞IO
    Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。 Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。
  3. 选择器(Selectors)
    Java NIO的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。

Java AIO 简单了解

AIO就是异步非阻塞IO,A就是asynchronous的意思,因为NIO1.0虽然面向缓冲,利用多路复选器实现了同步非阻塞IO,可是在NIO1.0中需要使用一个线程不断去轮询就绪集合,开销也是比较大的,所以在jdk1.7中扩展了NIO,称之为NIO2.0,NIO2.0中引入了AIO,此外NIO2.0中还引入了异步文件通道,那么究竟是怎么实现异步的呢?

AIO 有三个特点,它的特点也可以说明它是如何完成异步这样的操作的:

  • ①读完了再通知我;
  • ②不会加快 I/O,只是在读完后进行通知;
  • ③使用回调函数,进行业务处理;

AIO 的核心原理就是:对客户端和服务器端的各种操作进行回调函数的注册(通过实现一个CompletionHandler接口,其中定义了一个completed的成功操作方法和一个fail的失败方法)。在完成某个操作之后,就会自己去调用该注册到该操作的回调函数,达到异步的效果。

BIO/ NIO/ AIO 的简单理解

我们在这里假设一个烧了一排开水的场景,BIO(同步阻塞IO)的做法就是,叫一个线程停留在一个水壶那,直到这个水壶烧开我再去处理下一个水壶;NIO(准备好再通知我,同步非阻塞IO)的做法就是叫一个线程不断地去询问每个水壶的状态,看看是否有水壶的状态发生了变化,变化则再去做相应的处理;AIO(读完了再通知我,异步非阻塞IO)的做法是在每个水壶上都安装一个装置,当水壶烧开之后就会自动通知我水壶烧开了让我做相应的处理;

如果还觉得理解起来有困难的童鞋建议阅读以下这篇文章,相信会有收获:http://loveshisong.cn/编程技术/2016-06-25-十分钟了解BIO-NIO-AIO.html

BIO、NIO、AIO适用场景分析

  • BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。
  • NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
  • AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

简单总结

这篇文章大量复制粘贴到《深入分析 Java Web 技术内幕》第二节“深入分析 Java I/O 的工作机制”的内容,没办法确实很多描述性的概念以及说明,自己的说明也没有达到用简单语言能描述复杂事物的程度..所以可能看起来这篇文章会有那么点儿难以下咽..我自己的话也是为了写着一篇文章查了很多资料,书也是翻了很多很多遍才对Java 中的 I/O 相关的知识有所熟悉,不过耗费的时间也是值得的,同时也希望观看文章的你能够有所收获,也欢迎各位指正!


欢迎转载,转载请注明出处!
简书ID:@我没有三颗心脏
github:wmyskxz
欢迎关注公众微信号:wmyskxz_javaweb
分享自己的Java Web学习之路以及各种Java学习资料
想要交流的朋友也可以加qq群:3382693

7月27日:CVTE一面 30分钟(挂)

  1. 自我介绍
  2. 有没有做过JavaWeb相关的项目?你觉得难点在哪里呢?
  3. 你这个博客系统有没有加权限系统?如果被拦截封包获取了账号密码怎么办?(没加,凉拌..)
  4. 用过事务吗?怎么用的举一个实际的例子?
  5. Spring中的@Transactional放在类级别和方法级别上有什么不同?(不知道..)
  6. 你对Java哪一个方面的知识熟悉?
  7. List/ Set/ Map有什么区别?
  8. 谈一下HashMap插入元素的过程?
  9. HashMap安全吗?那有安全的Map吗?
  10. 多线程并发有什么问题?刚才安全的Map是如何解决这个问题的?
  11. Java中实现多线程有哪些方式?

总体感觉CVTE一面还没开始问就已经结束了…所以面完感觉挺不好的,总觉得自己会凉凉…结果一查结果真凉了…懵逼…

7月30日:阿里一面 36分钟

  1. 自我介绍
  2. 面试官开始自己介绍他所属的部门然后平时的一些业务
  3. 说一下HashMap的底层结构?
  4. 为什么1.8要引入红黑树这种结构呢?
  5. HashMap线程安全吗?那有没有线程安全的Map?(ConcurrentHashMap还有一个Collections类中的静态内部类SynrhonizedMap)
  6. ConcurrentHashMap底层结构?
  7. 对JDK1.5的concurrent包了解吗?(不了解…)
  8. Java中有两种错误你能说一下嘛?(想不起具体名字来了..解释了半天..)
  9. 说下JVM的内存结构?(本地方法栈打死没想起来…)堆、栈分别存储什么信息?
  10. 说一下新生代和永久代有什么关系?
  11. 平时学校学什么课?算法和数据结构学过吧?
  12. 说一个你印象最深刻的数据结构,用来解决什么典型问题?(堆、TopK问题)
  13. 说一个典型的算法解决什么问题?再回答一个(答得不好…)
  14. 贪心算法有什么优点缺点?贪心算法可能不是最优解那什么算法能解决呢?(动态规划这个词老想不起来..)
  15. Spring了解吗?说一下装载Bean的过程?
  16. 设计模式了解吗?说几个常用的
  17. 装饰器模式和代理模式有什么区别?
  18. 看你做过这么多项目说一个你刚开始难以解决然后通过什么方式最终解决的?
  19. 有没有什么要问我的?
  20. 平时都怎么学习的?

正在寝室收拾东西的时候预约了6分钟后面试,那个激动..总的来说感觉答得不好,确实是没啥面试经验,有很多东西自己知道没有表达,不过回答的时候倒是挺有自信的,电话一挂脑子一去回想.药丸…不过面试官还算是比较好的…希望能过吧…

8月5日:阿里二面 60分钟

  1. 面试官开场白:介绍部门、业务,上一轮面试的大概情况,今天准备聊一聊编程能力和项目的一些问题;
  2. 面向对象三大特点?你怎么理解多态?Java中是怎么实现多态的?
  3. 序列化有了解过吗?
  4. Java网络I/O了解吗?(不是很了解..)HTTP三次握手和四次挥手的详细过程能说下吗?
  5. 设计题:一个网络聊天室有思路吗?不要求界面,只说思路(后来想了想回答的很一般..)
  6. ArrayList和LinkedList的区别?ArrayList是怎么扩容的?
  7. Java中有两种异常,你能说说嘛(感觉是看了上一个面试官的评价来看我复习没有的感觉..)?Exception又分为两种你知道吗?举一个RunTimeException你遇到过的?(我说的NullPointer)除了这个你再说一个?再举一个非RunTimeException?
  8. GC什么时候开始?我调用System.gc()能保证GC一定发生吗?
  9. 你说一下类加载器是怎么回事?(我说了一下双亲委派模型..但后来想面试官好像问的是怎么加载class?…)怎么实现自己的类加载器?
  10. 设计题:现在我的jar包在云上,怎么动态添加进我的项目中?(不知道..后来想了一下自己写ClassLoader应该可以..)
  11. 看你简历有前端的东西,那前端盒子模型有了解吗?
  12. 可以给margin或者padding这样的属性赋值为负数吗?
  13. 我想把border变成一个虚线该怎么写?(忘了具体的dashed,只记得border-style..)
  14. 平常都用什么数据库?(MySQL)那MySQL有两种引擎了解吗?有什么区别?(这个说得挺完整的)我要用SQL获得一个表级锁应该怎么写?(不了解..)我给一个表三个ABC列建了一个组合索引,我查询B会用到索引嘛?(不知道..)
  15. 事务了解吗?四大基本特性?什么是隔离性?数据库并发有几个隔离级别?(我说我从面临的问题开始说面试官说不用只用说几个级别就可以)MySQL默认级别?你确定?(确定..)
  16. 平时有用到什么框架?(Spring/ SpringMVC/ SpringBoot/ MyBatis)
  17. MyBatis问一个问题,在mapper文件中#{}和${}有什么区别?
  18. Spring有两大特性,你是怎么理解IoC控制反转的?AOP实现原理你了解吗?JDK代理为什么非要实现一个接口呢?(这个一时间忘了…)
  19. MVC你是怎么理解的?
  20. 什么时候需要重写equals()?什么时候需要重写hashCode()?
  21. 项目相关,你博客是从0到有的?为什么要造这个轮子?
  22. JavaWeb安全方面的问题,你这个留言啥的有没有啥问题?(XSS攻击)知道有问题为啥不搞一下?(我…)
  23. 我把面试结果报上去,一般两三天后出结果,拜拜;

因为是靠回忆,所以有些问题的顺序有点儿记不太清了,比较基础但是有些宽泛..问了我一些前端的问题..比较遗憾的是网络I/O这一块自己不是特别熟悉,然后那一道设计题自己没有回答得很好..面试官说看得出我基础还行就是深度不够,他说要看总体的面试情况如果靠前就安排下一轮面试,让我自己下去准备等通知,但总觉得他在套路我..早上转发了一只专属锦鲤,希望能带来好运吧..

8月13日:阿里三面(P9钉钉视频面) 120分钟左右

1.面试官让进一个链接做六道题,一个小时之后它来看代码,可以搜索资料啥的都可以,大概是这样六道题:①反转一串字符串;②从一串JSON字符串中提取所有的一个属性并输出;③统计一串字符串中的数字、英文字母、空格、其他字符的个数并打印输出;④计算N的阶乘;⑤数组排序算法随便写;⑥创建三个线程ABC,分别打印其线程名十次,并按照ABC的顺序执行;
2.最近的项目有没有什么想说的?印象深刻的?
3.项目有什么难点?怎么解决的?
4.设计模式有了解过哪些?模板模式应用于什么样的场景?
5.OSI七层模型知道吗?IP在哪一层?TCP在哪一层?
6.对称加密与非对称加密有了解吗?
7.谈谈对BIO/ NIO/ AIO的理解?(有点忘了..说的有点问题面试官给解释了一下..)分别用于什么样的场景?
8.SpringBoot你觉得对于MVC有什么不一样的地方,有什么优点?
9.平时怎么学习的?
10.有什么要问的..然后问了一些自己关心的问题,并让面试官给我一些建议,感觉自己凉凉..

第二天下午更新了状态显示已回绝了..

简单总结

其实自己投了蛮多公司的,但是目前接到电话并且面试的,就只有阿里和CVTE,特别是CVTE感觉还蛮效率的..投递了没过几天就打电话预约面试了,emm..虽然两家公司提前批的面试都挂了..但我已经重新申请了正常的网申流程..嘻嘻..感觉还是基础至上吧..而且的话,电话面试的经验也很重要,现在回想CVTE一面的时候确实自己也不知道是个啥状态..问题也没有回答很好..挂的那自然也是理所当然..对于阿里的话就比较可惜吧..特别是三面,问的问题都不是自己不知道的,而差不多都是自己有些遗忘或者不是很熟悉的..决定还是抓紧时间回炉锻造锻造..

不过幸运的是这只是提前批吧..自己还是有一些时间去准备,通过上面的四次面试也是知道了自己的不足,另外想说的一点是:举一反三的能力,阿里三面也跟面试官有聊到,就是一个问题,你回答的时候最好能够引申出其他相关的一些东西,换位思考的角度讲,面试官希望这样,而你又能很好的展现自己的能力和基础知识,特别担心那种自己明明知道却说不好的问题,或者是遗漏了一些点的问题,还是下来需要多花时间准备吧..加油..


欢迎转载,转载请注明出处!
简书ID:@我没有三颗心脏
github:wmyskxz
欢迎关注公众微信号:wmyskxz_javaweb
分享自己的Java Web学习之路以及各种Java学习资料
想要交流的朋友也可以加qq群:3382693

前言:把这段时间复习的关于集合类的东西整理出来,特别是HashMap相关的一些东西,之前都没有很注意1.7 ->> 1.8的变化问题,但后来发现这其实变化挺大的,而且很多整理的面试资料都没有更新(包括我之前整理的…)

1)说说常见的集合有哪些吧?

答:Map接口和Collection接口是所有集合框架的父接口:

  1. Collection接口的子接口包括:Set接口和List接口
  2. Map接口的实现类主要有:HashMap、TreeMap、Hashtable、ConcurrentHashMap以及Properties等
  3. Set接口的实现类主要有:HashSet、TreeSet、LinkedHashSet等
  4. List接口的实现类主要有:ArrayList、LinkedList、Stack以及Vector等

2)HashMap与HashTable的区别?

答:

  1. HashMap没有考虑同步,是线程不安全的;Hashtable使用了synchronized关键字,是线程安全的;

  2. HashMap允许K/V都为null;后者K/V都不允许为null;

  3. HashMap继承自AbstractMap类;而Hashtable继承自Dictionary类;


3)HashMap的put方法的具体流程?

图引用自:https://blog.csdn.net/u011240877/article/details/53358305

答:下面先来分析一下源码

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
               boolean evict) {
    HashMap.Node<K,V>[] tab; HashMap.Node<K,V> p; int n, i;
    // 1.如果table为空或者长度为0,即没有元素,那么使用resize()方法扩容
    if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
    // 2.计算插入存储的数组索引i,此处计算方法同 1.7 中的indexFor()方法
    // 如果数组为空,即不存在Hash冲突,则直接插入数组
    if ((p = tab[i = (n - 1) & hash]) == null)
        tab[i] = newNode(hash, key, value, null);
    // 3.插入时,如果发生Hash冲突,则依次往下判断
    else {
        HashMap.Node<K,V> e; K k;
        // a.判断table[i]的元素的key是否与需要插入的key一样,若相同则直接用新的value覆盖掉旧的value
        // 判断原则equals() - 所以需要当key的对象重写该方法
        if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
            e = p;
        // b.继续判断:需要插入的数据结构是红黑树还是链表
        // 如果是红黑树,则直接在树中插入 or 更新键值对
        else if (p instanceof HashMap.TreeNode)
            e = ((HashMap.TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
        // 如果是链表,则在链表中插入 or 更新键值对
        else {
            // i .遍历table[i],判断key是否已存在:采用equals对比当前遍历结点的key与需要插入数据的key
            //    如果存在相同的,则直接覆盖
            // ii.遍历完毕后任务发现上述情况,则直接在链表尾部插入数据
            //    插入完成后判断链表长度是否 > 8:若是,则把链表转换成红黑树
            for (int binCount = 0; ; ++binCount) {
                if ((e = p.next) == null) {
                    p.next = newNode(hash, key, value, null);
                    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                        treeifyBin(tab, hash);
                    break;
                }
                if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                    break;
                p = e;
            }
        }
        // 对于i 情况的后续操作:发现key已存在,直接用新value覆盖旧value&返回旧value
        if (e != null) { // existing mapping for key
            V oldValue = e.value;
            if (!onlyIfAbsent || oldValue == null)
                e.value = value;
            afterNodeAccess(e);
            return oldValue;
        }
    }
    ++modCount;
    // 插入成功后,判断实际存在的键值对数量size > 最大容量
    // 如果大于则进行扩容
    if (++size > threshold)
        resize();
    // 插入成功时会调用的方法(默认实现为空)
    afterNodeInsertion(evict);
    return null;
}

图片简单总结为:


4)HashMap的扩容操作是怎么实现的?

答:通过分析源码我们知道了HashMap通过resize()方法进行扩容或者初始化的操作,下面是对源码进行的一些简单分析:

/**
 * 该函数有2中使用情况:1.初始化哈希表;2.当前数组容量过小,需要扩容
 */
final Node<K,V>[] resize() {
    Node<K,V>[] oldTab = table;// 扩容前的数组(当前数组)
    int oldCap = (oldTab == null) ? 0 : oldTab.length;// 扩容前的数组容量(数组长度)
    int oldThr = threshold;// 扩容前数组的阈值
    int newCap, newThr = 0;

    if (oldCap > 0) {
        // 针对情况2:若扩容前的数组容量超过最大值,则不再扩容
        if (oldCap >= MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return oldTab;
        }
        // 针对情况2:若没有超过最大值,就扩容为原来的2倍(左移1位)
        else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                oldCap >= DEFAULT_INITIAL_CAPACITY)
            newThr = oldThr << 1; // double threshold
    }

    // 针对情况1:初始化哈希表(采用指定或者使用默认值的方式)
    else if (oldThr > 0) // initial capacity was placed in threshold
        newCap = oldThr;
    else {               // zero initial threshold signifies using defaults
        newCap = DEFAULT_INITIAL_CAPACITY;
        newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
    }

    // 计算新的resize上限
    if (newThr == 0) {
        float ft = (float)newCap * loadFactor;
        newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                (int)ft : Integer.MAX_VALUE);
    }
    threshold = newThr;
    @SuppressWarnings({"rawtypes","unchecked"})
    Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
    table = newTab;
    if (oldTab != null) {
        // 把每一个bucket都移动到新的bucket中去
        for (int j = 0; j < oldCap; ++j) {
            Node<K,V> e;
            if ((e = oldTab[j]) != null) {
                oldTab[j] = null;
                if (e.next == null)
                    newTab[e.hash & (newCap - 1)] = e;
                else if (e instanceof TreeNode)
                    ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                else { // preserve order
                    Node<K,V> loHead = null, loTail = null;
                    Node<K,V> hiHead = null, hiTail = null;
                    Node<K,V> next;
                    do {
                        next = e.next;
                        if ((e.hash & oldCap) == 0) {
                            if (loTail == null)
                                loHead = e;
                            else
                                loTail.next = e;
                            loTail = e;
                        }
                        else {
                            if (hiTail == null)
                                hiHead = e;
                            else
                                hiTail.next = e;
                            hiTail = e;
                        }
                    } while ((e = next) != null);
                    if (loTail != null) {
                        loTail.next = null;
                        newTab[j] = loHead;
                    }
                    if (hiTail != null) {
                        hiTail.next = null;
                        newTab[j + oldCap] = hiHead;
                    }
                }
            }
        }
    }
    return newTab;
}

5)HashMap是怎么解决哈希冲突的?

参考资料:https://juejin.im/post/5ab99afff265da23a2291dee

答:在解决这个问题之前,我们首先需要知道什么是哈希冲突,而在了解哈希冲突之前我们还要知道什么是哈希才行;

什么是哈希?

Hash,一般翻译为“散列”,也有直接音译为“哈希”的,这就是把任意长度的输入通过散列算法,变换成固定长度的输出,该输出就是散列值(哈希值);这种转换是一种压缩映射,也就是,散列值的空间通常远小于输入的空间,不同的输入可能会散列成相同的输出,所以不可能从散列值来唯一的确定输入值。简单的说就是一种将任意长度的消息压缩到某一固定长度的消息摘要的函数。

所有散列函数都有如下一个基本特性:根据同一散列函数计算出的散列值如果不同,那么输入值肯定也不同。但是,根据同一散列函数计算出的散列值如果相同,输入值不一定相同。

什么是哈希冲突?

当两个不同的输入值,根据同一散列函数计算出相同的散列值的现象,我们就把它叫做碰撞(哈希碰撞)。

HashMap的数据结构

在Java中,保存数据有两种比较简单的数据结构:数组和链表。数组的特点是:寻址容易,插入和删除困难;链表的特点是:寻址困难,但插入和删除容易;所以我们将数组和链表结合在一起,发挥两者各自的优势,使用一种叫做链地址法的方式可以解决哈希冲突:

这样我们就可以将拥有相同哈希值的对象组织成一个链表放在hash值所对应的bucket下,但相比于hashCode返回的int类型,我们HashMap初始的容量大小DEFAULT_INITIAL_CAPACITY = 1 << 4(即2的四次方16)要远小于int类型的范围,所以我们如果只是单纯的用hashCode取余来获取对应的bucket这将会大大增加哈希碰撞的概率,并且最坏情况下还会将HashMap变成一个单链表,所以我们还需要对hashCode作一定的优化

hash()函数

上面提到的问题,主要是因为如果使用hashCode取余,那么相当于参与运算的只有hashCode的低位,高位是没有起到任何作用的,所以我们的思路就是让hashCode取值出的高位也参与运算,进一步降低hash碰撞的概率,使得数据分布更平均,我们把这样的操作称为扰动,在JDK 1.8中的hash()函数如下:

static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);// 与自己右移16位进行异或运算(高低位异或)
}

这比在JDK 1.7中,更为简洁,相比在1.7中的4次位运算,5次异或运算(9次扰动),在1.8中,只进行了1次位运算和1次异或运算(2次扰动);

JDK1.8新增红黑树

通过上面的链地址法(使用散列表)扰动函数我们成功让我们的数据分布更平均,哈希碰撞减少,但是当我们的HashMap中存在大量数据时,加入我们某个bucket下对应的链表有n个元素,那么遍历时间复杂度就为O(n),为了针对这个问题,JDK1.8在HashMap中新增了红黑树的数据结构,进一步使得遍历复杂度降低至O(logn);

总结

简单总结一下HashMap是使用了哪些方法来有效解决哈希冲突的:

1. 使用链地址法(使用散列表)来链接拥有相同hash值的数据;
2. 使用2次扰动函数(hash函数)来降低哈希冲突的概率,使得数据分布更平均;
3. 引入红黑树进一步降低遍历的时间复杂度,使得遍历更快;


6)HashMap为什么不直接使用hashCode()处理后的哈希值直接作为table的下标?

答:hashCode()方法返回的是int整数类型,其范围为-(2 ^ 31)(2 ^ 31 - 1),约有40亿个映射空间,而HashMap的容量范围是在16(初始化默认值)2 ^ 30,HashMap通常情况下是取不到最大值的,并且设备上也难以提供这么多的存储空间,从而导致通过hashCode()计算出的哈希值可能不在数组大小范围内,进而无法匹配存储位置;

面试官:那怎么解决呢?

答:

  1. HashMap自己实现了自己的hash()方法,通过两次扰动使得它自己的哈希值高低位自行进行异或运算,降低哈希碰撞概率也使得数据分布更平均;

  2. 在保证数组长度为2的幂次方的时候,使用hash()运算之后的值与运算(&)(数组长度 - 1)来获取数组下标的方式进行存储,这样一来是比取余操作更加有效率,二来也是因为只有当数组长度为2的幂次方时,h&(length-1)才等价于h%length,三来解决了“哈希值与数组大小范围不匹配”的问题;

面试官:为什么数组长度要保证为2的幂次方呢?

答:

  1. 只有当数组长度为2的幂次方时,h&(length-1)才等价于h%length,即实现了key的定位,2的幂次方也可以减少冲突次数,提高HashMap的查询效率;

  2. 如果 length 为 2 的次幂 则 length-1 转化为二进制必定是 11111……的形式,在于 h 的二进制与操作效率会非常的快,而且空间不浪费;如果 length 不是 2 的次幂,比如 length 为 15,则 length - 1 为 14,对应的二进制为 1110,在于 h 与操作,最后一位都为 0 ,而 0001,0011,0101,1001,1011,0111,1101 这几个位置永远都不能存放元素了,空间浪费相当大,更糟的是这种情况中,数组可以使用的位置比数组长度小了很多,这意味着进一步增加了碰撞的几率,减慢了查询的效率!这样就会造成空间的浪费。

面试官:那为什么是两次扰动呢?

答:这样就是加大哈希值低位的随机性,使得分布更均匀,从而提高对应数组存储下标位置的随机性&均匀性,最终减少Hash冲突,两次就够了,已经达到了高位低位同时参与运算的目的;


7)HashMap在JDK1.7和JDK1.8中有哪些不同?

答:

不同 JDK 1.7 JDK 1.8
存储结构 数组 + 链表 数组 + 链表 + 红黑树
初始化方式 单独函数:inflateTable() 直接集成到了扩容函数resize()
hash值计算方式 扰动处理 = 9次扰动 = 4次位运算 + 5次异或运算 扰动处理 = 2次扰动 = 1次位运算 + 1次异或运算
存放数据的规则 无冲突时,存放数组;冲突时,存放链表 无冲突时,存放数组;冲突 & 链表长度 < 8:存放单链表;冲突 & 链表长度 > 8:树化并存放红黑树
插入数据方式 头插法(先讲原位置的数据移到后1位,再插入数据到该位置) 尾插法(直接插入到链表尾部/红黑树)
扩容后存储位置的计算方式 全部按照原来方法进行计算(即hashCode ->> 扰动函数 ->> (h&length-1)) 按照扩容后的规律计算(即扩容后的位置=原位置 or 原位置 + 旧容量)

8)为什么HashMap中String、Integer这样的包装类适合作为K?

答:String、Integer等包装类的特性能够保证Hash值的不可更改性和计算准确性,能够有效的减少Hash碰撞的几率

  1. 都是final类型,即不可变性,保证key的不可更改性,不会存在获取hash值不同的情况
  2. 内部已重写了equals()hashCode()等方法,遵守了HashMap内部的规范(不清楚可以去上面看看putValue的过程),不容易出现Hash值计算错误的情况;

面试官:如果我想要让自己的Object作为K应该怎么办呢?

答:重写hashCode()equals()方法

  1. 重写hashCode()是因为需要计算存储数据的存储位置,需要注意不要试图从散列码计算中排除掉一个对象的关键部分来提高性能,这样虽然能更快但可能会导致更多的Hash碰撞;

  2. 重写equals()方法,需要遵守自反性、对称性、传递性、一致性以及对于任何非null的引用值x,x.equals(null)必须返回false的这几个特性,目的是为了保证key在哈希表中的唯一性


9)ConcurrentHashMap和Hashtable的区别?

答:ConcurrentHashMap 结合了 HashMap 和 HashTable 二者的优势。HashMap 没有考虑同步,HashTable 考虑了同步的问题。但是 HashTable 在每次同步执行时都要锁住整个结构。 ConcurrentHashMap 锁的方式是稍微细粒度的。

面试官:ConcurrentHashMap的具体实现知道吗?

参考资料:http://www.importnew.com/23610.html

答:在JDK1.7中,ConcurrentHashMap采用Segment + HashEntry的方式进行实现,结构如下:

  1. 该类包含两个静态内部类 HashEntry 和 Segment ;前者用来封装映射表的键值对,后者用来充当锁的角色;

  2. Segment 是一种可重入的锁 ReentrantLock,每个 Segment 守护一个HashEntry 数组里得元素,当对 HashEntry 数组的数据进行修改时,必须首先获得对应的 Segment 锁。

JDK1.8中,放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现,结构如下:

插入元素过程(建议去看看源码):

  1. 如果相应位置的Node还没有初始化,则调用CAS插入相应的数据;
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
        break;                   // no lock when adding to empty bin
}
  1. 如果相应位置的Node不为空,且当前该节点不处于移动状态,则对该节点加synchronized锁,如果该节点的hash不小于0,则遍历链表更新节点或插入新节点;
if (fh >= 0) {
    binCount = 1;
    for (Node<K,V> e = f;; ++binCount) {
        K ek;
        if (e.hash == hash &&
            ((ek = e.key) == key ||
             (ek != null && key.equals(ek)))) {
            oldVal = e.val;
            if (!onlyIfAbsent)
                e.val = value;
            break;
        }
        Node<K,V> pred = e;
        if ((e = e.next) == null) {
            pred.next = new Node<K,V>(hash, key, value, null);
            break;
        }
    }
}
  1. 如果该节点是TreeBin类型的节点,说明是红黑树结构,则通过putTreeVal方法往红黑树中插入节点;如果binCount不为0,说明put操作对数据产生了影响,如果当前链表的个数达到8个,则通过treeifyBin方法转化为红黑树,如果oldVal不为空,说明是一次更新操作,没有对元素个数产生影响,则直接返回旧值;

  2. 如果插入的是一个新节点,则执行addCount()方法尝试更新元素个数baseCount;


10)Java集合的快速失败机制 “fail-fast”?

答:

是java集合的一种错误检测机制,当多个线程对集合进行结构上的改变的操作时,有可能会产生 fail-fast 机制。

例如:假设存在两个线程(线程1、线程2),线程1通过Iterator在遍历集合A中的元素,在某个时候线程2修改了集合A的结构(是结构上面的修改,而不是简单的修改集合元素的内容),那么这个时候程序就会抛出 ConcurrentModificationException 异常,从而产生fail-fast机制。

原因:迭代器在遍历时直接访问集合中的内容,并且在遍历过程中使用一个 modCount 变量。集合在被遍历期间如果内容发生变化,就会改变modCount的值。每当迭代器使用hashNext()/next()遍历下一个元素之前,都会检测modCount变量是否为expectedmodCount值,是的话就返回遍历;否则抛出异常,终止遍历。

解决办法:

1. 在遍历过程中,所有涉及到改变modCount值得地方全部加上synchronized。

2. 使用CopyOnWriteArrayList来替换ArrayList


11)ArrayList 和 Vector 的区别?

答:

这两个类都实现了 List 接口(List 接口继承了 Collection 接口),他们都是有序集合,即存储在这两个集合中的元素位置都是有顺序的,相当于一种动态的数组,我们以后可以按位置索引来取出某个元素,并且其中的数据是允许重复的,这是与 HashSet 之类的集合的最大不同处,HashSet 之类的集合不可以按索引号去检索其中的元素,也不允许有重复的元素。

ArrayList 与 Vector 的区别主要包括两个方面:

  1. 同步性:
    Vector 是线程安全的,也就是说它的方法之间是线程同步(加了synchronized 关键字)的,而 ArrayList 是线程不安全的,它的方法之间是线程不同步的。如果只有一个线程会访问到集合,那最好是使用 ArrayList,因为它不考虑线程安全的问题,所以效率会高一些;如果有多个线程会访问到集合,那最好是使用 Vector,因为不需要我们自己再去考虑和编写线程安全的代码。

  2. 数据增长:
    ArrayList 与 Vector 都有一个初始的容量大小,当存储进它们里面的元素的个人超过了容量时,就需要增加 ArrayList 和 Vector 的存储空间,每次要增加存储空间时,不是只增加一个存储单元,而是增加多个存储单元,每次增加的存储单元的个数在内存空间利用与程序效率之间要去的一定的平衡。Vector 在数据满时(加载因子1)增长为原来的两倍(扩容增量:原容量的 2 倍),而 ArrayList 在数据量达到容量的一半时(加载因子 0.5)增长为原容量的 (0.5 倍 + 1) 个空间。


12)ArrayList和LinkedList的区别?

答:

  1. LinkedList 实现了 List 和 Deque 接口,一般称为双向链表;ArrayList 实现了 List 接口,动态数组;
  2. LinkedList 在插入和删除数据时效率更高,ArrayList 在查找某个 index 的数据时效率更高;
  3. LinkedList 比 ArrayList 需要更多的内存;

面试官:Array 和 ArrayList 有什么区别?什么时候该应 Array 而不是 ArrayList 呢?

答:它们的区别是:

  1. Array 可以包含基本类型和对象类型,ArrayList 只能包含对象类型。
  2. Array 大小是固定的,ArrayList 的大小是动态变化的。
  3. ArrayList 提供了更多的方法和特性,比如:addAll(),removeAll(),iterator() 等等。

对于基本类型数据,集合使用自动装箱来减少编码工作量。但是,当处理固定大小的基本数据类型的时候,这种方式相对比较慢。


13)HashSet是如何保证数据不可重复的?

答:HashSet的底层其实就是HashMap,只不过我们HashSet是实现了Set接口并且把数据作为K值,而V值一直使用一个相同的虚值来保存,我们可以看到源码:

public boolean add(E e) {
    return map.put(e, PRESENT)==null;// 调用HashMap的put方法,PRESENT是一个至始至终都相同的虚值
}

由于HashMap的K值本身就不允许重复,并且在HashMap中如果K/V相同时,会用新的V覆盖掉旧的V,然后返回旧的V,那么在HashSet中执行这一句话始终会返回一个false,导致插入失败,这样就保证了数据的不可重复性;


14)BlockingQueue是什么?

答:

Java.util.concurrent.BlockingQueue是一个队列,在进行检索或移除一个元素的时候,它会等待队列变为非空;当在添加一个元素时,它会等待队列中的可用空间。BlockingQueue接口是Java集合框架的一部分,主要用于实现生产者-消费者模式。我们不需要担心等待生产者有可用的空间,或消费者有可用的对象,因为它都在BlockingQueue的实现类中被处理了。Java提供了集中BlockingQueue的实现,比如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue,、SynchronousQueue等。


欢迎转载,转载请注明出处!
简书ID:@我没有三颗心脏
github:wmyskxz
欢迎关注公众微信号:wmyskxz_javaweb
分享自己的Java Web学习之路以及各种Java学习资料
想要交流的朋友也可以加qq群:3382693

前言:题图无关,接下来开始简单学习学习优先队列和堆的相关数据结构的知识;

前序文章:

什么是优先队列?

听这个名字就能知道,优先队列也是一种队列,只不过不同的是,优先队列的出队顺序是按照优先级来的;在有些情况下,可能需要找到元素集合中的最小或者最大元素,可以利用优先队列ADT来完成操作,优先队列ADT是一种数据结构,它支持插入和删除最小值操作(返回并删除最小元素)或删除最大值操作(返回并删除最大元素);

这些操作等价于队列的enQueuedeQueue操作,区别在于,对于优先队列,元素进入队列的顺序可能与其被操作的顺序不同,作业调度是优先队列的一个应用实例,它根据优先级的高低而不是先到先服务的方式来进行调度;

如果最小键值元素拥有最高的优先级,那么这种优先队列叫作升序优先队列(即总是先删除最小的元素),类似的,如果最大键值元素拥有最高的优先级,那么这种优先队列叫作降序优先队列(即总是先删除最大的元素);由于这两种类型时对称的,所以只需要关注其中一种,如升序优先队列;

优先队列ADT

下面操作组成了优先队列的一个ADT;

1.优先队列的主要操作
优先队列是元素的容器,每个元素有一个相关的键值;

  • insert(key, data):插入键值为key的数据到优先队列中,元素以其key进行排序;
  • deleteMin/deleteMax:删除并返回最小/最大键值的元素;
  • getMinimum/getMaximum:返回最小/最大剑指的元素,但不删除它;

2.优先队列的辅助操作

  • 第k最小/第k最大:返回优先队列中键值为第k个最小/最大的元素;
  • 大小(size):返回优先队列中的元素个数;
  • 堆排序(Heap Sort):基于键值的优先级将优先队列中的元素进行排序;

优先队列的应用

  • 数据压缩:赫夫曼编码算法;
  • 最短路径算法:Dijkstra算法;
  • 最小生成树算法:Prim算法;
  • 事件驱动仿真:顾客排队算法;
  • 选择问题:查找第k个最小元素;
  • 等等等等….

优先队列的实现比较

实现 插入 删除 寻找最小值
无序数组 1 n n
无序链表 1 n n
有序数组 n 1 1
有序链表 n 1 1
二叉搜索树 logn(平均) logn(平均) logn(平均)
平衡二叉搜索树 logn logn logn
二叉堆 logn logn 1

堆和二叉堆

什么是堆

堆是一颗具有特定性质的二叉树,堆的基本要求就是堆中所有结点的值必须大于或等于(或小于或等于)其孩子结点的值,这也称为堆的性质;堆还有另一个性质,就是当 h > 0 时,所有叶子结点都处于第 h 或 h - 1 层(其中 h 为树的高度,完全二叉树),也就是说,堆应该是一颗完全二叉树;

在下面的例子中,左边的树为堆(每个元素都大于其孩子结点的值),而右边的树不是堆(因为5大于其孩子结点2)

二叉堆

在二叉堆中,每个结点最多有两个孩子结点,在实际应用中,二叉堆已经足够满足需求,因此接下来主要讨论二叉最小堆和二叉最大堆;

堆的表示:在描述堆的操作前,首先来看堆是怎样表示的,一种可能的方法就是使用数组,因为堆在形式上是一颗完全二叉树,用数组来存储它不会浪费任何空间,例如下图:

用数组来表示堆不仅不会浪费空间还具有一定的优势:

  • 每个结点的左孩子为下标i的2倍:left child(i) = i * 2;每个结点的右孩子为下标i的2倍加1:right child(i) = i * 2 + 1
  • 每个结点的父亲结点为下标的二分之一:parent(i) = i / 2,注意这里是整数除,2和3除以2都为1,大家可以验证一下;
  • 注意:这里是把下标为0的地方空出来了的,主要是为了方便理解,如果0不空出来只需要在计算的时候把i值往右偏移一个位置就行了(也就是加1,大家可以试试,下面的演示也采取这样的方式);

二叉堆的相关操作

堆的基本结构

public class MaxHeap<E extends Comparable<E>> {
    private Array<E> data;
    public MaxHeap(int capacity){ data = new Array<>(capacity); }
    public MaxHeap(){ data = new Array<>(); }
    // 返回堆中的元素个数
    public int size(){ return data.getSize(); }
    // 返回一个布尔值, 表示堆中是否为空
    public boolean isEmpty(){ return data.isEmpty(); }
    // 返回完全二叉树的数组表示中,一个索引所表示的元素的父亲节点的索引
    private int parent(int index){
        if(index == 0)
            throw new IllegalArgumentException("index-0 doesn't have parent.");
        return (index - 1) / 2;
    }
    // 返回完全二叉树的数组表示中,一个索引所表示的元素的左孩子节点的索引
    private int leftChild(int index){ return index * 2 + 1; }
    // 返回完全二叉树的数组表示中,一个索引所表示的元素的右孩子节点的索引
    private int rightChild(int index){ return index * 2 + 2; }
}

向堆中添加元素和Sift Up

当插入一个元素到堆中时,它可能不满足堆的性质,在这种情况下,需要调整堆中元素的位置使之重新变成堆,这个过程称为堆化(heapifying);在最大堆中,要堆化一个元素,需要找到它的父亲结点,如果不满足堆的基本性质则交换两个元素的位置,重复该过程直到每个结点都满足堆的性质为止,下面我们来模拟一下这个过程:

下面我们在该堆中插入一个新的元素26:

我们通过索引(上面的公式)可以很容易地找到新插入元素的父亲结点,然后比较它们的大小,如果新元素更大则交换两个元素的位置,这个操作就相当于把该元素上浮了一下:

重复该操作直到26到了一个满足堆条件的位置,此时就完成了插入的操作:

对应的代码如下:

// 向堆中添加元素
public void add(E e){
    data.addLast(e);
    siftUp(data.getSize() - 1);
}

private void siftUp(int k){

    while(k > 0 && data.get(parent(k)).compareTo(data.get(k)) < 0 ){
        data.swap(k, parent(k));
        k = parent(k);
    }
}

取出堆中的最大元素和Sift Down

如果理解了上述的过程,那么取出堆中的最大元素(堆顶元素)将变得容易,不过这里运用到一个小技巧,就是用最后一个元素替换掉栈顶元素,然后把最后一个元素删除掉,这样一来元素的总个数也满足条件,然后只需要把栈顶元素依次往下调整就好了,这个操作就叫做Sift Down(下沉):

用最后元素替换掉栈顶元素,然后删除最后一个元素:

然后比较其孩子结点的大小:

如果不满足堆的条件,那么就跟孩子结点中较大的一个交换位置:

重复该步骤,直到16到达合适的位置:

完成取出最大元素的操作:

对应的代码如下:

// 看堆中的最大元素
public E findMax(){
    if(data.getSize() == 0)
        throw new IllegalArgumentException("Can not findMax when heap is empty.");
    return data.get(0);
}

// 取出堆中最大元素
public E extractMax(){

    E ret = findMax();

    data.swap(0, data.getSize() - 1);
    data.removeLast();
    siftDown(0);

    return ret;
}

private void siftDown(int k){

    while(leftChild(k) < data.getSize()){
        int j = leftChild(k); // 在此轮循环中,data[k]和data[j]交换位置
        if( j + 1 < data.getSize() &&
                data.get(j + 1).compareTo(data.get(j)) > 0 )
            j ++;
        // data[j] 是 leftChild 和 rightChild 中的最大值

        if(data.get(k).compareTo(data.get(j)) >= 0 )
            break;

        data.swap(k, j);
        k = j;
    }
}

Replace 和 Heapify

Replace这个操作其实就是取出堆中最大的元素之后再新插入一个元素,常规的做法是取出最大元素之后,再利用上面的插入新元素的操作对堆进行Sift Up操作,但是这里有一个小技巧就是直接使用新元素替换掉堆顶元素,之后再进行Sift Down操作,这样就把两次O(logn)的操作变成了一次O(logn):

// 取出堆中的最大元素,并且替换成元素e
public E replace(E e){

    E ret = findMax();
    data.set(0, e);
    siftDown(0);
    return ret;
}

Heapify翻译过来就是堆化的意思,就是将任意数组整理成堆的形状,通常的做法是遍历数组从0开始添加创建一个新的堆,但是这里存在一个小技巧就是把当前数组就看做是一个完全二叉树,然后从最后一个非叶子结点开始进行Sift Down操作就可以了,最后一个非叶子结点也很好找,就是最后一个结点的父亲结点,大家可以验证一下:

从22这个节点开始,依次开始Sift Down操作:

重复该过程直到堆顶元素:

完成堆化操作:

将n个元素逐个插入到一个空堆中,算法复杂度是O(nlogn),而heapify的过程,算法复杂度为O(n),这是有一个质的飞跃的,下面是代码:

public MaxHeap(E[] arr){
    data = new Array<>(arr);
    for(int i = parent(arr.length - 1) ; i >= 0 ; i --)
        siftDown(i);
}

基于堆的优先队列

首先我们的队列仍然需要继承我们之前将队列时候声明的哪个接口Queue,然后实现这个接口中的方法就可以了,之类简单写一下:

public class PriorityQueue<E extends Comparable<E>> implements Queue<E> {

    private MaxHeap<E> maxHeap;

    public PriorityQueue(){ maxHeap = new MaxHeap<>(); }
    @Override
    public int getSize(){ return maxHeap.size(); }
    @Override
    public boolean isEmpty(){ return maxHeap.isEmpty(); }
    @Override
    public E getFront(){ return maxHeap.findMax(); }
    @Override
    public void enqueue(E e){ maxHeap.add(e); }
    @Override
    public E dequeue(){ return maxHeap.extractMax(); }
}

Java中的PriorityQueue

在Java中也实现了自己的优先队列java.util.PriorityQueue,与我们自己写的不同之处在于,Java中内置的为最小堆,然后就是一些函数名不一样,底层还是维护了一个Object类型的数组,大家可以戳戳看有什么不同,另外如果想要把最小堆变成最大堆可以给PriorityQueue传入自己的比较器,例如:

// 默认为最小堆
PriorityQueue<Integer> pq = new PriorityQueue<>();

pq.add(5);
pq.add(2);
pq.add(1);
pq.add(10);
pq.add(3);

while (!pq.isEmpty()) {
    System.out.println(pq.poll() + ", ");
}
System.out.println();
System.out.println("————————————————————————");

// 使用Lambda表达式传入自己的比较器转换成最大堆
PriorityQueue<Integer> pq2 = new PriorityQueue<>((a, b) -> b - a);
pq2.add(5);
pq2.add(2);
pq2.add(1);
pq2.add(10);
pq2.add(3);

while (!pq2.isEmpty()) {
    System.out.println(pq2.poll() + ", ");
}

LeetCode相关题目整理

23. 合并K个排序链表

参考答案:(85ms)

public ListNode mergeKLists(ListNode[] lists) {
    if (lists == null || lists.length == 0) return null;

    PriorityQueue<ListNode> q = new PriorityQueue<>(Comparator.comparing(node -> node.val));
    for (int i = 0; i < lists.length; i++) {
        if (lists[i] != null) {
            q.add(lists[i]);
        }
    }

    ListNode dummy = new ListNode(0);
    ListNode tail = dummy;

    while (!q.isEmpty()) {
        tail.next = q.poll();
        tail = tail.next;
        if (tail.next != null) {
            q.add(tail.next);
        }
    }

    return dummy.next;
}

215. 数组中的第K个最大元素

我的答案:(75ms)

public int findKthLargest(int[] nums, int k) {

    // 正确性判断
    if (0 == nums.length || null == nums || k <= 0 || k > nums.length) {
        return -1;
    }

    // 构造优先队列,默认为最小堆,传入自定义的比较器转换成最大堆
    PriorityQueue<Integer> pq = new PriorityQueue<>((a, b) -> b - a);
    for (Integer num : nums) {
        pq.add(num);
    }
    for (int i = 0; i < k - 1; i++) {
        pq.remove();
    }
    return pq.peek();
}

参考答案:(5ms)

public int findKthLargest(int[] nums, int k) {
    if (nums.length == 1) {
        return nums[0];
    }

    int max = nums[0];
    int min = nums[0];

    for (int i : nums) {
        max = i > max ? i : max;
        min = i < min ? i : min;
    }

    int[] arrs = new int[max - min + 1];

    for (int i : nums) {
        arrs[max - i]++;
    }

    int pos = 0;
    for (int i = 0; i < arrs.length; i++) {
        pos += arrs[i];
        if (pos >= k) {
            return max - i;
        }
    }

    return nums[0];
}

还看到一个简单粗暴的,也是服了:(4ms)

public int findKthLargest(int[] nums, int k) {
    Arrays.sort(nums);
    return nums[nums.length - k];
}

而且我随机生成了一个100万数据的随机数组,来测试这个简单粗暴的方法的效率,发现当数据量上去之后,排序这个操作变得繁琐,我自己测试的时候,上面三个方法,第三个大概比第一个(我自己写的方法)多花仅4倍的时间;

239. 滑动窗口最大值(类似剑指Offer面试题59)

参考答案:(88ms)

public int[] maxSlidingWindow(int[] nums, int k) {
    if (nums == null || k <= 0) return new int[0];
    int[] res = new int[nums.length - k + 1];
    ArrayDeque<Integer> deque = new ArrayDeque<Integer>();

    int index = 0;
    for (int i = 0; i < nums.length; i++) {
        while (!deque.isEmpty() && deque.peek() < i - k + 1) // Ensure deque's size doesn't exceed k
            deque.poll();

        // Remove numbers smaller than a[i] from right(a[i-1]) to left, to make the first number in the deque the largest one in the window         
        while (!deque.isEmpty() && nums[deque.peekLast()] < nums[i])
            deque.pollLast();

        deque.offer(i);// Offer the current index to the deque's tail

        if (i >= k - 1)// Starts recording when i is big enough to make the window has k elements 
            res[index++] = nums[deque.peek()];
    }
    return res;
}

参考答案2:(9ms)

public int[] maxSlidingWindow(int[] nums, int k) {
/*
思想:依次遍历数组,有效范围在长度k内寻找当前最大值,在用result数组来依次存储当前长度K内的最大值;
     若在当前轮中出现新增的nums[end]大于curMax,直接替换即可;
     如果当前轮curMax不是新增的nums[end],在新的范围内重置curMax.
*/
    if (nums.length == 0 || k <= 0)
        return new int[0];
    int curMax = Integer.MIN_VALUE;
    for (int i = 0; i < k; i++) {
        if (nums[i] > curMax)
            curMax = nums[i];
    }
    int[] ans = new int[nums.length - k + 1];
    ans[0] = curMax;

    for (int start = 1; start + k - 1 < nums.length; start++) {
        int end = start + k - 1;
        if (nums[end] > curMax)
            curMax = nums[end];
        else if (nums[start - 1] == curMax) {//新增的不大于curMax,新范围内重置
            curMax = Integer.MIN_VALUE;
            for (int i = start; i <= end; i++) {
                if (nums[i] > curMax)
                    curMax = nums[i];
            }
        }
        ans[start] = curMax;
    }
    return ans;
}

264. 丑数 II(剑指Offer面试题49)

参考答案:(7ms)

public int nthUglyNumber(int n) {
    // 正确性判断
    if (n < 1 || n > 1690) {
        return -1;
    }
    int[] ugly = new int[n];
    ugly[0] = 1;
    int index2 = 0, index3 = 0, index5 = 0;
    int factor2 = 2, factor3 = 3, factor5 = 5;
    for (int i = 1; i < n; i++) {
        int min = Math.min(Math.min(factor2, factor3), factor5);
        ugly[i] = min;
        if (factor2 == min)
            factor2 = 2 * ugly[++index2];
        if (factor3 == min)
            factor3 = 3 * ugly[++index3];
        if (factor5 == min)
            factor5 = 5 * ugly[++index5];
    }
    return ugly[n - 1];
}

如果采用逐个判断每个整数是不是丑数的解法,直观但不够高效,所以我们就需要换一种思路,我的第一反应就是这其中一定有什么规律,但是尝试着找了一下,没找到…看了看答案才幡然醒悟,前面提到的算法之所以效率低,很大程度上是因为不管一个数是不是丑数,我们都要对它进行计算,接下来我们试着找到一种只计算丑数的方法,而不在非丑数的整数上花费时间,根据丑数的定义,丑数应该是另一个丑数乘以2、3或者5的结果(1除外),因此,我们可以创建一个数组,里面的数字是排好序的丑数,每个丑数都是前面的丑数乘以2、3或者5得到的,也就是上面的算法了..

295.数据流的中位数(剑指Offer面试题41)

参考答案:(219ms)

public class MedianFinder {

    PriorityQueue<Integer> maxHeap;
    PriorityQueue<Integer> minHeap;

    /**
     * initialize your data structure here.
     */
    public MedianFinder() {
        maxHeap = new PriorityQueue<>(Collections.reverseOrder());
        minHeap = new PriorityQueue<>();
    }

    public void addNum(int num) {
        maxHeap.add(num);
        minHeap.add(maxHeap.poll());
        if (minHeap.size() - maxHeap.size() > 0) {
            maxHeap.add(minHeap.poll());
        }
    }

    public double findMedian() {
        if (maxHeap.size() == minHeap.size()) {
            return (maxHeap.peek() + minHeap.peek()) / 2.0;
        } else {
            return maxHeap.peek();
        }
    }
}

思路:这道题的实现思路有很多,比如我们可以在插入的时候就将每个元素插入到正确的位置上,这样返回中位数的时候就会是一个O(1)的操作,下面列举一张表来说明不同实现的复杂度具体是多少:
数据结构 | 插入的时间复杂度 | 得到中位数的时间复杂度
:– | :– | :–
没有排序的数组 | O(1) | O(n)
排序的数组 | O(n) | O(1)
排序的链表 | O(n) | O(1)
二叉搜索树 | 平均O(logn),最差O(n) | 平均O(logn),最差O(n)
AVL树 | O(logn) | O(logn)
最大堆和最小堆 | O(logn) | O(logn)

AVL树是一种很高效的数据结构,但是在大多数的语言中都没有现成的实现,所以考虑用最大堆和最小堆,对于一个已经排好序的数据容器,我们可以从中间分开分成两个部分,其中拿P1指向左半部分最大的元素,拿P2指向有半部分最小的元素,如果能够保证数据容器左边的数据都小于右边的数据,那么即使左、右两边内部的数据没有排序,我们仍然可以根据左边最大的数和右边最大的数得到中位数:

如何快速从一个数据容器中找出最大数呢?我们可以使用最大堆来实现这个数据容器,因为堆顶的元素就是最大的元素;同样我们可以使用最小堆来快速找出一个数据容器中最小数。因此按照这个思路我们就可以使用一个最大堆实现左边的数据容器,使用一个最小堆实现右边的数据容器,但是需要注意的是这两个容器的大小差值不能超过1;

347. 前K个高频元素(类似剑指Offer面试题40)

参考答案:(131ms)

public List<Integer> topKFrequent(int[] nums, int k) {
    TreeMap<Integer, Integer> map = new TreeMap<>();
    // 保存频率
    for (int num : nums) {
        if (map.containsKey(num)) {
            map.put(num, map.get(num) + 1);
        } else {
            map.put(num, 1);
        }
    }

    PriorityQueue<Integer> pq = new PriorityQueue<>(Comparator.comparingInt(map::get));
    for (int key : map.keySet()) {
        if (pq.size() < k) {
            pq.add(key);
        } else if (map.get(key) > map.get(pq.peek())) {
            pq.remove();
            pq.add(key);
        }
    }

    LinkedList<Integer> res = new LinkedList<>();
    while (!pq.isEmpty()) {
        res.add(pq.remove());
    }
    return res;
}

692. 前K个高频单词

参考答案:(72ms)

public List<String> topKFrequent(String[] words, int k) {
    Map<String, Integer> count = new HashMap();
    for (String word: words) {
        count.put(word, count.getOrDefault(word, 0) + 1);
    }
    List<String> candidates = new ArrayList(count.keySet());
    Collections.sort(candidates, (w1, w2) -> count.get(w1).equals(count.get(w2)) ?
            w1.compareTo(w2) : count.get(w2) - count.get(w1));

    return candidates.subList(0, k);
}

这道题类似于上面的第347题,但是问题出在返回的顺序上,需要自己来定义一个比较器来排序..然后也学到一个写法,就是上面的第一个for循环里,getOrDefault()方法,get√..

参考答案2:(91ms)

public List<String> topKFrequent(String[] words, int k) {
    Map<String, Integer> count = new HashMap();
    for (String word: words) {
        count.put(word, count.getOrDefault(word, 0) + 1);
    }
    PriorityQueue<String> heap = new PriorityQueue<String>(
            (w1, w2) -> count.get(w1).equals(count.get(w2)) ?
                    w2.compareTo(w1) : count.get(w1) - count.get(w2) );

    for (String word: count.keySet()) {
        heap.offer(word);
        if (heap.size() > k) heap.poll();
    }

    List<String> ans = new ArrayList();
    while (!heap.isEmpty()) ans.add(heap.poll());
    Collections.reverse(ans);
    return ans;
}

这个解法就有点儿类似于上面的347题,其实是大同小异,就是自己不会灵活使用比较器而已,学习到了学习到了√…


简单总结

今天算是很有收获的一天,因为这两种数据结构都是自己特别不熟悉的,特别是在刷了一些LeetCode相关题目之后,对这两种数据有了很不一样的认识,特别是堆的应用,这是一种特别适合用来找第k小/大的特殊的数据结构,并且在Java中居然有直接的实现,这可太棒了,而且今天的效率还算挺高的,满足;

欢迎转载,转载请注明出处!
简书ID:@我没有三颗心脏
github:wmyskxz
欢迎关注公众微信号:wmyskxz_javaweb
分享自己的Java Web学习之路以及各种Java学习资料