Kafka核心设计与实践原理总结-进阶篇
Kafka核心设计与实践原理总结进阶篇 kafka作为当前热门的分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。我学习了深入理解Kafka:核心设计与实践原理总结一书后,对其中主要的知识点进行了总结,便于理解和掌握kafka的原理和应用。在这里分享出来,希望也能帮助到大家。总结的知识点分为两部分:· 基础篇:基本概念、生产者和消费者的使用和原理,以及主题和分区的管理· 进阶篇:深入解析kafka服务端(broker)、客户端的进阶原理(包括重分配、事务等)、kafka的高级应用(点击跳转)五、日志存储1.文件目录布局· 一个分区副本对应一个日志(Log),一个日志会分配成多个日志分段(LogSegment),Log在物理上以文件夹形式存储,而LogSegment对应磁盘上的一个日志文件和2个索引文件及可能的其他文件 o· 向Log追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入,称为activeSegment,满足一定条件时,需要创建新的activeSegment· 每个日志及索引的文件名根据基准偏移量(BaseOffset)命名,表示当前LogSegment中第一条消息的offset· broker配置了多个根目录时,会挑选分区数最少的根目录来创建主题2.日志格式· 消息压缩 o Kafka 会将多条消息一起进行压缩,生产者发送的压缩数据在 broker 中也是保持压缩状态进行存储的 ,消费者从服务端获取的也是压缩的消息,消费者在处理消息之前才会解压消息。o 压缩方式通过compression.type来配置:producer、gzip、snappy、lz4、uncmpressedo 消息压缩时,整个消息集压缩为内层消息,整体作为外层消息的value。外层消息的offset保存了内层消息最后一条记录的offset,而内层消息在压缩时会从0开始分配一个offset,内层消息的offset会在服务端进行转换。 §· v2版本消息结构 oo 使用了变长整型(Varints)来保存数值3.日志索引· 偏移量索引(.index) o 每个索引占8个字节,分为两个部分: § relativeOffset:相对偏移量,表示消息相对于baseOffset的偏移量,当前索引文件的文件名即为baseOffset值§ position:消息在日志分段文件中的物理地址· 时间戳索引(.timeindex) o 每个索引占12个字节,分为两个部分: § timestamp:当前日志分段最大的时间戳§ relativeOffset:时间戳对应的消息的相对偏移量4.日志清理· 可以通过broker端参数log.cleanup.policy设置日志清理策略(默认delete)· 两种清理策略: o 日志删除:按照一定保留策略直接删除不符合条件的日志分段 § 设置log.cleanup.policy为deleteo 日志压缩:针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本 § 设置log.cleanup.policy为compact,并且将log.leaner.enable设置为true(默认true)o 可同时使用日志删除和日志压缩两种策略· 日志删除: o Kafka的日志管理器中有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,周期通过 broker 端参数 log.retention.check.interval.ms来配置(默认300000,5分钟)o 日志分段保留策略有3种: § 基于时间的保留策略: § 通过log.retention.hours、log.retention.minutes和log.retention.ms来配置超时清理阈值 § 优先级ms>minutes>hours(默认log.retention.hours=168,7天)§ 基于日志大小的保留策略: § 通过log.retention.bytes配置Log日志总大小阈值(默认-1,无穷大)§ 通过log.segment.bytes配置日志分段文件大小阈值(默认1G)§ 基于日志起始偏移量的保留策略: § 某日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是则可以删除此日志分段§ logStartOffset 的值可以通过DeleteRecordsRequest请求(比如使用KafkaAdminClient的deleteRecords()方法、使用kafka-delete-records.sh脚本、日志的清理和截断等操作进行修改o 删除日志分段时,首先会从Log对象中所维护日志分段的跳表中移出待删除分段,以保证没有现成对其进行读取,然后将对应文件加上.deleted后缀,最后由名为delete-file的延迟任务来删除文件 § 删除任务延迟时间通过file.delete.delay.ms配置(默认60000,1分钟)· 日志压缩(Log Compaction) o Log Compaction对于有相同key的不同value值,只保留最后一个版本。o 每个日志目录下都有一个名为“cleaner-offset-checkpoint”的清理检查点文件,用来记录每个主题的每个分区中己清理的偏移量。通过清理检查点文件可以将 Log 分成两个部分。通过检查点cleaner checkpoint来划分出 一个己经清理过的clean部分和一个还未清理过的dirty部分。o 注意Log Compaction是针对key的,所以在使用时应注意每个消息的key值不为null。每个broker会启动log.cleaner.thread(默认1)个日志清理线程负责执行清理任务, 这些线程会选择“污浊率”最高的日志文件进行清理。 § 污浊率:dirtyRatio = dirtyBytes / ( cleanBytes + dirtyBytes )o 为了防止日志不必要的频繁清理操作,使用参数log.cleaner.min.cleanable.ratio(默认0.5)来限定可进行清理操作的最小污浊率。 Kafka 中用于保存消费者消费位移的主题consumer_offsets使用的就是Log Compaction策略o 每个日志清理线程会使用一个名为“SkimpyOffsetMap”的对象来构建key与offset的映射关系的哈希表。日志清理需要遍历两次日志文件,第一次遍历把每个 key 的哈希值和最后出现的offset都保存在 SkimpyOffsetMap 中,第二次遍历会检查每个消息是否符合保留条件,符合就保留下来,否则就会被清理.o 墓碑消息(tombstone): § 如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息。日志清理线程发现墓碑消息时会先进行常规的清理,并保留墓碑消息一段时间。墓碑消息的保留条件是所在的日志分段的最近修改时间 lastModifiedTime大于deleteHorizonMs。deleteHorizonMs为clean部分中最后一个日志分段的最近修改时间减去保留闽值deleteRetionMs(通过 broker 端参数log.cleaner.delete.retention.ms配置,默认86400000,即24小时)5.磁盘存储· 页缓存(pagecache) o 页缓存是操作系统实现的一种主要的磁盘缓存,用来减少对磁盘I/O的操作。具体就是把磁盘中的数据缓存到内存中,把对磁盘的访问变成对内存的访问。 § 读取:操作系统会先查看数据所在的页(page)是否在页缓存中,如果存在则直接返回,不存在则向磁盘发起读取请求,并将数据存入页缓存§ 写入:查看数据所在的页(page)是否在页缓存中,存在则直接修改页缓存,不存在则在页缓存中添加相应的页再写入。被修改过的页变成了脏页,操作系统会在合适的时间把脏页数据写入磁盘以保持数据一致性。o Kafka大量使用了页缓存,这是实现高吞吐的重要因素之一。Kafka提供了同步刷盘及间断性强制刷盘的功能,但并不推荐使用。· 磁盘I/O流程 o 从编程角度而言,一般I/O场景有以下4种,他们的数据流为: § 用户调用标准C库进行I/O操作:用户程序buffer->C库标准IObuffer->文件系统页缓存->通过具体文件系统到磁盘§ 用户调用文件I/O:用户程序buffer->文件系统页缓存->通过具体文件系统到磁盘§ 用户打开文件时使用O_DIRECT,绕过页缓存直接写磁盘§ 用户使用类似dd工具,使用direct参数,绕过系统cache与文件系统直接写磁盘o 最长链路数据流图示:oo 针对不同应用场景,I/O调度策略也会影响I/O读写性能,目前Linux提供4中调度策略:NOOP、CFQ(默认)、DEADLINE、ANTICIPATORY。· 零拷贝(Zero-Copy) o 所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。 对 Linux 操作系统而言,零拷贝技术依赖于底层的sendfile()方法实现。对应于Java 语言,FileChannal.transferTo()的底层实现就是sendfile()。o 零拷贝和非零拷贝对比 §六、深入服务端1.协议设计· Kafka自定义了一组基于TCP的二进制协议,用于实现各种消息相关操作· 协议基本结构 ooo 不同的api(PRODUCE、FETCH等),RequestBody和ResponseBody结构也不同2.时间轮(TimingWheel)· Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器(SystemTitmer) oo 每个时间格代表基本时间跨度(titkMs),时间轮时间格个数(wheelSIze)是固定的。o currentTime将时间轮划分为到期部分和未到期部分,当前指向的表示刚好到期,需要处理此时间格内的TimerTaskList中的任务· wheelSize的扩充有限制,针对不同定时需要,Kafka引入层级时间轮的概念。当任务的到期时间超过了当前时间轮的时间范围,会尝试添加到上层时间轮。当延时任务所在的时间轮不能精准实现到期时间时,也会重新提交到层级时间轮,进行降级。· Kafka中的定时器只持有第一层时间轮引用,每一层时间轮中有一个引用指向更高一层。3.延时操作· 延时操作创建后会被加入延时操作管理器(DelayedOperationPurgatory)做专门处理,每个延时操作管理器配别一个定时器。延时操作除了满足时间条件执行,还支持外部事件触发,由一个监听池来监听每个分区的外部事件。4.控制器(KafkaController)· 控制器 o Kafka集群中有一个broker会被选举为控制器,负责管理整个集群中所有分区和副本的状态。其职责有: § 监听partition变化§ 监听topic相关变化§ 监听broker相关变化§ 从zk中读取当前与topic、partition、broker相关的信息并管理§ 启动并管理partition状态机和replica状态机§ 更新集群元数据§ 维护分区的优先副本均衡· 控制器的选举及异常恢复 o 控制器选举依赖zk,在/cont