入站通道适配器

以下列表显示了AMQP入站通道适配器可能的配置选项:

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                [id="CO1-1"]1
                                  channel="inboundChannel"        [id="CO1-2"]2
                                  queue-names="si.test.queue"     [id="CO1-3"]3
                                  acknowledge-mode="AUTO"         [id="CO1-4"]4
                                  advice-chain=""                 [id="CO1-5"]5
                                  channel-transacted=""           [id="CO1-6"]6
                                  concurrent-consumers=""         [id="CO1-7"]7
                                  connection-factory=""           [id="CO1-8"]8
                                  error-channel=""                [id="CO1-9"]9
                                  expose-listener-channel=""      [id="CO1-10"]10
                                  header-mapper=""                [id="CO1-11"]11
                                  mapped-request-headers=""       [id="CO1-12"]12
                                  listener-container=""           [id="CO1-13"]13
                                  message-converter=""            [id="CO1-14"]14
                                  message-properties-converter="" [id="CO1-15"]15
                                  phase=""                        [id="CO1-16"]16
                                  prefetch-count=""               [id="CO1-17"]17
                                  receive-timeout=""              [id="CO1-18"]18
                                  recovery-interval=""            [id="CO1-19"]19
                                  missing-queues-fatal=""         [id="CO1-20"]20
                                  shutdown-timeout=""             [id="CO1-21"]21
                                  task-executor=""                [id="CO1-22"]22
                                  transaction-attribute=""        [id="CO1-23"]23
                                  transaction-manager=""          [id="CO1-24"]24
                                  batch-size=""                   [id="CO1-25"]25
                                  consumers-per-queue             [id="CO1-26"]26
                                  batch-mode="MESSAGES"/>         [id="CO1-27"]27

<1> 此适配器的唯一ID。
可选。
<2> 转换后的消息应发送到的消息通道。
必需。
<3> 消息应从中消费的AMQP队列名称(逗号分隔列表)。
必需。
<4> `MessageListenerContainer`的确认模式。
当设置为`MANUAL`时,交付标签和通道分别在消息头`amqp_deliveryTag`和`amqp_channel`中提供。
用户应用程序负责确认。
`NONE`表示不确认(`autoAck`)。
`AUTO`表示当下游流完成时适配器的容器进行确认。
可选(默认为AUTO)。
请参阅xref:amqp/inbound-ack.adoc[入站端点确认模式]。
<5> 用于处理与此入站通道适配器相关的横切行为的额外AOP Advice。
可选。
<6> 标志,指示此组件创建的通道是否具有事务性。
如果为true,它会告诉框架使用事务性通道,并根据结果,以提交或回滚结束所有操作(发送或接收),如果出现异常则表示回滚。
可选(默认为false)。
<7> 指定要创建的并发消费者数量。
默认值为`1`。
我们建议增加并发消费者数量以扩展来自队列的消息消费。
但是,请注意,一旦注册了多个消费者,任何排序保证都会丢失。
通常,对于低流量队列,请使用一个消费者。
当设置了'consumers-per-queue'时不允许。
可选。
<8> RabbitMQ `ConnectionFactory`的Bean引用。
可选(默认为`connectionFactory`)。
<9> 错误消息应发送到的消息通道。
可选。
<10> 侦听器通道(com.rabbitmq.client.Channel)是否暴露给已注册的`ChannelAwareMessageListener`。
可选(默认为true)。
<11> 对接收AMQP消息时使用的`AmqpHeaderMapper`的引用。
可选。
默认情况下,只有标准的AMQP属性(例如`contentType`)会复制到Spring Integration `MessageHeaders`。
AMQP `MessageProperties`中任何用户定义的头都不会被默认的`DefaultAmqpHeaderMapper`复制到消息中。
如果提供了'request-header-names',则不允许。
<12> 逗号分隔的AMQP头名称列表,用于从AMQP请求映射到`MessageHeaders`中。
这只能在未提供'header-mapper'引用的情况下提供。
此列表中的值也可以是用于匹配头名称的简单模式(例如"*"或"thing1*, thing2"或"*something")。
<13> 对用于接收AMQP消息的`AbstractMessageListenerContainer`的引用。
如果提供了此属性,则不应提供与侦听器容器配置相关的任何其他属性。
换句话说,通过设置此引用,您必须对侦听器容器配置承担全部责任。
唯一的例外是`MessageListener`本身。
由于这是此通道适配器实现的核心职责,因此引用的侦听器容器不得已经拥有自己的`MessageListener`。
可选。
<14> 接收AMQP消息时使用的`MessageConverter`。
可选。
<15> 接收AMQP消息时使用的`MessagePropertiesConverter`。
可选。
<16> 指定底层`AbstractMessageListenerContainer`启动和停止的阶段。
启动顺序从低到高,停止顺序与此相反。
默认情况下,此值为`Integer.MAX_VALUE`,这意味着此容器尽可能晚启动,尽可能早停止。
可选。
<17> 告诉AMQP代理在单个请求中向每个消费者发送多少消息。
通常,您可以将此值设置得很高以提高吞吐量。
它应大于或等于事务大小(请参阅此列表后面的`batch-size`属性)。
可选(默认为`1`)。
<18> 接收超时(毫秒)。
可选(默认为`1000`)。
<19> 指定底层`AbstractMessageListenerContainer`恢复尝试之间的间隔(毫秒)。
可选(默认为`5000`)。
<20> 如果为'true'并且代理上没有队列可用,容器在启动期间会抛出致命异常并停止(如果在容器运行时队列被删除,则在三次尝试被动声明队列后)。
如果为`false`,容器不会抛出异常并进入恢复模式,尝试根据`recovery-interval`重新启动。
可选(默认为`true`)。
<21> 在底层`AbstractMessageListenerContainer`停止后,AMQP连接被强制关闭之前,等待工作者(毫秒)的时间。
如果在关闭信号到来时有任何工作者处于活动状态,只要它们能在此超时内完成处理,就允许它们完成处理。
否则,连接将关闭,消息将保持未确认(如果通道是事务性的)。
可选(默认为`5000`)。
<22> 默认情况下,底层`AbstractMessageListenerContainer`使用`SimpleAsyncTaskExecutor`实现,它为每个任务启动一个新线程,异步运行。
默认情况下,并发线程数是无限的。
请注意,此实现不重用线程。
考虑使用线程池`TaskExecutor`实现作为替代。
可选(默认为`SimpleAsyncTaskExecutor`)。
<23> 默认情况下,底层`AbstractMessageListenerContainer`创建`DefaultTransactionAttribute`的新实例(它采用EJB方法对运行时异常进行回滚,但不对已检查异常进行回滚)。
可选(默认为`DefaultTransactionAttribute`)。
<24> 在底层`AbstractMessageListenerContainer`上设置对外部`PlatformTransactionManager`的bean引用。
事务管理器与`channel-transacted`属性协同工作。
如果在框架发送或接收消息时已经存在事务,并且`channelTransacted`标志为`true`,则消息事务的提交或回滚将推迟到当前事务结束。
如果`channelTransacted`标志为`false`,则事务语义不适用于消息操作(它是自动确认的)。
有关更多信息,请参阅
https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[使用Spring AMQP进行事务]。
可选。
<25> 告诉`SimpleMessageListenerContainer`在单个请求中处理多少条消息。
为了获得最佳结果,它应小于或等于`prefetch-count`中设置的值。
当设置了'consumers-per-queue'时不允许。
可选(默认为`1`)。
<26> 指示底层侦听器容器应为`DirectMessageListenerContainer`而不是默认的`SimpleMessageListenerContainer`。
有关更多信息,请参阅https://docs.spring.io/spring-amqp/reference/html/[Spring AMQP参考手册]。
<27> 当容器的`consumerBatchEnabled`为`true`时,确定适配器如何在消息有效负载中呈现消息批次。
当设置为`MESSAGES`(默认)时,有效负载是`List<Message<?>>`,其中每条消息都包含从传入AMQP `Message`映射的头,并且有效负载是转换后的`body`。
当设置为`EXTRACT_PAYLOADS`时,有效负载是`List<?>`,其中元素从AMQP `Message`主体转换而来。
`EXTRACT_PAYLOADS_WITH_HEADERS`类似于`EXTRACT_PAYLOADS`,但此外,从`MessageProperties`映射的每个消息的头会以`List<Map<String, Object>`的形式存储在相应的索引处;头名称是`AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`。
container

请注意,当使用XML配置外部容器时,您不能使用Spring AMQP命名空间来定义容器。 这是因为命名空间至少需要一个`<listener/>`元素。 在此环境中,侦听器是适配器内部的。 因此,您必须使用正常的Spring `<bean/>`定义来定义容器,如以下示例所示:

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>

尽管Spring Integration JMS和AMQP支持类似,但存在重要差异。 JMS入站通道适配器在底层使用`JmsDestinationPollingSource`,并期望配置一个轮询器。 AMQP入站通道适配器使用`AbstractMessageListenerContainer`,并且是消息驱动的。 在这方面,它更类似于JMS消息驱动通道适配器。

从版本5.5开始,AmqpInboundChannelAdapter`可以配置一个`org.springframework.amqp.rabbit.retry.MessageRecoverer`策略,该策略在内部调用重试操作时用于`RecoveryCallback。 有关更多信息,请参阅`setMessageRecoverer()` JavaDocs。 @Publisher`注解也可以与@RabbitListener`结合使用:

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

    @Bean
    QueueChannel fromRabbitViaPublisher() {
        return new QueueChannel();
    }

    @RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
    @Publisher("fromRabbitViaPublisher")
    @Payload("#args.payload.toUpperCase()")
    public void consumeForPublisher(String payload) {

    }

}

默认情况下,@Publisher AOP拦截器处理方法调用的返回值。 但是,@RabbitListener`方法的返回值被视为AMQP回复消息。 因此,这种方法不能与@Publisher`一起使用,所以建议使用带有针对方法参数的SpEL表达式的`@Payload`注解。 有关`@Publisher`的更多信息,请参阅注解驱动配置部分。

当在侦听器容器中使用独占或单活动消费者时,建议将容器属性`forceStop`设置为`true`。 这将防止竞态条件,即在停止容器后,另一个消费者可能在此实例完全停止之前开始消费消息。

批处理消息

有关批处理消息的更多信息,请参阅Spring AMQP文档

要使用Spring Integration生成批处理消息,只需使用`BatchingRabbitTemplate`配置出站端点。

接收批处理消息时,默认情况下,侦听器容器会提取每个片段消息,并且适配器会为每个片段生成一个`Message<?>`。 从版本5.2开始,如果容器的`deBatchingEnabled`属性设置为`false`,则由适配器执行解批处理,并生成单个`Message<List<?>>`,其有效负载是片段有效负载的列表(如果适用,经过转换)。

默认的`BatchingStrategy`是`SimpleBatchingStrategy`,但可以在适配器上重写。

当重试操作需要恢复时,`org.springframework.amqp.rabbit.retry.MessageBatchRecoverer`必须与批处理一起使用。