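Integrating the Spring Integration Java DSL with Spring Boot

Category: Java · Published: 5 months ago

Source: www.jdon.com

This article is reposted from https://www.jdon.com/51378 to share information more widely; copyright belongs to the original author or source organization.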

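The Spring Integration Java DSL has been merged into Spring Integration Core 5.0, which is a smart and obvious move because:

  • Everyone who starts a new Spring project based on Java Config uses it
  • The SI Java DSL lets you use powerful new Java 8 features such as lambdas
  • You can build flows with the Builder pattern, based on IntegrationFlowBuilder

Let's see how to use it with examples based on ActiveMQ JMS.

Maven dependencies: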

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jms</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-kahadb-store</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-java-dsl</artifactId>
        <version>1.2.3.RELEASE</version>
    </dependency>
</dependencies>

Sample 1: Jms inbound gateway

We have the following ServiceActivator:

@Service
public class ActiveMQEndpoint {
    @ServiceActivator(inputChannel = "inboundChannel")
    public void processMessage(final String inboundPayload) {
        System.out.println("Inbound message: "+inboundPayload);
    }
}

If you want to use the SI Java DSL to send the inboundPayload from a Jms queue to the activator above, Gateway style, use the DSL Jms factory:

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsInboundGateway dataEndpoint() {
    return Jms.inboundGateway(listenerContainer())
            .requestChannel(inboundChannel()).get();
}

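Through the JmsInboundGatewaySpec that the Jms factory returns in the dataEndpoint bean, you can also send replies to an SI channel or a Jms destination. See the documentation.

Sample 2: Jms message-driven channel adapter

If you are looking for a Java replacement for the XML JMS configuration of a message-driven channel adapter, JmsMessageDrivenChannelAdapter is the way to go: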

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setExpectReply(false);
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
            JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
            );

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

As in the previous example, the inbound payload is sent to the same activator as in Sample 1.
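The difference between the two endpoint styles can be sketched in plain Java. This is only a toy model under stated assumptions: ToyJmsEndpoint is a hypothetical class, not part of Spring Integration or JMS, and the reply list stands in for a reply destination. In Sample 1 the activator returns void, so no reply is actually produced there, but a gateway is able to carry one back; the adapter in Sample 2 calls setExpectReply(false) and never does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy in-memory endpoint (hypothetical, not a Spring or JMS class).
class ToyJmsEndpoint {
    // Stands in for the reply destination a gateway could answer to.
    final List<String> replies = new ArrayList<>();

    // Gateway style: a non-null handler result is sent back as a reply.
    void gateway(String request, Function<String, String> handler) {
        String reply = handler.apply(request);
        if (reply != null) {
            replies.add(reply);
        }
    }

    // Channel-adapter style: the message is handed over and nothing comes back.
    void adapter(String message, Consumer<String> handler) {
        handler.accept(message);
    }
}

class ToyJmsEndpointDemo {
    public static void main(String[] args) {
        ToyJmsEndpoint endpoint = new ToyJmsEndpoint();
        endpoint.gateway("ping", payload -> "ack:" + payload);
        endpoint.adapter("ping", payload -> System.out.println("Inbound message: " + payload));
        System.out.println("Replies collected: " + endpoint.replies);
    }
}
```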

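Sample 3: Jms message-driven channel adapter with JAXB

In a typical scenario, you want to accept XML as a text message over Jms, convert it into a JAXB stub, and process it in a service activator. I will show you how to do this with the SI Java DSL, but first let's add two dependencies for XML processing: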

<dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-xml</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
    </dependency>

We will accept shiporders over JMS, so first we need an XSD, named shiporder.xsd:

<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">

    <xs:element name="shiporder">
        <xs:complexType>
            <xs:sequence>
                <xs:element name="orderperson" type="xs:string"/>
                <xs:element name="shipto">
                    <xs:complexType>
                        <xs:sequence>
                            <xs:element name="name" type="xs:string"/>
                            <xs:element name="address" type="xs:string"/>
                            <xs:element name="city" type="xs:string"/>
                            <xs:element name="country" type="xs:string"/>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
                <xs:element name="item" maxOccurs="unbounded">
                    <xs:complexType>
                        <xs:sequence>
                            <xs:element name="title" type="xs:string"/>
                            <xs:element name="note" type="xs:string" minOccurs="0"/>
                            <xs:element name="quantity" type="xs:positiveInteger"/>
                            <xs:element name="price" type="xs:decimal"/>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
            </xs:sequence>
            <xs:attribute name="orderid" type="xs:string" use="required"/>
        </xs:complexType>
    </xs:element>

</xs:schema>

Add the JAXB Maven plugin to generate the JAXB stubs:

  <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>jaxb2-maven-plugin</artifactId>
            <version>2.3.1</version>
            <executions>
                <execution>
                    <id>xjc-schema1</id>
                    <goals>
                        <goal>xjc</goal>
                    </goals>
                    <configuration>
                        <!-- Use all XSDs under the west directory for sources here. -->
                        <sources>
                            <source>src/main/resources/xsds/shiporder.xsd</source>
                        </sources>

                        <!-- Package name of the generated sources. -->
                        <packageName>com.example.stubs</packageName>
                        <outputDirectory>src/main/java</outputDirectory>
                        <clearOutputDir>false</clearOutputDir>
                    </configuration>
                </execution>
            </executions>
        </plugin>

With the stub classes and everything else in place, here is the Java DSL JMS message-driven adapter with a bit of JAXB magic:

/**
 * Sample 3: Jms message driven adapter with JAXB
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setExpectReply(false);
    channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()));
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
            JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
    );

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

@Bean
public Jaxb2Marshaller shipOrdersMarshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setContextPath("com.example.stubs");
    return marshaller;
}

Replacing XML configuration with Java in this way gives you a great deal of power and flexibility. To complete this example, the service activator for inboundChannel looks like this:

/**
 * Sample 3
 * @param shiporder
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}

To test the flow, you can send the following XML to the JMS queue through JConsole:

<?xml version="1.0" encoding="UTF-8"?>
<shiporder orderid="889923"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="shiporder.xsd">
  <orderperson>John Smith</orderperson>
  <shipto>
    <name>Ola Nordmann</name>
    <address>Langgt 23</address>
    <city>4000 Stavanger</city>
    <country>Norway</country>
  </shipto>
  <item>
    <title>Empire Burlesque</title>
    <note>Special Edition</note>
    <quantity>1</quantity>
    <price>10.90</price>
  </item>
  <item>
    <title>Hide your heart</title>
    <quantity>1</quantity>
    <price>9.90</price>
  </item>
</shiporder>

For a quick overview of how to use ActiveMQ and JConsole, see this tutorial.
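Before sending a test message, it can help to check that it actually validates against shiporder.xsd, since a message that violates the schema may not unmarshal the way you expect. The following is a minimal sketch using only the JDK's javax.xml.validation API. The class name ShiporderXsdCheck is made up for illustration, the schema is inlined as a string (with single-quoted attributes) so the sketch is self-contained, and the test message is a shortened version of the one above without the xsi attributes.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

// Hypothetical helper: validates a message against the shiporder schema.
class ShiporderXsdCheck {
    // Same content as shiporder.xsd, compacted into one string.
    static final String XSD =
          "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'>"
        + "<xs:element name='shiporder'><xs:complexType><xs:sequence>"
        + "<xs:element name='orderperson' type='xs:string'/>"
        + "<xs:element name='shipto'><xs:complexType><xs:sequence>"
        + "<xs:element name='name' type='xs:string'/>"
        + "<xs:element name='address' type='xs:string'/>"
        + "<xs:element name='city' type='xs:string'/>"
        + "<xs:element name='country' type='xs:string'/>"
        + "</xs:sequence></xs:complexType></xs:element>"
        + "<xs:element name='item' maxOccurs='unbounded'><xs:complexType><xs:sequence>"
        + "<xs:element name='title' type='xs:string'/>"
        + "<xs:element name='note' type='xs:string' minOccurs='0'/>"
        + "<xs:element name='quantity' type='xs:positiveInteger'/>"
        + "<xs:element name='price' type='xs:decimal'/>"
        + "</xs:sequence></xs:complexType></xs:element>"
        + "</xs:sequence>"
        + "<xs:attribute name='orderid' type='xs:string' use='required'/>"
        + "</xs:complexType></xs:element></xs:schema>";

    // Shortened test message with a single item.
    static final String VALID_ORDER =
          "<shiporder orderid='889923'>"
        + "<orderperson>John Smith</orderperson>"
        + "<shipto><name>Ola Nordmann</name><address>Langgt 23</address>"
        + "<city>4000 Stavanger</city><country>Norway</country></shipto>"
        + "<item><title>Empire Burlesque</title><quantity>1</quantity>"
        + "<price>10.90</price></item>"
        + "</shiporder>";

    // Returns true when the XML conforms to the schema, false when it does not.
    static boolean isValid(String xml) {
        Schema schema;
        try {
            schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
                    .newSchema(new StreamSource(new StringReader(XSD)));
        } catch (SAXException e) {
            throw new IllegalStateException("The schema itself could not be compiled", e);
        }
        try {
            schema.newValidator().validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException e) {
            return false; // validation error, e.g. a missing required attribute
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("valid order: " + isValid(VALID_ORDER));
        // quantity is xs:positiveInteger, so 0 should be rejected
        System.out.println("zero quantity: "
                + isValid(VALID_ORDER.replace("<quantity>1<", "<quantity>0<")));
    }
}
```

In a real project you would more likely load the schema from src/main/resources/xsds/shiporder.xsd instead of inlining it.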

Sample 4: Jms message-driven channel adapter with JAXB and payload root routing

Another typical scenario is to accept XML as a JMS text message, convert it into a JAXB stub, and route the payload to a service activator according to its root type. The SI Java DSL of course supports all kinds of routing; I will show you how to route by payload type.

First, add the following XSD to the same folder as shiporder.xsd and name it purchaseorder.xsd:

<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
            xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"
            targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"
            elementFormDefault="qualified">
    <xsd:element name="PurchaseOrder">
        <xsd:complexType>
            <xsd:sequence>
                <xsd:element name="ShipTo" type="tns:USAddress" maxOccurs="2"/>
                <xsd:element name="BillTo" type="tns:USAddress"/>
            </xsd:sequence>
            <xsd:attribute name="OrderDate" type="xsd:date"/>
        </xsd:complexType>
    </xsd:element>

    <xsd:complexType name="USAddress">
        <xsd:sequence>
            <xsd:element name="name"   type="xsd:string"/>
            <xsd:element name="street" type="xsd:string"/>
            <xsd:element name="city"   type="xsd:string"/>
            <xsd:element name="state"  type="xsd:string"/>
            <xsd:element name="zip"    type="xsd:integer"/>
        </xsd:sequence>
        <xsd:attribute name="country" type="xsd:NMTOKEN" fixed="US"/>
    </xsd:complexType>
</xsd:schema>

Then add it to the JAXB Maven plugin configuration:

 <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>jaxb2-maven-plugin</artifactId>
            <version>2.3.1</version>
            <executions>
                <execution>
                    <id>xjc-schema1</id>
                    <goals>
                        <goal>xjc</goal>
                    </goals>
                    <configuration>
                        <!-- Use all XSDs under the west directory for sources here. -->
                        <sources>
                            <source>src/main/resources/xsds/shiporder.xsd</source>
                            <source>src/main/resources/xsds/purchaseorder.xsd</source>
                        </sources>

                        <!-- Package name of the generated sources. -->
                        <packageName>com.example.stubs</packageName>
                        <outputDirectory>src/main/java</outputDirectory>
                        <clearOutputDir>false</clearOutputDir>
                    </configuration>
                </execution>
            </executions>
        </plugin>

Run mvn clean install to generate the JAXB stubs for the new XSD. Now for the promised payload root mapping:

@Bean
public Jaxb2Marshaller ordersMarshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setContextPath("com.example.stubs");
    return marshaller;
}

/**
 * Sample 4: Jms message driven adapter with Jaxb and Payload routing.
 * @return
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(ordersMarshaller()));
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
            JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
    );

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

@Bean
public IntegrationFlow payloadRootMapping() {
    return IntegrationFlows.from(inboundChannel()).<Object, Class<?>>route(Object::getClass, m->m
            .subFlowMapping(Shiporder.class, sf->sf.handle((MessageHandler) message -> {
                final Shiporder shiporder = (Shiporder) message.getPayload();
                System.out.println(shiporder.getOrderperson());
                System.out.println(shiporder.getOrderid());
            }))
            .subFlowMapping(PurchaseOrder.class, sf->sf.handle((MessageHandler) message -> {
                final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();
                System.out.println(purchaseOrderType.getBillTo().getName());
            }))
    ).get();
}

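Note the payloadRootMapping bean. Let's explain the important parts:

  • <Object, Class<?>> route - means the input from inboundChannel will be an Object, and routing will be performed based on its Class<?>
  • subFlowMapping(Shiporder.class ... - handles Shiporder payloads.
  • subFlowMapping(PurchaseOrder.class ... - handles PurchaseOrder payloads.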
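The idea behind routing on Object::getClass can be illustrated without Spring at all. The sketch below is a toy model, not the Spring Integration router: PayloadTypeRouter, DemoShiporder and DemoPurchaseOrder are hypothetical names standing in for the DSL router and the generated JAXB stubs, and handlers return a string instead of printing so the result is easy to inspect.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for the JAXB stubs generated from the two XSDs.
class DemoShiporder {
    final String orderid;
    DemoShiporder(String orderid) { this.orderid = orderid; }
}

class DemoPurchaseOrder {
    final String billToName;
    DemoPurchaseOrder(String billToName) { this.billToName = billToName; }
}

// Toy router: picks a handler by the payload's runtime class.
class PayloadTypeRouter {
    private final Map<Class<?>, Function<Object, String>> mappings = new LinkedHashMap<>();

    // Registers a handler for one payload type, loosely analogous to subFlowMapping.
    <T> PayloadTypeRouter mapping(Class<T> type, Function<T, String> handler) {
        mappings.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    // Looks up the handler registered for payload.getClass() and invokes it.
    String route(Object payload) {
        Function<Object, String> handler = mappings.get(payload.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("No mapping for " + payload.getClass().getName());
        }
        return handler.apply(payload);
    }
}

class PayloadTypeRouterDemo {
    static PayloadTypeRouter router() {
        return new PayloadTypeRouter()
                .mapping(DemoShiporder.class, order -> "ship order " + order.orderid)
                .mapping(DemoPurchaseOrder.class, order -> "purchase order billed to " + order.billToName);
    }

    public static void main(String[] args) {
        PayloadTypeRouter router = router();
        System.out.println(router.route(new DemoShiporder("889923")));
        System.out.println(router.route(new DemoPurchaseOrder("name1")));
    }
}
```

The toy router matches on the exact runtime class only; a payload of an unmapped type raises an exception here, which is a design choice of this sketch rather than a statement about how Spring Integration treats unmapped payloads.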

To test the Shiporder payload, use the XML from Sample 3; to test the PurchaseOrder payload, use the following XML:

<?xml version="1.0" encoding="utf-8"?>
<PurchaseOrder OrderDate="1900-01-01" xmlns="http://tempuri.org/PurchaseOrderSchema.xsd">
  <ShipTo country="US">
    <name>name1</name>
    <street>street1</street>
    <city>city1</city>
    <state>state1</state>
    <zip>1</zip>
  </ShipTo>
  <ShipTo country="US">
    <name>name2</name>
    <street>street2</street>
    <city>city2</city>
    <state>state2</state>
    <zip>-79228162514264337593543950335</zip>
  </ShipTo>
  <BillTo country="US">
    <name>name1</name>
    <street>street1</street>
    <city>city1</city>
    <state>state1</state>
    <zip>1</zip>
  </BillTo>
</PurchaseOrder>

Both payloads should be routed according to the sub-flow mapping.
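If a message ends up in the wrong sub-flow, a quick sanity check is to look at the root element of the XML you sent, because that is what decides which stub class the payload becomes. The sketch below uses only the JDK's DOM parser; PayloadRootSniffer and the returned labels are hypothetical and simply mirror the two mappings in Sample 4.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

// Hypothetical helper: inspects the root element of an XML message.
class PayloadRootSniffer {

    // Returns the local name of the root element, ignoring any namespace prefix.
    static String rootElement(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Element root = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)))
                    .getDocumentElement();
            return root.getLocalName();
        } catch (Exception e) {
            throw new IllegalArgumentException("Not parseable as XML", e);
        }
    }

    // Names the sub-flow from Sample 4 that a message with this root would reach.
    static String expectedSubFlow(String xml) {
        String root = rootElement(xml);
        switch (root) {
            case "shiporder":
                return "Shiporder sub-flow";
            case "PurchaseOrder":
                return "PurchaseOrder sub-flow";
            default:
                throw new IllegalArgumentException("Unexpected root element: " + root);
        }
    }

    public static void main(String[] args) {
        System.out.println(expectedSubFlow("<shiporder orderid='889923'/>"));
        System.out.println(expectedSubFlow(
                "<PurchaseOrder xmlns='http://tempuri.org/PurchaseOrderSchema.xsd'/>"));
    }
}
```

The stripped-down messages in main are only there to exercise the root-element check; they would not pass schema validation.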

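Sample 5: IntegrationFlowAdapter

Besides the other implementations of enterprise integration patterns (check them out), I need to mention IntegrationFlowAdapter. You extend this class and implement its buildFlow method, for example: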

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue"))
                .<String, String>transform(String::toLowerCase)
                .channel(c -> c.queue("myFlowAdapterOutput"));
    }
}

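This lets you wrap repeated bean declarations into a component and give them the flow they need. Such a component can then be configured and handed to the calling code as a single class instance!

So, let's make Sample 3 from this repo a little shorter by defining a base class for all JmsEndpoints, with the repeated beans declared in it: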

public class JmsEndpoint extends IntegrationFlowAdapter {

    private String queueName;

    private String channelName;

    private String contextPath;

    /**
     * @param queueName
     * @param channelName
     * @param contextPath
     */
    public JmsEndpoint(String queueName, String channelName, String contextPath) {
        this.queueName = queueName;
        this.channelName = channelName;
        this.contextPath = contextPath;
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Jms.messageDrivenChannelAdapter(listenerContainer())
            .jmsMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()))
        ).channel(channelName);
    }

    @Bean
    public Jaxb2Marshaller shipOrdersMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setContextPath(contextPath);
        return marshaller;
    }

    @Bean
    public DynamicDestinationResolver dynamicDestinationResolver() {
        return new DynamicDestinationResolver();
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory();
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainer() {
        final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
        defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
        defaultMessageListenerContainer.setDestinationName(queueName);
        return defaultMessageListenerContainer;
    }

    @Bean
    public MessageChannel inboundChannel() {
        return MessageChannels.direct(channelName).get();
    }
}

Declaring a Jms endpoint for a specific queue is now easy:

@Bean
public JmsEndpoint jmsEndpoint() {
    return new JmsEndpoint("jms.activeMQ.Test", "inboundChannel", "com.example.stubs");
}

The service activator for inboundChannel:

/**
 * Sample 3, 5
 * @param shiporder
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}

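You should not miss out on using IntegrationFlowAdapter in your projects. I like the concept a lot.

I recently started using the Spring Integration Java DSL in a new Spring Boot based project at Embedit. Even though it takes some configuration, I find it very useful.

  • It is easy to debug, with no need to add configuration such as a wiretap.
  • It is much easier to read. Yes, even with lambdas!
  • It is powerful. In Java configuration you now have many options.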


