在spring boot中三分钟上手日志堆积系统kafka

栏目: Java · 发布时间: 4年前

内容简介:kafka消息堆积能力比较强,可以堆积上亿的消息,特别适合日志处理这种实时性要求不太高的场景,同时支持集群部署,相比redis堆积能力和可靠性更高完整项目代码已上传github:可以通过下面的步骤快速上手这个kafka

kafka消息堆积能力比较强,可以堆积上亿的消息,特别适合日志处理这种实时性要求不太高的场景,同时支持集群部署,相比 redis 堆积能力和可靠性更高

完整项目代码已上传github:

可以通过下面的步骤快速上手这个kafka

获取一个可用的kafka实例

可以使用 docker 一键启动一个kafka集群,参考: github.com/simplesteph…

git clone https://github.com/simplesteph/kafka-stack-docker-compose.git
cd kafka-stack-docker-compose
docker-compose -f full-stack.yml up -d
复制代码

操作效果如下

在spring boot中三分钟上手日志堆积系统kafka

使用命令 docker-compose -f full-stack.yml ps 获取可以kafka监听的端口

在spring boot中三分钟上手日志堆积系统kafka

记下kafka监听的地址9092,这个后面会用到

8000端口是这个kafka的topic的ui界面,这个界面可以查看当前的topic列表,效果如下

在spring boot中三分钟上手日志堆积系统kafka

这里也看到topic里保存的数据

准备案例项目

可以在https://start.spring.io/创建测试项目

在spring boot中三分钟上手日志堆积系统kafka

需要加上下面这三个包

  1. spring-boot-starter-web
  2. spring-kafka
  3. lombok

appliation.properties 中配置kafka的地址和使用的group-id,这个group-id名称可以自行定义,比如:myconsumergroup

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=myconsumergroup
复制代码

用kafka客户端发送消息

使用一个spring boot的service封装kafka发送消息的代码,核心代码如下

package mykafka.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private final KafkaTemplate<String, String> kafkaTemplate;


    private String topic = "自行定义的topic";

    Producer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        this.kafkaTemplate.send(topic, message);
        System.out.println("Sent sample message [" + message + "] to " + topic);
    }

}
复制代码

然后编写一个接口调用这个发送kafka消息的service,核心代码如下:

@RestController
@RequestMapping("/")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MyController {

    private final Producer producer;

    @RequestMapping("/test1")
    public String test1() {
        producer.send(String.format("my message currentTimeMillis: %d", System.currentTimeMillis()));
        return "test1";
    }
}
复制代码

注意:上面代码里使用的kafka的topic可以自行定义,比如mytopic

然后在浏览器中访问这个接口 ip:8080/test1

在spring boot中三分钟上手日志堆积系统kafka

可以在这个kafka的topic的ui看到发送到kafka的消息

在spring boot中三分钟上手日志堆积系统kafka

可以看到这个消息已经发送到kafka了

消费消息

消费消息只需要在方法上加上KafkaListener,并指定topic和groupId即可

核心代码如下

@KafkaListener(topics = "mytopic", groupId = "myconsumergroup")
public void processMessage(String message,
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                           @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    log.info(
            "received message, topic: {}, partition: {}, offset: {}, message: {}",
            topics.get(0),
            partitions.get(0),
            offsets.get(0),
            message
    );
}
复制代码

操作效果如下:

在spring boot中三分钟上手日志堆积系统kafka

可以看到已经成功收到了kafka里的消息

其它客户端

php发送和消费客户端参考: github.com/arnaud-lb/p…

go客户端参考: github.com/confluentin…

一些注意的点

发送消息和消费消息需要确保topic一致

日志可以先发送到kafka做缓冲,然后通过kafka的客户端把消息取出来放到elk等日志存储系统中分析和可视化

参考链接

  1. www.baeldung.com/spring-kafk…
  2. www.baeldung.com/spring-inje…
  3. docs.confluent.io/current/cli…

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

硅谷热

硅谷热

埃弗雷特.M.罗杰斯 / 范国鹰 等 / 1985.8 / 经济科学出版社 / 1.9

《硅谷热》总共分三部分。第一部分为“硅谷的崛起”,以苹果电脑的传奇故事为主线,讲述了硅谷的发展历史。第二部分为“高技术文明”,从风险投资、创业故事、人物传奇等各个方面描绘了硅谷的生态状况。第三部分为“硅谷的明天”,讲述了硅谷模式在全球的扩散、硅谷面临的全球竞争和深远影响。 书中,硅谷这场传奇的主要角色:人物、公司、技术、产品等都综合在其中,一锅子端给了嗷嗷待哺的人们:PC革命、半导体传奇、软......一起来看看 《硅谷热》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

html转js在线工具
html转js在线工具

html转js在线工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具