Kafka从上手到实践-庖丁解牛:Producer

栏目: 服务器 · Apache · 发布时间: 4年前

内容简介:通过上一章节,我们知道了Kafka的Message是如何持久化的,知道了保证高可用性、稳定性的策略。这一节来看看Kafka中如何生产Message以及相关的策略。简而言之,Producer就负责往Topic里发送数据,或者说写入数据。换言之,就是往组成这个Topic的一至多个Partition里写入数据。这里有三点需要注意:

通过上一章节,我们知道了Kafka的Message是如何持久化的,知道了保证高可用性、稳定性的策略。这一节来看看Kafka中如何生产Message以及相关的策略。

Producer

简而言之,Producer就负责往Topic里发送数据,或者说写入数据。换言之,就是往组成这个Topic的一至多个Partition里写入数据。这里有三点需要注意:

  • 我们只需要通过Producer产生数据,往Topic里塞既可。Producer会自动去选择正确的、合适的Broker和Partition持久化数据。
  • Producer默认采用轮询的机制选择Broker往Partition里持久化数据的。
  • 如果其中有一个Broker挂了,当它再恢复时,Producer会自动接纳它。

Kafka从上手到实践-庖丁解牛:Producer

Message keys

Producer默认采用轮询的机制选择Broker往Partition里持久化数据。但当我们需要根据数据中的某个字段按Partition进行分组或者 排序 时,就需要在每条Message里添加Key,这个Key可以是数字,也可以是字符串等等。然后相同Key的Message永远会持久化到同一个Partition。

Acks

Producer在发送生产出的数据给Broker时,可以选择三种模式,称为acks,它是Acknowledgment的缩写。意思是Broker对Producer即将发送来的数据采用何种确认方式。

acks=0

在该模式下,Producer不会等待Broker的确认反馈,即不关心Broker是否正确的将发送来的数据持久化,所以在这种模式下,很有可能会丢失数据。因为如果Broker挂了,Producer不会被通知到,所以还会不停的发送数据导致数据丢失。在对数据完整性需求不强烈的场景下,这种模式可以提高性能。

acks=1

默认采用的模式,该模式下Producer会等待Leader Broker的确认反馈,当Broker确实将数据持久化到至少一个Partition中后,给予Producer确认反馈,Producer才会继续发送数据。该模式下有几点需要注意:

  • 不保证Replicas也持久化了数据。
  • 当Producer没有收到Broker的确认反馈时,Producer会尝试重新发送数据。
  • 当Leader Broker挂了,但是Replicas又没有持久化数据时,还是会丢失数据。
  • 该模式只能说是可以有效防止数据丢失。

acks=all

该模式下,Producer同样需要等待Broker的确认,但是确认更为严格,需要所有的Partition(Leader + Replicas)都持久化数据后才返回确认信息。这种模式下,只要Replicas足够多,数据基本不会丢失。

在该模式下,还有一个重要的参数 min.insync.replicas 需要配置。该参数的意思是当 acks=all 时,至少有多少个Replicas需要确认已成功持久化数据,这个Replicas数量也包括Leader。

举个例子,假设有三个Broker,参数为 min.insync.replicas=2replication.factor=3acks=all ,那么Producer每次发送Message时,都需要至少2个Broker给予确认反馈,换句话说,在这个Kafka集群中,只能允许一个Broker挂掉。如果 min.insync.replicas=3 ,那么一个Broker都不能挂,否则Producer在发送Message时会收到 NOT_ENOUGH_REPLICAS 的异常。

Retry

有时候Producer发送Message失败可能并不是因为Broker挂了,可能是因为网络问题,没有连接到Broker等等。这种问题可能在很短暂的时间内就会自动修复,那么在这种情况下,我们希望Producer在发送失败后能重新尝试发送。这里就需要设置 retries 这个参数,意思就是重试的次数,默认是0次,可以根据实际业务情况设置。

但是当设置了 retries 参数大于0后,有可能会带来新的问题。假如我们需要相同Key的Message进入特定的Partition,并且是要严格按照Producer生产Message的顺序排序。那么此时如果第一条Message发送失败,第二条Message发送成功了,第一条通过重试发送成功了,那Message的顺序就发生了变化。

这里又会引出一个参数 max.in.flight.requests.per.connection ,这个参数默认是5,意思是在被Broker阻止前,未通过acks确认的发送请求最大数,也就是在Broker处排队等待acks确认的Message数量。所以刚才那个场景,第一条和第二条Message都在Broker那排队等待确认放行,这时第一条失败了,等重试的第一条Message再来排队时,第二条早都通过进去了,所以排序就乱了。

如果想在设置了 retries 还要严格控制Message顺序,可以把 max.in.flight.requests.per.connection 设置为1。让Broker处永远只有一条Message在排队,就可以严格控制顺序了。但是这样做会严重影响性能(接收Message的吞吐量)。

Idempotent Producer

在实际情况中,经常会遇到一个现象,那就是当Broker给Producer返回acks确认时,网络出异常了,导致Producer没有收到ack确认,于是,Producer进行重试。如果Consumer的Offset策略(在后续章节会介绍)是 at least once 或者是 exactly once ,那么第一次对Message就已经进行了处理,比如入库。那么第二次会对相同的Message再做一次处理,对相同数据进行重复处理,势必会引起业务上的错误。整个过程如下图所示:

Kafka从上手到实践-庖丁解牛:Producer

所以这就需要幂等Producer来保证我们处理数据的唯一性。Kafka在0.11版本之后,就为我们提供了定义幂等Producer的能力,可以通过将 enable.idempotence.config 参数设置为 true 来定义幂等Producer。将Producer定义为幂等后,还要设置其他对应的参数:

retries=Integer.MAX_VALUE
max.in.flight.requests.per.connection=1 (Kafka >= v0.11 & < v1.1)
max.in.flight.requests.per.connection=5 (Kafka >= v1.1)
acks=all

如此设置后,可以有效防止重复消费Message,整个过程就会如下图所示:

Kafka从上手到实践-庖丁解牛:Producer

Message Compression

消息压缩的作用不言而喻:

  • 加快网络传输速度,减少消息延迟。
  • 更有效的利用磁盘空间。
  • 加快消息吞吐率。

只需要设置 compression.type 参数,该参数默认是 none ,可选项有 gziplz4snappy 。建议使用 lz4 或者 snappy

Message Batch

上面介绍了 max.in.flight.requests.per.connection 参数,默认会在Broker那排队5条Message,那么如果第六条来了怎么办呢?这时候Kafka会自动开启批量处理Message的模式,将这6条Message作为一个批次进行处理。这一个批次可以看作是一次Message处理请求。

开启批量模式后,会引出两个参数:

linger.ms
batch.size

过程如下图所示:

Kafka从上手到实践-庖丁解牛:Producer

Producer Buffer

在大多数情况下,Consumer消费Message的速率是远不如Producer生产Message的速率的。所以Producer有一个缓存机制,将Broker还没来得及接收的Message缓存在内存中。缓存的大小可以通过 buffer.memory 配置,默认大小是32MB。默认存储时间为7天,这个时间可以通过设置Broker的 offset.retention.minutes 属性改变。

如果Producer的缓存被打满后,Producer会被阻塞,阻塞的最大时间可以通过 max.block.ms 配置,默认大小是60秒。

概括一下,就是当Producer生产Message的速率大于Broker接收Message(Consumer消费数据)的速率时,Producer会把Broker还没来得及接收的Message存在缓存里(内存),当存满设置的缓存大小后,Producer将不再发送Message给Broker,也就是进入阻塞状态,如果在设置的阻塞时间内,缓存还没有被释放出有用空间,那么Producer将抛出异常。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Node.js:来一打 C++ 扩展

Node.js:来一打 C++ 扩展

死月 / 电子工业出版社 / 2018-6-1 / 109

Node.js 作为近几年新兴的一种编程运行时,托 V8 引擎的福,在作为后端服务时有比较高的运行效率,在很多场景下对于我们的日常开发足够用了。不过,它还为开发者开了一个使用C++ 开发 Node.js 原生扩展的口子,让开发者进行项目开发时有了更多的选择。 《Node.js:来一打 C++ 扩展》以 Chrome V8 的知识作为基础,配合 GYP 的一些内容,将教会大家如何使用 Node......一起来看看 《Node.js:来一打 C++ 扩展》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试