引言:来到了新公司,需要对kafka组件有很深的研究,本人之前对老版的kafka有过一定的研究,但是谈不上深入,新公司力推kafka,比较kafka作为消息系统在目前的市场上的占有率还是很高的,可以看本人之前kafka的博客中有关kafka的优点和为什么要用kafka。
在众多优点中,我本人认为最重要的2个优点如下:
1、削峰
数据库的处理能力是有限的,在峰值期,过多的请求落到后台,一旦超过系统的处理能力,可能会使系统挂掉。
如上图所示,系统的处理能力是 2k/s,MQ 处理能力是 8k/s,峰值请求 5k/s,MQ 的处理能力远远大于数据库,在高峰期,请求可以先积压在 MQ 中,系统可以根据自身的处理能力以 2k/s 的速度消费这些请求。
这样等高峰期一过,请求可能只有 100/s,系统可以很快的消费掉积压在 MQ 中的请求。
注意,上面的请求指的是写请求,查询请求一般通过缓存解决。
2、解耦
如下场景,S 系统与 A、B、C 系统紧密耦合。由于需求变动,A 系统修改了相关代码,S 系统也需要调整 A 相关的代码。
过几天,C 系统需要删除,S 紧跟着删除 C 相关代码;又过了几天,需要新增 D 系统,S 系统又要添加与 D 相关的代码;再过几天,程序猿疯了...
这样各个系统紧密耦合,不利于维护,也不利于扩展。现在引入 MQ,A 系统变动,A 自己修改自己的代码即可;C 系统删除,直接取消订阅;D 系统新增,订阅相关消息即可。
这样通过引入消息中间件,使各个系统都与 MQ 交互,从而避免它们之间的错综复杂的调用关系。
kafka架构原理:
最经典的图也就是官方的图了
找了一些其他博主的图:这里自己就懒的画了
详细复杂的kafka架构
通俗点讲:就是producer ----> kafka cluster(brokers) -----> consumer
生产者生产消息 经过 kafka队列 被消费者消费
相关的组件概念见:
topic and logs
废话不多说,先见图
文字解释如下:
Message 是按照 Topic 来组织的,每个 Topic 可以分成多个 Partition(对server.properties/num.partitions)。 本人习惯性配置文件为num.partitions=broker个数,人为的分配到各个节点上。
Partition 中的每条记录(Message)包含三个属性:Offset,messageSize 和 Data。
其中 Offset 表示消息偏移量;messageSize 表示消息的大小;Data 表示消息的具体内容。
Partition 是以文件的形式存储在文件系统中,位置由 server.properties/log.dirs 指定,其命名规则为
生产配置文件为:log.dirs=/data/kafka/kafka-logs
[hadoop@kafka03-55-13 kafka-logs]$ pwd
/data/kafka/kafka-logs
[hadoop@kafka03-55-13 kafka-logs]$ ls |grep mjh
topic-by-mjh-0
topic-by-mjh-1
topic-by-mjh-10
topic-by-mjh-11
topic-by-mjh-12
...
...
...
Partition 可能位于不同的 Broker 上,Partition 是分段的,每个段是一个 Segment 文件。
Partition 目录下包括了数据文件和索引文件
[hadoop@kafka03-55-13 kafka-logs]$ cd topic-by-mjh-0
[hadoop@kafka03-55-13 topic-by-mjh-0]$ ll
total 4
-rw-rw-r-- 1 hadoop hadoop 10485760 Aug 24 20:13 00000000000000000334.index
-rw-rw-r-- 1 hadoop hadoop 0 Aug 13 17:42 00000000000000000334.log
-rw-rw-r-- 1 hadoop hadoop 10485756 Aug 24 20:13 00000000000000000334.timeindex
-rw-rw-r-- 1 hadoop hadoop 4 Aug 16 14:16 leader-epoch-checkpoint
Index 采用稀疏存储的方式,它不会为每一条 Message 都建立索引,而是每隔一定的字节数建立一条索引,避免索引文件占用过多的空间。
缺点是没有建立索引的 Offset 不能一次定位到 Message 的位置,需要做一次顺序扫描,但是扫描的范围很小。
索引包含两个部分(均为 4 个字节的数字),分别为相对 Offset 和 Position。
相对 Offset 表示 Segment 文件中的 Offset,Position 表示 Message 在数据文件中的位置。
Segment下的log文件就是存储消息的地方
每个消息都会包含消息体、offset、timestamp、key、size、压缩编码器、校验和、消息版本号等。
在磁盘上的数据格式和producer发送到broker的数据格式一模一样,也和consumer收到的数据格式一模一样。由于磁盘格式与consumer以及producer的数据格式一模一样,这样就使得Kafka可以通过零拷贝(zero-copy)技术来提高传输效率。 // 关于零拷贝技术,后期会专门写一遍博客来解释
小结:
1、Partition 是一个顺序的追加日志,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 Kafka 吞吐率)。
2、Kafka 的 Message 存储采用了分区(Partition),磁盘顺序读写,分段(LogSegment)和稀疏索引这几个手段来达到高效性。
3、在 Kafka 的文件存储中,同一个 Topic 下有多个不同的 Partition,每个 Partition 都为一个目录,而每一个目录又被平均分配成多个大小相等的 Segment File 中(Segment 大小我们在生产上设置成1G或者 500MB ),Segment File 又由 index file 和 data file 组成,他们总是成对出现,后缀 ".index" 和 ".log" 分表表示 Segment 索引文件和数据文件。
Partition and Replica
一个 Topic 物理上分为多个 Partition,位于不同的 Broker 上。如果没有 Replica,一旦 Broker 宕机,其上所有的 Patition 将不可用。
每个 Partition 可以有多个Replica(对应server.properties/default.replication.factor),分配到不同的 Broker 上。本人默认习惯为 default.replication.factor=2 也就是默认2个副本,比较合理
其中有一个 Leader 负责读写,处理来自 Producer 和 Consumer 的请求;其他作为 Follower 从 Leader Pull 消息,保持与 Leader 的同步。
如何分配 Partition 和 Replica 到 Broker 上?步骤如下:
1、将所有 Broker(假设共 n 个 Broker)和待分配的 Partition 排序。
2、将第 i 个 Partition 分配到第(i mod n)个 Broker 上。
3、将第 i 个 Partition 的第 j 个 Replica 分配到第((i + j) mode n)个 Broker 上。
根据上面的分配规则,若 Replica 的数量大于 Broker 的数量,必定会有两个相同的 Replica 分配到同一个 Broker 上,产生冗余。因此 Replica 的数量应该小于或等于 Broker 的数量。
//这里kafka硬性规定了创建的replica不能超过broker的数量,必须等于小于broker的数量
这里有2个算法函数解释一下
1、mod:求余函数;
2、mode:返回在某数组或数据区域中出现频率最多的数值,mode是一个位置测量函数。
我这里只有3个broker 创建4个replica就出现报错 具体见下
[root@kafka02-55-12 ~]# kafka-topics.sh --zookeeper 10.211.55.11:2181,10.211.55.12:2181,10.211.55.13:2181/kafkagroup --replication-factor 4 --partitions 9 --create --topic topic-zhuhair
**Error while executing topic command : Replication factor: 4 larger than available brokers: 3.**
[2019-08-24 20:41:40,611] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
pratition的leader是如何选举的---broker failover故障转移
//通俗点讲也就是当broker发生宕机了,如何保证高可用的
文字描述如下:
Kafka 在 Zookeeper 中(/brokers/topics/[topic]/partitions/[partition]/state)动态维护了一个 ISR(in-sync replicas)。
ISR 里面的所有 Replica 都"跟上"了 Leader,Controller 将会从 ISR 里选一个做 Leader。
具体流程文字描述如下:
1、Controller 在 Zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,
2、当 Broker 宕机时 Zookeeper 会 Fire Watch。
3、Controller 从 /brokers/ids 节点读取可用 Broker。
4、Controller 决定 set_p,该集合包含宕机 Broker 上的所有 Partition。
5、对 set_p 中的每一个 Partition,从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR,决定新 Leader,将新 Leader、ISR、controller_epoch 和 leader_epoch 等信息写入 State 节点。
6、zk通过 RPC 向相关 Broker 发送 leaderAndISRRequest 命令。
极端情况下需要考虑的是:
当 ISR 为空时,会选一个 Replica(不一定是 ISR 成员)作为 Leader;
当所有的 Replica 都歇菜了,会等任意一个 Replica 复活,将其作为 Leader。
//
这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(前文有说明,所有读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
ISR(同步列表)中的 Follower 都"跟上"了Leader,"跟上"并不表示完全一致,它由 server.properties/replica.lag.time.max.ms 配置。表示 Leader 等待 Follower 同步消息的大时间,如果超时,Leader 将 Follower 移除 ISR。配置项 replica.lag.max.messages 已经移除。
Replica 副本如何同步 消息传递同步策略
1、Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,
2、无论该Topic的Replication Factor为多少,Producer只将该消息发送到该Partition的Leader。
3、Leader会将该消息写入其本地Log。
4、每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。
5、Follower在收到该消息并写入其Log后,向Leader发送ACK。
6、一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。
因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。
Consumer读消息也是从Leader读取,只有被commit过的消息才会暴露给Consumer。
具体的可靠性,是由生产者(根据配置项 producer.properties/acks)来决定的。
有资料说 最新的文档 2.2.x request.required.acks 已经不存在了,这一点有待我去确认
通俗一点讲对ack的三个参数的含义为
Kafka producer有三种ack机制 初始化producer时在config中进行配置
0 :意味着producer不等待broker同步完成的确认,继续发送下一条(批)信息
提供了最低的延迟。但是最弱的持久性,当服务器发生故障时,就很可能发生数据丢失。例如leader已经死亡,producer不知情,还会继续发送消息broker接收不到数据就会数据丢失
1:意味着producer要等待leader成功收到数据并得到确认,才发送下一条message。此选项提供了较好的持久性较低的延迟性。Partition的Leader死亡,follwer尚未复制,数据就会丢失
-1:意味着producer得到follwer确认,才发送下一条数据
持久性最好,延时性最差。
在这里强调的一点是,在kafak的partition中的fllower和leader中的复制不是完全的同步复制,也不是单纯的异步复制
同步复制:所有的fllower复制完才提交 这样的缺点是极大的影响了吞吐率
异步复制:Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。
所有 kafak采用的是ISR 的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。
producer如何发送消息
Producer 首先将消息封装进一个 ProducerRecord 实例中。
写消息的路由模式
1、 指定了 patition,则直接使用;
2、 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
这个 Hash(即分区机制)由 producer.properties/partitioner.class 指定的类实现,这个路由类需要实现 Partitioner 接口。
3、 patition 和 key 都未指定,使用轮询选出一个 patition。
备注:消息并不会立即发送,而是先进行序列化后,发送给 Partitioner,
也就是上面提到的 Hash 函数,由 Partitioner 确定目标分区后,发送到一块内存缓冲区中(发送队列)。Producer 的另一个工作线程(即 Sender 线程),
则负责实时地从该缓冲区中提取出准备好的消息封装到一个批次内,统一发送到对应的 Broker 中。
具体写数据流程如下:
具体流程如下:
1、 producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
2、 producer 将消息发送给该 leader
3、 leader 将消息写入本地 log
4、 followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
5、 leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
参考网上的资料,有的文字版的解释是这样的 个人觉得下面的这样的文字解释的更加通俗易懂
流程如下:
1、首先,我们需要创建一个ProducerRecord,这个对象需要包含消息的主题(topic)和值(value),可以选择性指定一个键值(key)或者分区(partition)。
2、发送消息时,生产者会对键值和值序列化成字节数组,然后发送到分配器(partitioner)。
3、如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回。
4、选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的Kafka broker。
5、当broker接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的RecordMetadata对象,否则返回异常。
6、生产者接收到结果后,对于异常可能会进行重试。
参考链接:
架构成长之路:Kafka设计原理看了又忘,忘了又看?一文让你掌握: https://www.toutiao.com/i6714606866355192328/
Kafka的ACK机制有三种,是哪三种 : https://blog.csdn.net/Sun1181342029/article/details/87806207
kafka原理系列ACK机制(数据可靠性和持久性保证) https://blog.csdn.net/bluehawksk/article/details/96120803
kafka入门介绍 https://www.orchome.com/5
Kafka学习之路 (三)Kafka的高可用 https://www.cnblogs.com/qingyunzong/p/9004703.html
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。