Kafka 使用ExecutorService 多线程消费

栏目: 后端 · 发布时间: 7年前

内容简介:Apache Kafka 作为当下最常用消息中间件之一。给到我的需求是需要我们处理大量的消息(如果单线程处理过多消息会出现性能瓶颈)。1.创建一个可以从topic中poll()消息后传递到线程池以进行进一步处理。

前言:

Apache Kafka 作为当下最常用消息中间件之一。给到我的需求是需要我们处理大量的消息(如果单线程处理过多消息会出现性能瓶颈)。

如何使用 Java 的ExecutorService框架来创建线程池处理大量消息?

1.创建一个可以从topic中poll()消息后传递到线程池以进行进一步处理。

2.创建工作线程,以执行每条消息的进一步处理。

1.topic消息传递到ThreadPoolExecutorService

/** kafka 消息处理*/
public class KafkaProcessor {
    private final KafkaConsumer<String, String> myConsumer;
    private ExecutorService executor;
    private static final Properties KAFKA_PROPERTIES = new Properties();
   
   //基础的kafka配置~
   static {
        KAFKA_PROPERTIES.put("bootstrap.servers", "localhost:9092");
        KAFKA_PROPERTIES.put("group.id", "test-consumer-group");
        KAFKA_PROPERTIES.put("enable.auto.commit", "true");
        KAFKA_PROPERTIES.put("auto.commit.interval.ms", "1000");
        KAFKA_PROPERTIES.put("session.timeout.ms", "30000");
        KAFKA_PROPERTIES.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KAFKA_PROPERTIES.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public KafkaProcessor() {
        this.myConsumer = new KafkaConsumer<>(KAFKA_PROPERTIES);//初始化配置
        this.myConsumer.subscribe(Arrays.asList("test")); //订阅topic=test
    }
    public void init(int numberOfThreads) {
      //创建一个线程池 
     /**
      * public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
       *corePoolSize : 核心线程数,一旦创建将不会再释放。如果创建的线程数还没有达到指定的核心线    程数量,将会继续创建新的核心线程,直到达到最大核心线程数后,核心线程数将不在增加;如果没有空闲的核心线程,同时又未达到最大线程数,则将继续创建非核心线程;如果核心线程数等于最大线程数,则当核心线程都处于激活状态时,任务将被挂起,等待空闲线程来执行。

       *maximumPoolSize : 最大线程数,允许创建的最大线程数量。如果最大线程数等于核心线程数,则无法创建非核心线程;如果非核心线程处于空闲时,超过设置的空闲时间,则将被回收,释放占用的资源。

       *keepAliveTime : 也就是当线程空闲时,所允许保存的最大时间,超过这个时间,线程将被释放销毁,但只针对于非核心线程。

       *unit : 时间单位,TimeUnit.SECONDS等。

       *workQueue : 任务队列,存储暂时无法执行的任务,等待空闲线程来执行任务。

       *threadFactory :  线程工程,用于创建线程。
    
       *handler : 当线程边界和队列容量已经达到最大时,用于处理阻塞时的程序
      */

      executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,0L,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
      
    while (true) {
        ConsumerRecords<String, String> records = myConsumer.poll(100);//每隔时间段进行消息拉取
        for (final ConsumerRecord<String, String> record : records) {
        executor.submit(new KafkaRecordHandler(record)); 
        }
      }
}
    //别忘记线程池的关闭!
    public void shutdown() {
        if (myConsumer != null) {
        myConsumer.close();
        }
        if (executor != null) {
        executor.shutdown();
        }
        try {
          if (executor != null && !executor.awaitTermination(60, TimeUnit.MILLISECONDS)) {
          executor.shutdownNow();
          }
        }catch (InterruptedException e) {
        executor.shutdownNow();
        }
}
}复制代码

2.创建工作线程

// 创建消息线程进行处理
public class KafkaRecordHandler implements Runnable {

    private ConsumerRecord<String, String> record;

    public KafkaRecordHandler(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void run() { 
      //业务操作...
        System.out.println("value = "+record.value());
        System.out.println("Thread id = "+ Thread.currentThread().getId());
    }
}复制代码

3.Using ?

//消费测试
public class ConsumerTest {
    public static void main(String[] args) {
      KafkaProcessor processor = new KafkaProcessor();
      try {
          processor.init(5);//指定相应的线程数!
      }catch (Exception exp) {
          processor.shutdown();
      }
    }
}复制代码

4.总结

可能并不适合所有方案,按需定制方案。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Designing for Emotion

Designing for Emotion

Aarron Walter / Happy Cog / 2011-10-18 / USD 18.00

Make your users fall in love with your site via the precepts packed into this brief, charming book by MailChimp user experience design lead Aarron Walter. From classic psychology to case studies, high......一起来看看 《Designing for Emotion》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具