Lazy loaded image
中间件
Lazy loaded imagekafka
字数 20734阅读时长 52 分钟
2025-3-22
2025-4-24
type
status
date
slug
summary
tags
category
icon
password

kafka 简介

本文档参看的文档是:尚硅谷官方文档,并在基础上修改完善!非常感谢尚硅谷团队!!!!
kafka是一款分布式的基于发布/订阅模式的消息队列,是目前比较主流的消息中间件。Kafka对消息保存时根据Topic(主题)进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例称为broker。
Kafka软件最初的设计就是专门用于数据传输的消息系统,类似功能的软件有RabbitMQ、ActiveMQ、RocketMQ等。这些软件名称中的MQ是英文单词Message Queue的简称,也就是所谓的消息队列的意思。这些软件的核心功能是传输数据,而Java中如果想要实现数据传输功能,那么这个软件一般需要遵循Java消息服务技术规范JMS(Java Message Service)。
ActiveMQ软件就完全遵循了JMS技术规范,而RabbitMQ是遵循了类似JMS规范并兼容JMS规范的跨平台的AMQP(Advanced Message Queuing Protocol)规范。而Kafka拥有作为一个消息系统应该具备的功能,但是却有着独特的设计。可以这样说,Kafka借鉴了JMS规范的思想,但是却并没有完全遵循JMS规范。这也恰恰是软件名称为Kafka,而不是KafkaMQ的原因。
之所以取名kafka,据说是因为开发者非常喜欢奥地利作家卡夫卡,所以以此命名。2010年,Linkedin公司为了解决消息传输过程中由各种缺陷导致的阻塞、服务无法访问等问题,主导开发了一款分布式消息日志传输系统。主导开发的首席架构师Jay Kreps因为喜欢写出《变形记》的西方表现主义文学先驱小说家Jay Kafka,所以给这个消息系统起了一个很酷,却和软件系统特性无关的名称Kafka。
为什么需要zookeeper?Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。Controller的管理工作都是依赖于Zookeeper的。

初识kafka

线程之间可以通过堆内存进行通信,比如线程1每秒向堆内存发送50条数据,而线程2每秒从堆内存中接收30条数据。由于接收速度低于发送速度,每秒会积压20条数据。随着时间的推移,积压的数据会越来越多,最终可能导致堆内存溢出(OutOfMemoryError),从而使服务不可用。
notion image
那么将数据放入到磁盘文件不就可以解决了吗?直接写磁盘文件虽然能解决内存溢出的问题,但有很多局限性。比如,当数据量很大时,单台机器的磁盘容易不够用。Kafka可以通过分布式架构把数据分散到多台机器上存储和处理,这样更可靠也更容易扩展。
notion image
在多进程通信中,假设进程1负责生产数据,进程2和进程3负责消费数据。那么进程1需要将同一份数据同时发送给进程2和进程3,导致数据被发送两次,浪费资源。或者,进程1需要按照某种规则,比如前一半数据发给进程2,后一半数据发给进程3,这样就会增加很多复杂的逻辑,无形中增加了开发和维护的负担。为了解决这些问题,我们可以引入一个消息中间件(缓冲区)来进行解耦。Kafka是一个常用的消息中间件,可以很好地解决这些问题。具体来说: 生产者(进程1)只需要把数据发送到Kafka,不需要关心谁会消费数据,也不需要重复发送。 消费者(进程2和进程3)从Kafka获取数据,它们可以独立消费,互不影响。 Kafka会确保数据的完整性和顺序,生产者发送的数据不会被修改或丢失。
notion image

kafka 集群部署

我这里部署的kafka集群是依赖于zookeeper,所以部署kafka集群需要先部署一套zookeeper集群,部署zookeeper集群,请看本人zookeeper介绍那章节。至于kafka和zk的关系,网上有大量的文档,这里就不重复造轮子了。
下载kafka二进制包,官网地址:https://kafka.apache.org/downloads,我这里下载的是kafka_2.13-3.6.1
解压到data目录
修改配置文件,修改之前我们先对原有的文件进行备份
kafka配置文件如下
将这个配置文件分别拷贝到另外两个节点,另外两个节点只需要修改server.properties配置文件中的 broker.idbroker.id 不得重复,整个集群中唯一。我们这里node2节点broker.id=1,node3节点broker.id=2
启动 Zookeeper 集群,如果已经启动,忽略此步骤
启动 Kafka 集群,每台节点都执行
启动完后,我们来进行一下简单的验证,启动生产者生产数据
启动消费者进行数据消费
生产者生产数据,消费者就可以接收到数据
notion image
停止kafka集群,注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

kafka的核心组成

在Kafka的世界中有很多概念和术语是需要你提前理解并熟练掌握的
Broker:一台kafka服务器就是一个broker。一个集群由多个broker组成,每个broker就是一个kafka的实例。 Topic:Topic 就是数据主题,kafka建议根据业务系统将不同的数据存放在不同的topic中!Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。一个大的Topic可以分布式存储在多个kafka broker中!Topic可以类比为数据库中的库! Partition:每个topic可以有多个分区,通过分区的设计,topic可以不断进行扩展!即一个Topic的多个分区分布式存储在多个broker;此外通过分区还可以让一个topic被多个consumer进行消费!以达到并行处理!分区可以类比为数据库中的表!kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。 Replication:备份机制(Replication)。备份的思想很简单,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在Kafka中被称为副本(Replica)。Kafka定义了两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)。前者对外提供服务,这里的对外指的是与客户端程序进行交互;而后者只是被动地追随领导者副本而已,不能与外界进行交互。副本的工作机制也很简单:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。 Offset:生成者每生产一条数据都会追加到指定分区的log文件中,且存储的记录都是有序的,由于不可随机写入,所以顺序是不变的,这个顺序是通过一个称之为offset的id来唯一标识。kafka自动维护消费者消费的主题各个分区的offset,前提是消费者消费的分区是由kafka分配的,在启动消费者时,只指定了主题,没有指定分区,kafka会将offset数据保存到一个内置主题为__consumer_offsets的主题中,如果指定了分区,那么kafka将不再自动维护offset。 Producer:消息生产者,就是向kafka broker发消息的客户端。生产者负责将记录分配到topic的指定 partition(分区)中,如果没有指定分区,则都卡夫卡依据分区策略进行分配。 Consumer:消息消费者,向kafka broker取消息的客户端。每个消费者都要维护自己读取数据的offset。低版本0.9之前将offset保存在Zookeeper中,0.9及之后保存在Kafka的“__consumer_offsets”主题中 Consumer Group:每个消费者都会使用一个消费组名称来进行标识。同一个组中的不同的消费者实例,可以分布在多个进程或多个机器上! Interceptor:Interceptor 为拦截器,当生产者向kafka发送数据时,数据会先经过拦截器进行拦截处理,多个拦截器可以组成拦截器链,然后再真正发送数据doSend() Persistence:Persistence即持久化,Kafka 集群保留所有发布的记录,无论他们是否已被消费,都会通过一个可配置的参数:保留期限来控制。举个例子, 如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被清除并释放磁盘空间。

分区Partition

Kafka使用发布订阅模式进行消息传输,消息生产者需要将数据发送到一个主题(topic)。如果一个主题接收到的数据量非常大,那么该主题所在的broker节点的负载会迅速增加,可能会出现性能瓶颈,甚至因为热点问题导致节点故障,从而影响整个服务的可用性。为了解决这个问题,Kafka引入了分区(partition)的概念。分区是将一个主题在物理上划分成多个小块,然后将这些小块的数据分布到不同的broker节点上。这样,不同的broker可以共同分担一个主题的压力,从而避免单个节点的负载过高问题。这种设计有效提高了系统的吞吐量,并且分散了热点。
默认情况下,当创建一个主题时,分区的数量是1(即只有一个分区)。如果需要更多分区,可以通过指定参数--partitions来调整。Kafka的分区机制不仅解决了主题的线性扩展问题,还实现了负载均衡,让系统更加高效和可靠。
notion image
主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。官网上的这张图非常清晰地展示了Kafka的三级结构,如下所示:
notion image

副本Replication

在分布式系统中,出现错误是比较常见的,但只要集群中还有可用的服务节点,系统就能继续运行,虽然效率可能会有所降低,但最重要的是保证系统的可用性。Kafka的主题(topic)也会遇到类似的问题。如果一个topic被划分成多个分区(partition),这些分区会均匀分布在不同的broker节点上。一旦某个broker节点发生故障,该节点上的分区就无法正常使用,可能导致topic的数据不完整。为了防止这种情况下出现数据丢失,Kafka提供了一种机制,叫做副本(replication)。每个分区的数据都会被复制到多个broker节点上,形成副本。即使某个节点故障,其他节点上的副本仍然可以提供服务,保证数据的完整性和系统的高可用性。这种设计让Kafka在分布式环境中更加可靠,即便部分节点不可用,也能保证系统的正常运行。
Kafka支持多副本,使得主题topic可以做到更多容错性,牺牲性能与空间去换取更高的可靠性。注意:这里不能将多个备份放置在同一个broker中,因为一旦出现故障,多个副本就都不能用了,那么副本的意义就没有了。
notion image
通常情况下,当我们提到副本时,可能会认为有一个完整的主文件,然后这个文件的备份被称为副本。然而,在Kafka中,所有的副本都是平等的,只是会从中选出一个作为主导副本(Leader),其余的作为追随副本(Follower)。在Kafka中,这些副本其实就是分区的复制品。每个分区都可以有一个或多个副本,Kafka会从这些副本中选出一个作为Leader。只有Leader副本负责处理数据的读写操作,而Follower副本仅用于备份,跟随Leader保持数据同步。如果Leader副本所在的节点发生故障,Kafka会自动从Follower副本中选出新的Leader,继续提供服务。这种设计确保了数据的高可用性和一致性,即使某些节点发生故障,系统仍然能够正常运行,数据也不会丢失。
notion image
Kafka 分区中的所有副本统称为 AR,AR = ISR + OSR ISR 表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。 OSR 表示 Follower 与 Leader 副本同步时,延迟过多的副本。

Leader选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有topic 的分区副本分配和Leader 选举等工作。Controller 的信息同步工作是依赖于Zookeeper 的。 Leader选举流程如下:
notion image
kafka部署好后,连接Zookeeper ,可以看到Zookeeper 的根目录多了一个新的目录 kafka(配置文件zookeeper.connect指定的目录)。ids存储了broker id,表示三个kafka节点都在线,可以正常进行工作
第一个 Kafka Broker 启动后,会尝试成为 Controller,创建 controller 节点,如果创建成功,它成为 Kafka Controller,负责分区管理、Leader 选举等任务。后续的 Broker 监听 controller,如果 Controller 挂掉,则进行抢占,新的 Broker 成为 Controller。此时看kafka的启动日志,先启动的kafk就是管理者,这里我们启动的是broker 0
notion image
关掉一个broke,比我我们这里把控制节点关掉,在node1节点执行如下命令
此时以经换了控制器
下面我们来创建一个新的 topic,3个分区,3个副本
查看 Leader 分布情况
停止掉 broker 3 的kafka进程,并查看Leader分区情况
停止掉 broker 2 的kafka进程,并查看Leader分区情况
启动 broker 3 的kafka进程,并查看Leader分区情况
启动 broker 2 的kafka进程,并查看Leader分区情况
停止掉 broker 1 的kafka进程,并查看Leader分区情况

Follower故障处理原理

LEO:每个副本的最后一个offset,LEO其实就是最新的offset + 1 HW:所有副本中最小的LEO
notion image
Follower发生故障后会被临时踢出ISR,这个期间Leader和Follower继续接收数据,如图紫色部分是接收的新数据
notion image
待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截掉,从HW开始向leader进行同步。等待Follower的LEO大于等于该分区的HW,即Follower追上leader后,就可以重新加入ISR了
notion image

Leader故障处理原理

flower会主动去leader节点拉取数据,比如第一个从节点同步了2条数据,第二个节点同步了3条数据。此时leader挂了,那么将会从flower节点选举一个节点为leader,那么之前的leader有4条数据,而新的leader为2条数据,那么消费者只消费了2条数据,对消费者来说数据不是丢失了么?对于这种问题,kafka提出了水位线的概念,假设分区是个大的木桶,副本就是桶上的木板,分区中的数据就好比桶中的水,木桶中最多能容忍的水位是最短的木板高度,这就是木桶理论
水位线:当前分区中能被消费者消费的数据位置,水位线以下是可以被消费者消费。也就是说对消费者来说只能看到水位线以下的数据
Leader发生故障之后,会从ISR中选出一个新的Leader。为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
notion image

分区副本分配

在Kafka中,默认情况下,分区的Leader和副本的分配并不是完全均匀的。这是因为Kafka的分配算法并不能站在上帝视角去全面考虑所有因素,它只能尽可能地做到负载均衡,但这并不能完全避免一些节点出现过载的问题。具体来说,Kafka会尝试让每个broker节点承担相近数量的Leader副本和Follower副本,但在实际运行中,由于主题、分区数量以及集群规模的复杂性,可能仍会导致某些节点的I/O负载较高,出现热点问题。例如,一个节点可能同时拥有多个高流量分区的Leader副本,这会导致资源使用不均衡。再比如很多公司使用Kafka收集应用服务器的日志数据,这种数据都是很多的,特别是对于那种大批量机器组成的集群环境,每分钟产生的日志量都能以GB数,因此如何将这么大的数据量均匀地分配到Kafka的各个Broker上,就成为一个非常重要的问题。
创建一个topic,分区1,副本2,查看leader分区节点、副本所在节点 创建一个topic,分区2,副本2,查看leader分区节点、副本所在节点 创建一个topic,分区3,副本2,查看leader分区节点、副本所在节点
notion image

手动调整分区副本存储

在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。
我们在使用Apache Kafka生产和消费消息的时候,肯定是希望能够将数据均匀地分配到所有服务器上。为了解决这些问题,Kafka管理员可以手动调整分区和副本的分配策略,或者使用工具(如Kafka Reassign Partitions工具)来重新分配分区和副本,以进一步优化负载均衡。这样可以减少I/O热点问题,提升Kafka集群的性能和稳定性。
下面我们来创建一个新的topic,3个分区,2个副本,名称为test-3-2。将该topic的所有副本都存储到broker0和broker1两台服务器上
查看分区副本存储情况
notion image
创建副本存储计划,所有副本都指定存储在broker0、broker1中
执行副本存储计划
验证副本存储计划
查看分区副本存储情况
notion image

负载平衡

正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。
下面拿一个主题举例说明,假设集群只有一个主题如下图所示:
notion image
针对broker0节点,分区2的AR优先副本是0节点,但是0节点却不是Leader节点,所以不平衡数加1,AR副本总数是4。所以broker0节点不平衡率为1/4>10%,需要再平衡。broker2和broker3节点和broker0不平衡率一样,需要再平衡。Broker1的不平衡数为0,不需要再平衡
参数 auto.leader.rebalance.enable:默认是true,自动Leader Partition 平衡,leader重选举的代价比较大,可能会带来性能影响,建议设置为false关闭。 leader.imbalance.per.broker.percentage:默认是10%,每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。 leader.imbalance.check.interval.seconds:默认值300秒,检查leader负载是否平衡的间隔时间。

副本扩容

kafka并没有提供一个专门的脚本来支持副本的扩缩,不像 kafka-topic.sh 脚本一样是可以扩分区的。想要对副本进行扩缩,只能是曲线救国了。利用 kafka-reassign-partitions.sh 来重新分配副本
假设我们当前的情况是3分区1副本,为了提供可用性,我想把副本数升到3。首先我们来创建一个3分区1副本的topic
首先我们来查看一下test-3-1这个topic
notion image
在进行分区副本重分配之前,最好是用下面方式获取一个合理的分配文件,编写 move-json-file.json 文件,这个文件就是告知想对哪些Topic进行重新分配的计算,比如我们这里需要对test5这个topic进行重新分配
首先我们使用 --generate 获取一下当前分区副本的分配情况,--broker-list "0,1,2" 这个参数是你想要分配的Brokers
需求注意的是,此时尚未开始执行,它只是告诉你当前的分配和建议。我们想把所有分区的副本都变成3,那我们只需修改 "replicas": [] 里面的值了。这里面是Broker列表,排在第一个的是Leader,所以我们根据自己想要的分配规则修改一下json文件,就变成如下
注意 log_dirs 里面的数量要和 replicas 数量匹配,或者直接把 log_dirs 选项删除掉。这个 log_dirs 是副本跨路径迁移时候的绝对路径。然后执行--execute
验证
最后我们再来查看一下test-3-1这个topic
notion image

副本缩容

副本缩容跟扩容是一个意思,当副本分配少于之前的数量时候,多出来的副本会被删除。比如我test-3-1这个主题,想重新恢复到一个副本。创建下面的json文件
然后执行--execute缩减副本
执行之后可以看到其它的副本就被标记为删除了,一会就会被清理掉
notion image
然后我们再来查看我们这个topic,发现只有一个副本了
notion image
用这样一种方式我们虽然是实现了副本的扩缩容,但是副本的分配需要我们自己来把控好,要做到负载均衡等等。那肯定是没有kafka自动帮我们分配比较合理一点,那么我们有什么好的方法来帮我们给出一个合理分配的Json文件吗?

主题 topic

如果在server.properties文件中配置了参数auto.create.topics.enable=true,如果访问的主题不存在,那么Kafka就会自动创建主题
notion image

创建主题

Kafka是通过 kafka-topics.sh 指令文件进行消息主题操作的。其中包含了对主题的查询,创建,删除等功能。调用指令创建主题时,需要传递多个参数,而且参数的前缀为两个横线: --bootstrap-server :把当前的终端当成Kafka的客户端,那么进行操作前,就需要连接服务器。这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开 --create :表示对主题的创建操作,是个操作参数,后面无需增加参数值 --topic :主题的名称,后面接的参数值一般就是见名知意的字符串名称,类似于java中的字符串类型标识符名称,当然也可以使用数字,只不过最后还是当成数字字符串使用 --replication-factor:副本数量,注意不能大于broker数量。如果不提供,则会用集群中默认配置 --partitions:分区数量,当创建或者修改topic的时候,用这个来指定分区数。如果创建的时候没有提供参数,则用集群中默认值。注意如果是修改的时候,分区比之前小会有问题
这里我们创建一个名为test1的主题
由于我们指令中没有配置分区和副本参数,所以当前主题分区数量为默认值1,编号为0,副本为1,编号为所在broker的ID值。
下面我们来继续创建一个主题,指定分区与副本数量,副本数量不能超过当前可用的broker数量。如果只指定了分区数与副本数,由kafka采用负载均衡策略进行对副本自动分配
为了方便集群的管理,创建topic时,会同时在ZK中增加子节点,记录主题相关配置信息:/kafka/brokers/topics节点中会增加first-topic节点以及相应的子节点,我们连接zk可以看到刚刚创建的topic

查询主题

查看所有主题
  • -list:表示对所有主题的查询操作,是个操作参数,后面无需增加参数值
查看主题的详细信息,比如我们这里查看test1主题的详细信息
  • -describe:查看主题的详细信息 -topic:查询的主题名称

创建主题

Kafka是通过 kafka-topics.sh 指令文件进行消息主题操作的。其中包含了对主题的查询,创建,删除等功能。调用指令创建主题时,需要传递多个参数,而且参数的前缀为两个横线: --bootstrap-server :把当前的终端当成Kafka的客户端,那么进行操作前,就需要连接服务器。这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开 --create :表示对主题的创建操作,是个操作参数,后面无需增加参数值 --topic :主题的名称,后面接的参数值一般就是见名知意的字符串名称,类似于java中的字符串类型标识符名称,当然也可以使用数字,只不过最后还是当成数字字符串使用 --replication-factor:副本数量,注意不能大于broker数量。如果不提供,则会用集群中默认配置 --partitions:分区数量,当创建或者修改topic的时候,用这个来指定分区数。如果创建的时候没有提供参数,则用集群中默认值。注意如果是修改的时候,分区比之前小会有问题
这里我们创建一个名为test1的主题
由于我们指令中没有配置分区和副本参数,所以当前主题分区数量为默认值1,编号为0,副本为1,编号为所在broker的ID值。
下面我们来继续创建一个主题,指定分区与副本数量,副本数量不能超过当前可用的broker数量。如果只指定了分区数与副本数,由kafka采用负载均衡策略进行对副本自动分配
为了方便集群的管理,创建topic时,会同时在ZK中增加子节点,记录主题相关配置信息:/kafka/brokers/topics节点中会增加first-topic节点以及相应的子节点,我们连接zk可以看到刚刚创建的topic

查询主题

查看所有主题
  • -list:表示对所有主题的查询操作,是个操作参数,后面无需增加参数值
查看主题的详细信息,比如我们这里查看test1主题的详细信息
  • -describe:查看主题的详细信息 -topic:查询的主题名称
notion image
以test1为例: Topic: test1 表示主题名称是 test1 TopicId: ox4rABtBS7O8dl28JRr4CA 表示 Kafka 主题的唯一标识符(Kafka 2.8 及以上引入) PartitionCount: 1 表示该主题有 1 个分区 ReplicationFactor: 1 每个分区有 1 个副本,只有一个leader副本 Configs: segment.bytes=1073741824 表示 Kafka 允许每个分段文件最大 1GB(1,073,741,824 字节)
Partition: 0 表示这是 test1 主题的第 0 号分区 Leader: 2 表示 Broker 2 是该分区的 Leader,负责处理生产者和消费者的请求 Replicas: 2 表示该分区的副本存在于 Broker 2 Isr: 2 表示ISR(同步副本集合) 中只有Broker 2

修改主题(增加分区)

我们这里创建的test1主题分区数为1,副本数为1。下面我们修改为3分区。注意:分区数只能增加,不能减少。副本可以增加也可以缩减
修改为三个分区
  • -alter : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值 -topic : 修改的主题名称 -partitions : 修改的配置参数:分区数量
查看我们此时的topic,发现已经改为3个分区
notion image

删除主题

如果主题创建后不需要了,或创建的主题有问题,那么我们可以通过相应的指令删除主题。
  • -delete: 表示对主题的删除操作,是个操作参数,后面无需增加参数值。默认情况下,删除操作是逻辑删除,也就是说数据存储的文件依然存在,但是通过指令查询不出来。如果想要直接删除,需要在server.properties文件中设置参数delete.topic.enable=true-topic : 删除的主题名称
kafka彻底删除topic,删除topic后,再次创建同名topic会报错,如下所示
notion image
首先修改 server.properties 配置文件,修改 delete.topic.enable=true,删除kafka存储的topic目录。最后登录zkShell,执行deleteall /kafka/brokers/topics/<topic_name>

生产者

Topic主题创建好后,接下来我们就可以向该主题生产消息了。生产者(Producer)将消息发送到 Kafka 集群中负责该分区的 Leader 副本(Partition Leader)。Leader 副本在接收到数据后,会将数据写入自身的日志,并将数据同步到其它的 Follower 副本中。数据同步完成后,Kafka 会根据配置的应答机制(acks 参数)向生产者返回确认消息。
notion image

ACK应答机制

为保证producer生产者发送的数据,能可靠的发送到指定的topic,topic的每个partition分区收到producer生产者发送的数据后,都需要向producer生产者发送ack,如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。
notion image
对于某些不太重要的数据,对数据的可靠性要求不是很高,可以容忍数据的少量丢失,因此没必要等 ISR 中的所有 Follower 副本都接收成功。Kafka 为用户提供了三种可靠性级别(0、1、-1),用户可以根据对可靠性和延迟的要求进行权衡,选择以下配置: acks = 0:Producer 不等待 Broker 的 ACK,即 Broker 接收到消息后直接返回响应,而不保证消息写入磁盘或日志 优点:延迟最低,性能最高。 缺点:当 Broker 故障时,可能会导致数据丢失,因为消息未真正持久化。 适用场景:对数据可靠性要求低,可以容忍数据丢失的非关键性场景。
notion image
acks = 1:Producer 等待 Partition 的 Leader 副本成功写入日志后返回 ACK 优点:性能和可靠性之间的平衡。 缺点:如果 Leader 副本在向 Follower 副本同步之前发生故障,可能会导致数据丢失。例如,Producer 已经收到 Leader 的 ACK 并认为消息发送成功,但同步未完成时 Leader 挂掉,那么新的 Leader 副本可能不包含这条消息,导致数据丢失。 适用场景:对数据可靠性有一定要求,但延迟敏感的场景。
notion image
acks = -1(或 acks = all):Producer 等待 Partition 的 Leader 和所有 ISR(同步副本集合)中的 Follower 副本都成功写入日志后返回 ACK。 优点:提供最高的数据可靠性,确保消息写入所有副本。 缺点:性能较低,且在极端情况下,例如所有副本同步完成后,但在返回 ACK 之前 Leader 故障,可能会导致消息重复。 适用场景:对数据可靠性要求极高,且可以接受较高延迟的场景。
notion image

ISR

如果设置 acks=-1(或 acks=all),leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?为了避免 Leader 因等待某个 Follower 同步而导致 ACK 长时间无法返回,Kafka 引入了动态 ISR(In-Sync Replica Set,同步副本集合)机制:ISR 是一个动态集合,表示当前与 Leader 保持同步的 Follower 副本列表。只有在 ISR 中的 Follower 副本完成数据同步后,Leader 才会返回 ACK。
如果某个 Follower 长时间未能完成数据同步,Kafka 会将其从 ISR 集合中临时移除。这个超时时间由 replica.lag.time.max.ms 参数控制。如果 Follower 超过这个时间未能拉取并同步 Leader 的数据,就会被踢出 ISR。被踢出的 Follower 并不会永久移除,而是会持续尝试与 Leader 同步。当追上 Leader 的数据后,Kafka 会将其重新加入 ISR。
如果 Leader 发生故障,Kafka 会从 ISR 集合中选举新的 Leader。因为 ISR 中的副本是与原 Leader 同步的,能够确保数据的高一致性。如果分区副本设置为1个,或者ISR里应答的最小副本数量min.insync.replicas 设置为1,和ack=1的效果是一样的,仍然有丢数的风险。

数据可靠性

数据完全可靠条件= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
notion image

数据重复

在 Kafka 中,启用了 acks = -1 或 acks = all 以确保高可靠性,在某些特殊情况下,仍可能导致数据重复:Producer 将数据发送给 Partition 的 Leader。Leader 接收数据并写入本地日志,随后同步给 ISR 中的所有 Follower。当 ISR 同步完成后,Leader 准备向 Producer 返回 ACK。如果此时 Leader 挂掉(例如机器故障或网络中断),ACK 未能返回给 Producer。Kafka 会选举新的 Leader,Producer 检测到 ACK 未收到,会重新发送相同的数据给新选出的 Leader。因此,新 Leader 会再次接收并写入这条数据,导致消息重复。
为什么数据重复会发生?Kafka 的机制设计优先保证数据的至少一次投递(At-Least-Once Delivery),以确保消息不会丢失。如果 Producer 没有收到 ACK,就会认为消息发送失败,并按照重试机制重新发送。新 Leader 无法判断该消息是否已存在,因为 Kafka 没有内置去重功能(Message Deduplication)。
notion image

Exactly Once

对于某些比较重要的消息,我们需要保证每条消息被发送且仅被发送一次。如果将ACK级别设置为-1, 可以保证producer到server之间不会丢失数据,但是可能会重复数据。如果将ACK级别设置为0,那么生产者每条消息只会被发送一次,可能会丢失数据。为了解决这个问题,Kafka在0.11版本之后引入了幂等性机制(idempotent),也就是producer无论向broker发送多少次重复的数据,broker只会持久化一条,保证了数据不重复。
幂等性机制:假设你给朋友发了一条微信消息,因为网络问题,你点了几次 "发送",但朋友的聊天记录里仍然只会出现 1 条消息。Kafka 幂等性机制的作用类似于这个“去重”功能。
在 Kafka 1.1 及以上版本,可以通过 enable.idempotence=true 开启幂等性,默认开启 ACK=all,确保所有副本都同步写入,提升可靠性。
精确一次(Exactly Once) = 幂等性 + 至少一次(ACK = -1 + 分区副本 >= 2 + ISR最小副本数量 >= 2)
重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的,Partition 表示分区号,Sequence Number是单调自增的。所以幂等性只能保证的是在单分区单会话内不重复。生产者发送的数据pid重复、seqnumber重复,则被干掉。
notion image

数据有序

生产者采用推(push)模式将消息发布到broker,每条消息都被追加到分区中,属于顺序写磁盘。顺序写磁盘效率比随机写内存要高,保障kafka吞吐率。生产的消息被不断追加到Partition log上,每个Partition中的消息都是有序的。
总结:单分区:有序,多分区:分区与分区之间无序
单分区有序
notion image
多分区,虽然这些消息是顺序发送,但是每个消息落到了不同的分区,如果想要分区中的消息全局有序,可以设置为一个分区
notion image
局部有序,需要保证生产者和消费者同时有序
notion image

数据存储

作为一款消息中间件,很多人误以为写入kafka数据是存储在内存中,但是实际上写入kafka的数据是存储在磁盘中,很多人都认为磁盘很慢,为此,官网专门有一张对此作出了说明。不过听官网语气,大意是人们认为觉得磁盘很慢,但是官网说很快,总结一句话就是:我不要你觉得,我要我觉得。
在官方文档 4.2章持久化的介绍中,官网第一篇说了这样一句话,这句话的意思是:不要害怕文件系统!(文件系统即磁盘) 。kafka很大程度上是依赖于文件系统来缓存消息。人们普遍认为“磁盘速度很慢”,这使得人们怀疑其(kafka)持久化的架构及性能是否具有竞争力。实际上,磁盘的速度比人期望的更快或者更慢取决于他们(指磁盘)如何被使用。正确设计的磁盘结构通常可以和网络一样快。
notion image
上述的意思就是,磁盘其实并不慢,磁盘的快慢取决于人们如何去使用磁盘。那么,kafka如何在高效的使用磁盘,文档中我标记红框的那部分说了这样一句话:顺序访问磁盘比随机访问内存更快!那么kafka的高效的原因下面就总结出来了: 顺序写磁盘:Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。 零拷贝技术:“零拷贝技术”只用将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的订阅者时,都可以使用同一个页面缓存),避免了重复复制操作。如果有10个消费者,传统方式下,数据复制次数为4*10=40次,而使用“零拷贝技术”只需要1+10=11次,一次为从磁盘复制到页面缓存,10次表示10个消费者各自读取一次页面缓存。 分区:kafka对每个主题进行分区提高了并发,也提高了效率。
Kafka最初的应用场景是日志处理和消息队列(MQ),更多地充当一个日志传输和存储系统,这是Kafka的核心功能,也是它的立足之本。在Kafka中,接收到的消息最终都会被存储在日志文件(log)中。每个partition分区对应一个日志文件目录,里面包含多个以.log为扩展名的文件,该log文件中存储的就是Producer生产的数据。这些日志文件是Kafka底层存储数据的核心结构,确保了数据的可靠性和持久性,即使系统出现故障,数据也不会丢失。
Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment包括:.index文件.log文件.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0表示 Topic first 的第 0 个 Partition。
每个 Segment 由以下文件组成: .log 文件:存储实际的消息内容。 .index 文件:提供消息偏移量到 .log 文件物理位置的索引。 .timeindex 文件:根据消息的时间戳建立索引,用于快速定位时间范围内的消息。 注意:index和log文件以当前segment的第一条消息的offset命名。
notion image
接下来,我们创建一个主题test,为了方便演示,我们这里创建一个分区,一个副本
主题创建后,需要找到一个用于存储分区数据的位置,当前主题的分区数量为1,副本数量为1,我们先来查看这个副本在哪个节点
notion image
可以看到在 broker 1 节点上,然后发现文件大小是0KB
notion image
路径中的00000000000000000000.log文件就是真正存储消息数据的文件,文件名称中的0表示当前文件的起始偏移量为0,index文件和timeindex文件都是数据索引文件,用于快速定位数据。只不过index文件采用偏移量的方式进行定位,而timeindex是采用时间戳的方式。
当数据写入leade节点副本的时候,先将数据写入内存,然后周期性写入磁盘。如果数据都写入一个文件,那么文件变大了,读取数据就会变慢。所以我们可以将文件切割成多个小文件。修改server.properties配置文件,将刷写的值改小。我们这里为了演示,修改配置文件如下
然后重启kafka集群
现在我们启动生产者,并发送300条消息
可以发现每三个文件为一组,文件前缀相同,文件大小是154KB
notion image
下面我们来解析 Kafka 的日志段文件,以00000000000000000004.log为例
notion image
这个日志中一共存储了2条数据,存储了 offset=4 到 offset=5 的消息。每条数据的大小是77kb,所以数据文件的大小是154KB,刷写配置设置的是200KB Log starting offset: 4:当前日志段的起始 offset 为 4,说明 00000000000000000004.log 这个日志文件中存储的最早的消息 offset = 4
第一条消息解析: baseOffset: 4 该消息的 offset producerId: 104 生产者 ID(Kafka 事务/幂等机制用) partitionLeaderEpoch: 5 当前分区 Leader 的 epoch(Kafka 发生 Leader 变更时会增加) isTransactional: false 这条消息不是事务性消息 CreateTime: 1742646830518 消息的时间戳 size: 77 日志记录的总大小(包括元数据 + 消息体) compresscodec: none 未压缩 crc: 239474595 CRC 校验码(用于完整性校验) payload: message-5 实际消息内容(即 message-5) 第二条消息的 offset=5,消息内容message-6
其中第一条消息的position: 0表示起始位置,文件开始。第二条消息的position: 77表示第二条消息的起始位置,说明第一条消息占了 77 字节。
Log文件和Index文件详解
notion image
notion image

文件清理策略

物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个目录,该目录存储该patition的所有消息和索引文件,我这里为了方便演示,配置的是1个分区1个副本
notion image
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据: 1)基于时间:log.retention.hours=168 2)基于大小:log.retention.bytes=1073741824 需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
比如上述我的topic在 broker 1 节点上,下满我们直接将topic目录删除
notion image
然后重启kafka,发现目录又重新创建了,只是文件大小为0,因为元数据存储在zk,所以会根据元数据重新创建
notion image
kafka本质是传输数据,不是存储数据,为了均衡生产者和消费者速率,所以才会把数据保存在log文件中存储,是一种临时存储,不能长时间保存,默认保存7天,超过7天就被清理掉。

消费者

数据已经存储到了Kafka的数据文件中,接下来应用程序就可以使用 Kafka Consumer API 向 Kafka 订阅主题,并从订阅的主题上接收消息了。也就是说消费者主动从kafka中拉取数据
Kafka 消费者总体工作流程
notion image
下面我们来创建一个分区,生产者发送三条数据
启动消费者进行数据消费,发现消费不到数据。因为kafka默认把偏移量设置为LEO,当前LEO为3,消费者从3这个位置消费,所以消费不到数据。然后在创建新的数据,此时消费者可以消费数据。那么这样是不是就造成了数据丢失?那么我们怎么才能从头消费呢?
notion image
-from-beginning从头消费
notion image

消费数据的方式 push & pull

Kafka的主题如果就一个分区的话,那么在硬件配置相同的情况下,消费者Consumer消费主题数据的方式没有什么太大的差别。
notion image
Kafka为了能够构建高吞吐,高可靠性,高并发的分布式消息传输系统,它的主题是允许多个分区的,那么就会发现不同的消费数据的方式区别还是很大的。如果数据由Kafka进行推送(push),那么多个分区的数据同时推送给消费者进行处理,明显一个消费者的消费能力是有限的,那么消费者无法快速处理数据,就会导致数据的积压,从而导致网络,存储等资源造成极大的压力,影响吞吐量和数据传输效率。
notion image
如果kafka的分区数据在内部可以存储的时间更长一些,再由消费者根据自己的消费能力向kafka申请(拉取)数据,那么整个数据处理的通道就会更顺畅一些。Kafka的Consumer就采用的这种拉取数据的方式。注意,如果Kafka中没有数据,消费者可能会陷入循环中,一直返回空数据。
notion image

消费者组

消费者可以根据自身的消费能力主动拉取Kafka的数据,但是毕竟自身的消费能力有限,如果主题分区的数据过多,那么消费的时间就会很长。对于kafka来讲,数据就需要长时间的进行存储,那么对Kafka集群资源的压力就非常大。如果希望提高消费者的消费能力,并且减少kafka集群的存储资源压力。所以有必要对消费者进行横向伸缩,从而提高消息消费速率。
notion image
不过这么做有一个问题,就是每一个消费者是独立,那么一个消费者就不能消费主题中的全部数据,简单来讲,就是对于某一个消费者个体来讲,主题中的部分数据是没有消费到的,也就会认为数据丢了,这个该如何解决呢?那如果我们将这多个消费者当成一个整体,是不是就可以了呢?这就是所谓的消费者组 Consumer Group。既然是一个组,那么组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的ID,这个ID被称为Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个Consumer实例来消费。
notion image
个人认为,理解Consumer Group记住下面这几个特性就好了: Consumer Group下可以有一个或多个Consumer实例,同一个Consumer Group中的Consumer实例Group ID相同; 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,为了避免数据重复消费; 消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
notion image
传统的消息队列模型的缺陷在于消息一旦被消费,就会从队列中被删除,而且只能被下游的一个Consumer消费。严格来说,这一点不算是缺陷,只能算是它的一个特性。但很显然,这种模型的伸缩性(scalability)很差,因为下游的多个Consumer都要“抢”这个共享消息队列的消息。发布/订阅模型倒是允许消息被多个Consumer消费,但它的问题也是伸缩性不高,因为每个订阅者都必须要订阅主题的所有分区。
Kafka的Consumer Group完美地规避了上面提到的伸缩性差的问题,Kafka仅仅使用Consumer Group这一种机制,却同时实现了传统消息引擎系统的两大模型:如果所有实例都属于同一个Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的Group,那么它实现的就是发布/订阅模型。
在实际使用场景中,我怎么知道一个Group下该有多少个Consumer实例呢?理想情况下,Consumer实例的数量应该等于该Group订阅主题的分区总数
举个简单的例子,假设一个Consumer Group订阅了3个主题,分别是主题A,1个分区、主题B,2个分区、主题C,3个分区,总共是6个分区。通常情况下,为该Group设置6个Consumer实例是比较理想的情形,因为它能最大限度地实现高伸缩性。如果你有3个实例,那么平均下来每个实例大约消费2个分区(6 / 3 = 2)。如果你设置了8个实例,那么很遗憾,有2个实例(8 – 6 = 2)将不会被分配任何分区,它们永远处于空闲状态。因此,在实际使用过程中一般不推荐设置大于总分区数的Consumer实例。设置多余的实例只会浪费资源,而没有任何好处。
比如我们现在有3个分区,4个消费者,此时只有3个消费者能消费数据,多出的一个消费者就会闲置,不会接收任何消息
notion image
然后我们停掉一个消费者,此时那个没有消费数据的消费者则开始消费数据。如果只有一个消费者,那么这个消费者则会消费所有分区的数据
notion image
消费者组初始化流程 coordinator:辅助实现消费者组的初始化和分区的分配。 coordinator节点选择 = groupid的hashcode值 % 50( __consumer_offsets的分区数量) 例如: groupid的hashcode值 = 1,1 % 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。
notion image

offset

消费者组内的消费者数量可以大于分区数,多余的消费者无法消费数据,起到了备用的作用,当消费者组中的一个消费者宕掉了,备用的消费者就会顶替,那么这个备胎是从头消费还是继续消费?从头消费则会出现重复消费,那么备胎如何知道消费到哪个位置了呢?offset 是 Kafka 为每条消息分配的一个唯一的编号,它表示消息在分区中的顺序位置。offset 是从 0 开始的,每当有新的消息写入分区时,offset 就会加 1。offset 是不可变的,即使消息被删除或过期,offset 也不会改变或重用。
notion image
生产者在向 Kafka 发送消息时,可以指定一个分区键(Partition Key),Kafka 会根据这个键和分区算法来决定消息应该发送到哪个分区。如果没有指定分区键,Kafka 会采用轮询或随机的方式来选择分区。生产者也可以自定义分区算法。当消息被写入到分区后,Kafka broker 会为消息分配一个 offset,并返回给生产者。生产者可以根据返回的 offset 来确认消息是否成功写入,并进行重试或其他处理。
消费者在消费 Kafka 消息时,需要维护一个当前消费的 offset 值,以及一个已提交的 offset 值。当前消费的 offset 值表示消费者正在消费的消息的位置,已提交的 offset 值表示消费者已经确认消费过的消息的位置。消费者在消费完一条消息后,需要提交 offset 来更新已提交的 offset 值。提交 offset 的方式有两种:自动提交和手动提交。 自动提交:Kafka 提供了一个配置参数 enable.auto.commit,默认为 true,表示开启自动提交功能。自动提交功能会在后台定期将当前消费的 offset 值提交给 Kafka broker,由 auto.commit.interval.ms 参数控制。 手动提交:如果 enable.auto.commit 设置为 false,则表示关闭自动提交功能,此时消费者需要手动调用 commitSync 或 commitAsync 方法来提交 offset。手动提交功能可以让消费者更灵活地控制何时以及如何提交 offset。
无论是自动提交还是手动提交,offset 的实际存储位置都是在 Kafka 的一个内置主题中:__consumer_offsets。这个主题有 50 个分区(可配置),每个分区存储一部分消费组(Consumer Group)的 offset 信息。Kafka broker 会根据消费组 ID 和主题名来计算出一个哈希值,并将其映射到 __consumer_offsets 主题的某个分区上。__consumer_offsets 主题是 Kafka 0.9.0 版本引入的新特性,之前的版本是将 offset 存储在 Zookeeper 中。但是 Zookeeper 不适合大量写入,因此后来改为存储在 Kafka 自身中,提高了性能和可靠性。
提交 offset 的目的是为了记录消费进度,以便在消费者发生故障或重启时,能够从上次消费的位置继续消费。需要注意的是,无论是自动提交还是手动提交,都不保证提交成功。因为 Kafka broker 可能发生故障或网络延迟,导致提交失败或延迟。因此,消费者需要处理提交失败延迟的情况。 提交失败:如果提交失败,消费者可以选择重试或放弃。重试的话,可能会导致多次提交同一个 offset 值,但是不会影响正确性,因为 Kafka broker 会忽略重复的 offset 值。放弃的话,可能会导致下次启动时重新消费已经消费过的消息,但是不会影响完整性,因为 Kafka 消息是幂等的。 提交延迟:如果提交延迟,消费者可以选择等待或继续。等待的话,可能会导致消费速度变慢,或者超过 session.timeout.ms 参数设置的时间而被认为已经死亡。继续的话,可能会导致下次启动时漏掉一些没有提交成功的消息。
重置 offset:重置 offset 是消费者在启动或运行过程中,将当前消费的 offset 值修改为其他值的操作。重置 offset 的目的是为了调整消费位置,以便在需要重新消费或跳过某些消息时,能够实现这个需求。重置 offset 的方式有两种:手动重置和自动重置。 手动重置:手动重置可以让消费者精确地控制从哪个位置开始消费。例如,如果想要重新消费某个分区的所有消息,可以调用 seekToBeginning 方法将 offset 设置为 0;如果想要跳过某个分区的所有消息,可以调用 seekToEnd 方法将 offset 设置为最大值;如果想要从某个具体的位置开始消费,可以调用 seek 方法将 offset 设置为任意值。 自动重置:自动重置可以让消费者在启动时根据 auto.offset.reset 参数来决定从哪个位置开始消费。auto.offset.reset 参数有三个可选值:earliest, latest 和 none。earliest 表示从最早的可用消息开始消费;latest 表示从最新的可用消息开始消费;none 表示如果没有可用的 offset,则抛出异常。
offset 的消费和保证 offset 的消费和保证主要涉及到两个方面:顺序性一致性。 顺序性是指 Kafka 消息是否按照发送和接收的顺序进行处理。Kafka 只保证分区内的顺序性,即同一个分区内的消息按照 offset 的顺序进行发送和接收。但是不保证主题内或跨主题的顺序性,即不同分区内的消息可能会乱序发送和接收。因此,如果需要保证主题内或跨主题的顺序性,需要在生产者和消费者端进行额外的处理,例如使用同一个分区键或同一个消费组。
一致性是指 Kafka 消息是否能够被正确地发送和接收,不会出现丢失或重复的情况。Kafka 提供了三种不同级别的一致性保证:最多一次(At most once),最少一次(At least once)和精确一次(Exactly once) 最多一次:最多一次是指 Kafka 消息只会被发送或接收一次或零次,不会出现重复的情况,但是可能会出现丢失的情况。这种保证的实现方式是在生产者端关闭重试功能,在消费者端在消费消息之前提交 offset。这种保证适用于对消息丢失不敏感的场景,例如日志收集或监控。 最少一次:最少一次是指 Kafka 消息只会被发送或接收一次或多次,不会出现丢失的情况,但是可能会出现重复的情况。这种保证的实现方式是在生产者端开启重试功能,在消费者端在消费消息之后提交 offset。这种保证适用于对消息重复不敏感的场景,例如计数或累加。 精确一次:精确一次是指 Kafka 消息只会被发送或接收一次,不会出现丢失或重复的情况。这种保证的实现方式是在生产者端和消费者端使用事务功能,在消费者端使用幂等功能。这种保证适用于对消息丢失和重复都敏感的场景,例如转账或支付。

数据重复消费

kafka消费数据时,为了防止消费过程中出现故障或者节点宕机等情况,导致重启后消费者不知道从哪个位置开始继续消费,会保存消费者的偏移量,默认每5s自动保存偏移量,假如在消费者在第3s的时候出现了故障重启了,前2s的消费的数据偏移量就无法保存,消费者重启之后就会从3s前那个偏移量位置继续消费。导致数据重复消费
notion image
然后我们停止消费者,再次启动,发现数据重复消费
notion image
解决方法: 1、将偏移量保存时间改短一些,但是基于时间修改,还是会有一些问题。只是几率更小一些,时间短了会频繁对偏移量数据更新,性能下降 2、手动保存偏移量,同步提交(阻塞)、异步提交(非阻塞)。如果偏移量提交了,数据还没有来得及消费,消费者重启后就会导致数据漏消费

节点的扩容缩容

扩容

将老集群的kafka打包,拷贝到新节点,比如我这里新节点为node4
将拷贝过来的压缩包进行解压
修改server.properties配置文件,将broker.id=3,然后启动kafka
连接zk客户端,验证kafka节点是否正常加入集群
查看kafka日志是否正常,如果出现如下错误,本质原因是kafka配置文件server.properites中默认监听hostname
notion image
两种解决方案 1、修改kafka服务的配置文件,改为监听ip,将 advertised.listeners 中的your.host.name 改为服务器的IP地址即可:
2、修改producer客户端所在的电脑的host文件,在host文件中添加如下信息:
此时kafka节点启动完成了,但是数据还存放在老节点,比如我们查看之前创建的test主题
notion image
在进行分区副本重分配之前,最好是用下面方式获取一个合理的分配文件。编写 move-json-file.json 文件,这个文件就是告知想对哪些Topic进行重新分配的计算,可以写多个主题
然后执行下面的脚本,--broker-list "0,1,2,3" 这个参数是你想要分配的Brokers
-topics-to-move-json-file:指定json文件,文件内容为topic配置 -generate:尝试给出副本重分配的策略,该命令并不实际执行 -broker-list:指定具体的BrokerList,用于尝试给出分配策略,与 -generate 搭配使用
执行完后,可以看到 Kafka 生成了新的分区副本分配方案
notion image
生成的当前副本分配(原始状态):当前 test 主题的 3 个分区分别在 Broker [0,1,2],所有副本的日志存储目录为 any(Kafka 自动分配) 生成的新副本分配(计划调整后的状态),新方案将 test 主题的副本均匀分布到 4 个 Broker (0,1,2,3): 分区 0:副本从 [0,2,1] 变成 [3,0,1] 分区 1:副本从 [2,1,0] 变成 [0,1,2] 分区 2:副本从 [1,0,2] 变成 [1,2,3]
需求注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它。将上面得到期望的重新分配方式文件保存在一个json文件里面,创建副本存储计划,我们这里保存在了reassignment-json-file.json文件中
然后执行副本迁移
如果一切正常,会显示副本迁移开始的信息。验证迁移进度
如果返回 "Reassignment of partition(s) is complete",说明迁移完成 🎉 如果仍在进行,耐心等待(Kafka 迁移数据可能需要几分钟) 如果失败,检查 Kafka Broker 是否都在运行,并查看 Kafka 日志 /data/kafka/logs/server.log

缩容

比如我们这里要缩减一台kafka节点,比如我们这里要缩减broker 3节点。生成执行计划,然后按照服役时操作流程执行负载均衡,首先创建一个要均衡的主题
创建执行计划
创建副本存储计划,所有副本存储在broker0、broker1、broker2中
执行副本存储计划
验证副本存储计划
最后在broker 3节点停止kafka服务

helm部署kafka

1、部署方式选择 基于Kafka3.X后的集群搭建方式主要分为两种,一种是基于Zookeeper管理方式,一种是基于KRaft模式,本文主要介绍Kafka-KRaft集群模式搭建 2、KRaft模式介绍 Apache Kafka 不依赖 Apache Zookeeper的版本,被社区称之为 Kafka Raft 元数据模式,简称KRaft模式。
KRaft运行模式的Kafka集群,不会将元数据存储在 Apache ZooKeeper中。即部署新集群的时候,无需部署ZooKeeper集群,因为Kafka将元数据存储在 Controller 节点的 KRaft Quorum中。KRaft可以带来很多好处,比如可以支持更多的分区,更快速的切换Controller,也可以避免Controller缓存的元数据和Zookeeper存储的数据不一致带来的一系列问题
基于Helm进行安装
参数说明:
执行完后输出结果如下
执行如下命令获取密码
创建文件
先用如下命令创建一个 kafka-client pod:
验证生产者与消费者
notion image
notion image
 
上一篇
zookeeper
下一篇
nginx