kafka精确一次语义EOS的原理深入剖析-kafka 商业环境实战

栏目: 后端 · 发布时间: 5年前

内容简介:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。消费端精确到一次语义实现:consumer通过subscribe方法注册到kafka,精确一次的语义要求必须手动管理offset,按照下述步骤进行设置:Kafka 0.11.0.0版本的逆天之作,都是在消费者EOS语义较弱,需要进一

本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

kafka精确一次语义EOS的原理深入剖析-kafka 商业环境实战

1 Kafka 0.11.0.0版本的逆天之作

  • 0.11.0.0版本之前默认提供at least once语义,想象这样一种场景,分区的Leader副本所在的Broker成功的将消息写入本地磁盘,然后broker将发送响应给producer,此时假设网络出现故障导致该响应没有发送成功。此种情况下,Producer将认为消息发送请求失败,从而开启重试机制。若此时网络恢复正常,那么同一条消息将会被写入两次。
  • 基于上述案例:0.11.0.0版本提供幂等性:每个分区中精确一次且有序
  • 0.11.0.0版本提供事务:跨分区原子写入机制。

2 故障类型

  • broker可能故障:Kafka是一个高可用、持久化的系统,每一条写入一个分区的消息都会被持久化并且多副本备份(假设有n个副本)。所以,Kafka可以容忍n-1个broker故障,意味着一个分区只要至少有一个broker可用,分区就可用。Kafka的副本协议保证了只要消息被成功写入了主副本,它就会被复制到其他所有的可用副本(ISR)。
  • producer到broker的RPC调用可能失败:Kafka的持久性依赖于生产者接收broker的ack响应。没有接收成功ack不代表生产请求本身失败了。broker可能在写入消息后,发送ack给生产者的时候挂了。甚至broker也可能在写入消息前就挂了。由于生产者没有办法知道错误是什么造成的,所以它就只能认为消息没写入成功,并且会重试发送。在一些情况下,这会造成同样的消息在Kafka分区日志中重复,进而造成消费端多次收到这条消息。
  • 客户端可能会故障:精确一次交付也必须考虑客户端故障。但是我们如何知道一个客户端已经故障而不是暂时和brokers断开,或者经历一个程序短暂的暂停,区分永久性故障和临时故障是很重要的,为了正确性,broker应该丢弃僵住的生产这发送来的消息,同样,也应该不向已经僵住的消费者发送消息。一旦一个新的客户端实例启动,它应该能够从失败的实例留下的任何状态中恢复,从一个安全点开始处理。这意味着,消费的偏移量必须始终与生产的输出保持同步。

3 Producer幂等性处理机制

  • 如果出现导致生产者重试的错误,同样的消息,仍由同样的生产者发送多次,将只被写到kafka broker的日志中一次。对于单个分区,幂等生产者不会因为生产者或broker故障而发送多条重复消息。
  • kafka保存序列号仅仅需要几个额外的字段,因此这种机制的开销非常低。
  • 除了序列号,kafka会为每个Producer实例分配一个Producer id(PID),每一条消息都会有序列号,并严格递增顺序。若发送的消息的序列号小于或者等于broker端保存的序列号,那么broker会拒绝这条消息的写入操作。
  • 注意的是:当前的设计只能保证单个producer实例的EOS语义,无法实现多个Producer实例一块提供EOS语义。
  • 想要开启这个特性,获得每个分区内的精确一次语义,也就是说没有重复,没有丢失,并且有序的语义,只需要设置producer配置中的”enable.idempotence=true”。

4 事务:跨分区原子写入

  • 事务:跨分区原子写入

    将允许一个生产者发送一批到不同分区的消息,这些消息要么全部对任何一个消费者可见,要么对任何一个消费者都不可见。这个特性也允许你在一个事务中处理消费数据和提交消费偏移量,从而实现端到端的精确一次语义。

  • 主要针对消息经过Partioner分区器到多个分区的情况。

    producer.initTransactions();
      try {
        producer.beginTransaction();
        producer.send(record1);
        producer.send(record2);
        producer.commitTransaction();
      } catch(ProducerFencedException e) {
        producer.close();
      } catch(KafkaException e) {
        producer.abortTransaction();
      }
    复制代码

5 消费端的事务支持

  • 在消费者方面,有两种选择来读取事务性消息,通过隔离等级“isolation.level”消费者配置表示:

    read_commited:除了读取不属于事务的消息之外,还可以读取事务提交后的消息。
      read_uncommited:按照偏移位置读取所有消息,而不用等事务提交。这个选项类似Kafka消费者的当前语义。
    复制代码
  • 为了使用事务,需要配置消费者使用正确的隔离等级。

  • 使用新版生产者,并且将生产者的“transactional . id”配置项设置为某个唯一ID。 需要此唯一ID来提供跨越应用程序重新启动的事务状态的连续性。

6 消费端精确到一次语义实现

消费端精确到一次语义实现:consumer通过subscribe方法注册到kafka,精确一次的语义要求必须手动管理offset,按照下述步骤进行设置:

  • 1.设置enable.auto.commit = false;

  • 2.处理完消息之后不要手动提交offset,

  • 3.通过subscribe方法将consumer注册到某个特定topic,

  • 4.实现ConsumerRebalanceListener接口和consumer.seek(topicPartition,offset)方法(读取特定topic和partition的offset)

  • 5.将offset和消息一块存储,确保原子性,推荐使用事务机制。

    public class ExactlyOnceDynamicConsumer {
    
      private static OffsetManager offsetManager = new OffsetManager("storage2");
    
      public static void main(String[] str) throws InterruptedException {
    
          System.out.println("Starting ManualOffsetGuaranteedExactlyOnceReadingDynamicallyBalancedPartitionConsumer ...");
    
          readMessages();
    
      }
    
    
    
      private static void readMessages() throws InterruptedException {
    
          KafkaConsumer<String, String> consumer = createConsumer();
    
          // Manually controlling offset but register consumer to topics to get dynamically assigned partitions.
          // Inside MyConsumerRebalancerListener use consumer.seek(topicPartition,offset) to control offset
    
          consumer.subscribe(Arrays.asList("normal-topic"), new MyConsumerRebalancerListener(consumer));
    
          processRecords(consumer);
      }
    
    
      private static KafkaConsumer<String, String> createConsumer() {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          String consumeGroup = "cg3";
    
          props.put("group.id", consumeGroup);
    
          props.put("enable.auto.commit", "false");
          props.put("heartbeat.interval.ms", "2000");
          props.put("session.timeout.ms", "6001");
    
          * Control maximum data on each poll, make sure this value is bigger than the maximum single record size
          props.put("max.partition.fetch.bytes", "140");
    
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          return new KafkaConsumer<String, String>(props);
      }
    
      private static void processRecords(KafkaConsumer<String, String> consumer) {
    
          while (true) {
    
              ConsumerRecords<String, String> records = consumer.poll(100);
    
              for (ConsumerRecord<String, String> record : records) {
    
                  System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                  offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
    
              }
          }
      }
    复制代码

    }

    public class MyConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
    
      private OffsetManager offsetManager = new OffsetManager("storage2");
      private Consumer<String, String> consumer;
    
      public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
          this.consumer = consumer;
      }
    
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    
          for (TopicPartition partition : partitions) {
    
              offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
          }
      }
    
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    
    
          for (TopicPartition partition : partitions) {
              consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
          }
      }
    复制代码

    }

    public class OffsetManager {
        private String storagePrefix;
        public OffsetManager(String storagePrefix) {
            this.storagePrefix = storagePrefix;
        }
    
        void saveOffsetInExternalStore(String topic, int partition, long offset) {
    
            try {
    
                FileWriter writer = new FileWriter(storageName(topic, partition), false);
    
                BufferedWriter bufferedWriter = new BufferedWriter(writer);
                bufferedWriter.write(offset + "");
                bufferedWriter.flush();
                bufferedWriter.close();
    
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
    
        long readOffsetFromExternalStore(String topic, int partition) {
    
            try {
    
                Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
    
                return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            return 0;
        }
    
        private String storageName(String topic, int partition) {
            return storagePrefix + "-" + topic + "-" + partition;
        }
    
    }
    复制代码

6总结

Kafka 0.11.0.0版本的逆天之作,都是在消费者EOS语义较弱,需要进一步增强。

本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

秦凯新 于深圳 201812012146


以上所述就是小编给大家介绍的《kafka精确一次语义EOS的原理深入剖析-kafka 商业环境实战》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

MFC编程技巧与范例详解

MFC编程技巧与范例详解

曾凡锋、苗雨 / 清华大学出版社 / 2008-10 / 45.00元

本书集作者多年教学与软件开发经验,通过不同类型的实例详解向读者解读了如何使用MFC进行软件开发,并按实例的复杂度进行分级介绍,以满足不同层次读者的切实需要。. 本书共55个完整实例,均选自作者多年工程应用开发中的案例;内容共分14章,分别为MFC的基本概念、文档和视图、对话框、按钮控件、编辑控件、组合框控件、列表框控件、列表视图控件、树状视图控件、图像、多媒体、GDI与GDI+、网络编程、I......一起来看看 《MFC编程技巧与范例详解》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码