Apache Kafka和Spring Integration的使用

栏目: Java · 发布时间: 7年前

内容简介:Apache Kafka当然是最常用的JMS代理,它有Apache Hadoop等分布式系统用于数据入口。与其他JMS代理相比,Apache Kafka的关键特性(从我的观点来看):让我们看看如何将Apache Kafka与Spring Integration结合使用。我们将构建将消息生成到Kafka主题的简单演示。

Apache Kafka当然是最常用的JMS代理,它有Apache Hadoop等分布式系统用于数据入口。与其他JMS代理相比,Apache Kafka的关键特性(从我的观点来看):

  • Apache Kafka是无状态的,当您使用Kafka主题的消息时,它不会被删除。Kafka对已发布的消息有明确的保留政策。所以你的所有终端都必须是幂等的。
  • Apache Kafka是严格的发布 - 订阅 JMS代理。使用Kafka,您只能向主题发送消息。没有队列概念。
  • 在Apache Kafka中,消费者分为消费者群体。已发布的消息将分发到这些使用者组中,其中每个组中只有一个使用者获取该消息。
  • 在Apache Kafka中没有队列,但如果您只有一个消费者者组,那么您可以获得点对点消息传递的效果。

让我们看看如何将Apache Kafka与Spring Integration结合使用。我们将构建将消息生成到Kafka主题的简单演示。

卡夫卡安装和启动

您需要启动Apache ZooKeeper:

$KAFKA_HOME/bin/zookeeper-server-start.sh  $KAFKA_HOME/config/zookeeper.properties

启动Kafka:

$KAFKA_HOME/bin/kafka-server-start.sh  $KAFKA_HOME/config/server.properties

创建一个主题topic:

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic

列出Kafka创建的主题:

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181

基于Spring Integration构建Apache Kafka生成器

我们将构建简单的Spring Integration应用程序发布消息到Apache Kafka作为程序参数输入。

首先,让我们创建Spring Integration基础架构:

<<b>int</b>:channel id=<font>"inputToKafka"</font><font>>
        <<b>int</b>:queue/>
    </<b>int</b>:channel>

    <<b>int</b>-kafka:outbound-channel-adapter
            id=</font><font>"kafkaOutboundChannelAdapter"</font><font>
            kafka-producer-context-ref=</font><font>"kafkaProducerContext"</font><font>
            channel=</font><font>"inputToKafka"</font><font>>
        <<b>int</b>:poller fixed-delay=</font><font>"1000"</font><font> time-unit=</font><font>"MILLISECONDS"</font><font> 
        receive-timeout=</font><font>"0"</font><font> task-executor=</font><font>"taskExecutor"</font><font>/>
    </<b>int</b>-kafka:outbound-channel-adapter>

    <task:executor id=</font><font>"taskExecutor"</font><font> pool-size=</font><font>"5"</font><font> 
     keep-alive=</font><font>"120"</font><font> queue-capacity=</font><font>"500"</font><font>/>
</font>

在这个xml配置,我们创建了队列通道 “inputToKafka”,我们将推送消息到这个通道。ID为“ kafkaOutboundChannelAdapter ”的Bean 是一个出站通道适配器,具有已定义的异步轮询,执行从“inputToKafka”通道读取消息并将其推送到Apache Kafka。

现在Apache Kafka生产者配置:

<bean id=<font>"kafkaStringSerializer"</font><font> 
          <b>class</b>=</font><font>"org.apache.kafka.common.serialization.StringSerializer"</font><font> />

    <<b>int</b>-kafka:producer-context id=</font><font>"kafkaProducerContext"</font><font>>
        <<b>int</b>-kafka:producer-configurations>
            <<b>int</b>-kafka:producer-configuration 
                  broker-list=</font><font>"localhost:9092"</font><font>
                  topic=</font><font>"test_topic"</font><font>
                  key-<b>class</b>-type=</font><font>"java.lang.String"</font><font>
                  value-<b>class</b>-type=</font><font>"java.lang.String"</font><font>
                  key-serializer=</font><font>"kafkaStringSerializer"</font><font>
                  value-serializer=</font><font>"kafkaStringSerializer"</font><font>
                  />
        </<b>int</b>-kafka:producer-configurations>
    </<b>int</b>-kafka:producer-context>
</font>

我们来看看生产者参数:

#Apache Kafka broker cluster. Cluster where we're going to 
be publishing messages. Let's go with <b>default</b> configuration. 

broker-list=<font>"localhost:9092"</font><font>
</font>
#Name of topic <b>for</b> publishing messages.
topic=<font>"test_topic"</font><font>
#Type of the optional key associated with the message
key-<b>class</b>-type=</font><font>"java.lang.String"</font><font>
#Type of value sent in the message.
value-<b>class</b>-type=</font><font>"java.lang.String"</font><font>
#Reference to the key serializer, all keys and values has to be serialized before sending into Apache Kafka.
key-serializer=</font><font>"kafkaStringSerializer"</font><font>
#Same and key serializer.
value-serializer=</font><font>"kafkaStringSerializer"</font><font>
</font>

启动消息流

要启动消息流,也就是消息生产者,我们将简单地创建SpringBoot CommandLineRunner,将命令行参数传递到提到的队列通道中:

@Component
@DependsOn(value=<font>"kafkaOutboundChannelAdapter"</font><font>)
<b>public</b> <b>class</b> MessageRunner implements CommandLineRunner {

    @Resource(name = </font><font>"inputToKafka"</font><font>)
    <b>private</b> MessageChannel messageChannel;

    @Override
    <b>public</b> <b>void</b> run(String... args) throws Exception {
        <b>for</b> (String arg1 : args) {
            messageChannel.send(
                    <b>new</b> GenericMessage<String>(arg1)
            );
        }
    }
}
</font>

如何测试应用程序

在maven应用程序的根目录中,运行:

mvn clean install

成功编译后,运行:

java -jar target / demo-0.0.1-SNAPSHOT.jar Test1 Test2 Test3

这将创建作为参数传递的String消息,并将它们推送到Apache Kafka代理中的test_topic中。

现在,如果您运行Apache Kafka消费者:

$KAFKA_HOME/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic --from-beginning

会得到:

$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic --from-beginning
Test1
Test2
Test3

太棒了,我们在Apache Kafka中创建了基于Spring Integration的简单消息生成器!


以上所述就是小编给大家介绍的《Apache Kafka和Spring Integration的使用》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Beginning XSLT 2.0

Beginning XSLT 2.0

Jeni Tennison / Apress / 2005-07-22 / USD 49.99

This is an updated revision of Tennison's "Beginning XSLT", updated for the new revision of the XSLT standard. XSLT is a technology used to transform an XML document with one structure into another ......一起来看看 《Beginning XSLT 2.0》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具