内容简介:通过上一章节,我们知道了Kafka的Message是如何持久化的,知道了保证高可用性、稳定性的策略。这一节来看看Kafka中如何生产Message以及相关的策略。简而言之,Producer就负责往Topic里发送数据,或者说写入数据。换言之,就是往组成这个Topic的一至多个Partition里写入数据。这里有三点需要注意:
通过上一章节,我们知道了Kafka的Message是如何持久化的,知道了保证高可用性、稳定性的策略。这一节来看看Kafka中如何生产Message以及相关的策略。
Producer
简而言之,Producer就负责往Topic里发送数据,或者说写入数据。换言之,就是往组成这个Topic的一至多个Partition里写入数据。这里有三点需要注意:
- 我们只需要通过Producer产生数据,往Topic里塞既可。Producer会自动去选择正确的、合适的Broker和Partition持久化数据。
- Producer默认采用轮询的机制选择Broker往Partition里持久化数据的。
- 如果其中有一个Broker挂了,当它再恢复时,Producer会自动接纳它。
Message keys
Producer默认采用轮询的机制选择Broker往Partition里持久化数据。但当我们需要根据数据中的某个字段按Partition进行分组或者 排序 时,就需要在每条Message里添加Key,这个Key可以是数字,也可以是字符串等等。然后相同Key的Message永远会持久化到同一个Partition。
Acks
Producer在发送生产出的数据给Broker时,可以选择三种模式,称为acks,它是Acknowledgment的缩写。意思是Broker对Producer即将发送来的数据采用何种确认方式。
acks=0
在该模式下,Producer不会等待Broker的确认反馈,即不关心Broker是否正确的将发送来的数据持久化,所以在这种模式下,很有可能会丢失数据。因为如果Broker挂了,Producer不会被通知到,所以还会不停的发送数据导致数据丢失。在对数据完整性需求不强烈的场景下,这种模式可以提高性能。
acks=1
默认采用的模式,该模式下Producer会等待Leader Broker的确认反馈,当Broker确实将数据持久化到至少一个Partition中后,给予Producer确认反馈,Producer才会继续发送数据。该模式下有几点需要注意:
- 不保证Replicas也持久化了数据。
- 当Producer没有收到Broker的确认反馈时,Producer会尝试重新发送数据。
- 当Leader Broker挂了,但是Replicas又没有持久化数据时,还是会丢失数据。
- 该模式只能说是可以有效防止数据丢失。
acks=all
该模式下,Producer同样需要等待Broker的确认,但是确认更为严格,需要所有的Partition(Leader + Replicas)都持久化数据后才返回确认信息。这种模式下,只要Replicas足够多,数据基本不会丢失。
在该模式下,还有一个重要的参数 min.insync.replicas
需要配置。该参数的意思是当 acks=all
时,至少有多少个Replicas需要确认已成功持久化数据,这个Replicas数量也包括Leader。
举个例子,假设有三个Broker,参数为 min.insync.replicas=2
、 replication.factor=3
、 acks=all
,那么Producer每次发送Message时,都需要至少2个Broker给予确认反馈,换句话说,在这个Kafka集群中,只能允许一个Broker挂掉。如果 min.insync.replicas=3
,那么一个Broker都不能挂,否则Producer在发送Message时会收到 NOT_ENOUGH_REPLICAS
的异常。
Retry
有时候Producer发送Message失败可能并不是因为Broker挂了,可能是因为网络问题,没有连接到Broker等等。这种问题可能在很短暂的时间内就会自动修复,那么在这种情况下,我们希望Producer在发送失败后能重新尝试发送。这里就需要设置 retries
这个参数,意思就是重试的次数,默认是0次,可以根据实际业务情况设置。
但是当设置了 retries
参数大于0后,有可能会带来新的问题。假如我们需要相同Key的Message进入特定的Partition,并且是要严格按照Producer生产Message的顺序排序。那么此时如果第一条Message发送失败,第二条Message发送成功了,第一条通过重试发送成功了,那Message的顺序就发生了变化。
这里又会引出一个参数 max.in.flight.requests.per.connection
,这个参数默认是5,意思是在被Broker阻止前,未通过acks确认的发送请求最大数,也就是在Broker处排队等待acks确认的Message数量。所以刚才那个场景,第一条和第二条Message都在Broker那排队等待确认放行,这时第一条失败了,等重试的第一条Message再来排队时,第二条早都通过进去了,所以排序就乱了。
如果想在设置了 retries
还要严格控制Message顺序,可以把 max.in.flight.requests.per.connection
设置为1。让Broker处永远只有一条Message在排队,就可以严格控制顺序了。但是这样做会严重影响性能(接收Message的吞吐量)。
Idempotent Producer
在实际情况中,经常会遇到一个现象,那就是当Broker给Producer返回acks确认时,网络出异常了,导致Producer没有收到ack确认,于是,Producer进行重试。如果Consumer的Offset策略(在后续章节会介绍)是 at least once
或者是 exactly once
,那么第一次对Message就已经进行了处理,比如入库。那么第二次会对相同的Message再做一次处理,对相同数据进行重复处理,势必会引起业务上的错误。整个过程如下图所示:
所以这就需要幂等Producer来保证我们处理数据的唯一性。Kafka在0.11版本之后,就为我们提供了定义幂等Producer的能力,可以通过将 enable.idempotence.config
参数设置为 true
来定义幂等Producer。将Producer定义为幂等后,还要设置其他对应的参数:
retries=Integer.MAX_VALUE max.in.flight.requests.per.connection=1 (Kafka >= v0.11 & < v1.1) max.in.flight.requests.per.connection=5 (Kafka >= v1.1) acks=all
如此设置后,可以有效防止重复消费Message,整个过程就会如下图所示:
Message Compression
消息压缩的作用不言而喻:
- 加快网络传输速度,减少消息延迟。
- 更有效的利用磁盘空间。
- 加快消息吞吐率。
只需要设置 compression.type
参数,该参数默认是 none
,可选项有 gzip
、 lz4
、 snappy
。建议使用 lz4
或者 snappy
。
Message Batch
上面介绍了 max.in.flight.requests.per.connection
参数,默认会在Broker那排队5条Message,那么如果第六条来了怎么办呢?这时候Kafka会自动开启批量处理Message的模式,将这6条Message作为一个批次进行处理。这一个批次可以看作是一次Message处理请求。
开启批量模式后,会引出两个参数:
linger.ms batch.size
过程如下图所示:
Producer Buffer
在大多数情况下,Consumer消费Message的速率是远不如Producer生产Message的速率的。所以Producer有一个缓存机制,将Broker还没来得及接收的Message缓存在内存中。缓存的大小可以通过 buffer.memory
配置,默认大小是32MB。默认存储时间为7天,这个时间可以通过设置Broker的 offset.retention.minutes
属性改变。
如果Producer的缓存被打满后,Producer会被阻塞,阻塞的最大时间可以通过 max.block.ms
配置,默认大小是60秒。
概括一下,就是当Producer生产Message的速率大于Broker接收Message(Consumer消费数据)的速率时,Producer会把Broker还没来得及接收的Message存在缓存里(内存),当存满设置的缓存大小后,Producer将不再发送Message给Broker,也就是进入阻塞状态,如果在设置的阻塞时间内,缓存还没有被释放出有用空间,那么Producer将抛出异常。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Kafka从上手到实践-实践真知:搭建Zookeeper集群
- Kafka从上手到实践-实践真知:搭建单机Kafka
- Kafka从上手到实践-实践真知:Kafka Java Consumer
- Kafka从上手到实践-实践真知:Kafka Java Producer
- Nacos上手实践(基于Spring Cloud)
- Kafka从上手到实践-初步认知:Zookeeper
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Node.js:来一打 C++ 扩展
死月 / 电子工业出版社 / 2018-6-1 / 109
Node.js 作为近几年新兴的一种编程运行时,托 V8 引擎的福,在作为后端服务时有比较高的运行效率,在很多场景下对于我们的日常开发足够用了。不过,它还为开发者开了一个使用C++ 开发 Node.js 原生扩展的口子,让开发者进行项目开发时有了更多的选择。 《Node.js:来一打 C++ 扩展》以 Chrome V8 的知识作为基础,配合 GYP 的一些内容,将教会大家如何使用 Node......一起来看看 《Node.js:来一打 C++ 扩展》 这本书的介绍吧!