Kafka 使用ExecutorService 多线程消费

栏目: 后端 · 发布时间: 4年前

内容简介:Apache Kafka 作为当下最常用消息中间件之一。给到我的需求是需要我们处理大量的消息(如果单线程处理过多消息会出现性能瓶颈)。1.创建一个可以从topic中poll()消息后传递到线程池以进行进一步处理。

前言:

Apache Kafka 作为当下最常用消息中间件之一。给到我的需求是需要我们处理大量的消息(如果单线程处理过多消息会出现性能瓶颈)。

如何使用 Java 的ExecutorService框架来创建线程池处理大量消息?

1.创建一个可以从topic中poll()消息后传递到线程池以进行进一步处理。

2.创建工作线程,以执行每条消息的进一步处理。

1.topic消息传递到ThreadPoolExecutorService

/** kafka 消息处理*/
public class KafkaProcessor {
    private final KafkaConsumer<String, String> myConsumer;
    private ExecutorService executor;
    private static final Properties KAFKA_PROPERTIES = new Properties();
   
   //基础的kafka配置~
   static {
        KAFKA_PROPERTIES.put("bootstrap.servers", "localhost:9092");
        KAFKA_PROPERTIES.put("group.id", "test-consumer-group");
        KAFKA_PROPERTIES.put("enable.auto.commit", "true");
        KAFKA_PROPERTIES.put("auto.commit.interval.ms", "1000");
        KAFKA_PROPERTIES.put("session.timeout.ms", "30000");
        KAFKA_PROPERTIES.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KAFKA_PROPERTIES.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public KafkaProcessor() {
        this.myConsumer = new KafkaConsumer<>(KAFKA_PROPERTIES);//初始化配置
        this.myConsumer.subscribe(Arrays.asList("test")); //订阅topic=test
    }
    public void init(int numberOfThreads) {
      //创建一个线程池 
     /**
      * public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
       *corePoolSize : 核心线程数,一旦创建将不会再释放。如果创建的线程数还没有达到指定的核心线    程数量,将会继续创建新的核心线程,直到达到最大核心线程数后,核心线程数将不在增加;如果没有空闲的核心线程,同时又未达到最大线程数,则将继续创建非核心线程;如果核心线程数等于最大线程数,则当核心线程都处于激活状态时,任务将被挂起,等待空闲线程来执行。

       *maximumPoolSize : 最大线程数,允许创建的最大线程数量。如果最大线程数等于核心线程数,则无法创建非核心线程;如果非核心线程处于空闲时,超过设置的空闲时间,则将被回收,释放占用的资源。

       *keepAliveTime : 也就是当线程空闲时,所允许保存的最大时间,超过这个时间,线程将被释放销毁,但只针对于非核心线程。

       *unit : 时间单位,TimeUnit.SECONDS等。

       *workQueue : 任务队列,存储暂时无法执行的任务,等待空闲线程来执行任务。

       *threadFactory :  线程工程,用于创建线程。
    
       *handler : 当线程边界和队列容量已经达到最大时,用于处理阻塞时的程序
      */

      executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,0L,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
      
    while (true) {
        ConsumerRecords<String, String> records = myConsumer.poll(100);//每隔时间段进行消息拉取
        for (final ConsumerRecord<String, String> record : records) {
        executor.submit(new KafkaRecordHandler(record)); 
        }
      }
}
    //别忘记线程池的关闭!
    public void shutdown() {
        if (myConsumer != null) {
        myConsumer.close();
        }
        if (executor != null) {
        executor.shutdown();
        }
        try {
          if (executor != null && !executor.awaitTermination(60, TimeUnit.MILLISECONDS)) {
          executor.shutdownNow();
          }
        }catch (InterruptedException e) {
        executor.shutdownNow();
        }
}
}复制代码

2.创建工作线程

// 创建消息线程进行处理
public class KafkaRecordHandler implements Runnable {

    private ConsumerRecord<String, String> record;

    public KafkaRecordHandler(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void run() { 
      //业务操作...
        System.out.println("value = "+record.value());
        System.out.println("Thread id = "+ Thread.currentThread().getId());
    }
}复制代码

3.Using ?

//消费测试
public class ConsumerTest {
    public static void main(String[] args) {
      KafkaProcessor processor = new KafkaProcessor();
      try {
          processor.init(5);//指定相应的线程数!
      }catch (Exception exp) {
          processor.shutdown();
      }
    }
}复制代码

4.总结

可能并不适合所有方案,按需定制方案。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

程序开发心理学

程序开发心理学

(美)杰拉尔德·温伯格 / 邓俊辉 / 清华大学出版社 / 2004-1-1 / 39.00元

本书开创"以人为本"研究方法的先驱,在长达25年的岁月中一直保持活力,至今仍在继续。1997年,本书作者温伯格因其在软件领域的杰出贡献,被美国计算机博物馆的计算机名人堂选为首批5位成员之一。 在计算机界,还没有任何一本计算机方面的书,在初次出版之后,能够在长达25年的岁月中一直保持活力--而且这种活力到今天仍在继续。《程序开发心理学》是开创"以人为本"研究方法的先驱,它以其对程序员们在智力、......一起来看看 《程序开发心理学》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器