示例应用程序
Spring AMQP 示例项目包含两个示例应用程序。 第一个是一个简单的“Hello World”示例,演示了同步和异步消息接收。 它为理解基本组件提供了一个极好的起点。 第二个示例基于股票交易用例,演示了真实世界应用程序中常见的交互类型。 在本章中,我们将快速讲解每个示例,以便您可以专注于最重要的组件。 这两个示例都是基于 Maven 的,因此您应该能够将它们直接导入到任何支持 Maven 的 IDE(例如 SpringSource Tool Suite)。
“Hello World”示例
“Hello World”示例演示了同步和异步消息接收。
您可以将 spring-rabbit-helloworld
示例导入 IDE,然后按照下面的讨论进行操作。
同步示例
在 src/main/java
目录中,导航到 org.springframework.amqp.helloworld
包。
打开 HelloWorldConfiguration
类,注意它在类级别包含 @Configuration
注解,并在方法级别包含一些 @Bean
注解。
这是 Spring Java-based 配置的一个示例。
您可以在 {spring-framework-docs}/core/beans/java.html[此处]阅读更多相关信息。
以下清单显示了连接工厂的创建方式:
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
该配置还包含 RabbitAdmin
的一个实例,它默认查找任何类型为交换机、队列或绑定的 bean,然后将它们声明到代理上。
事实上,在 HelloWorldConfiguration
中生成的 helloWorldQueue
bean 就是一个示例,因为它是一个 Queue
实例。
以下清单显示了 helloWorldQueue
bean 的定义:
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
回顾 rabbitTemplate
bean 配置,您可以看到它将 helloWorldQueue
的名称设置为其 queue
属性(用于接收消息)和 routingKey
属性(用于发送消息)。
现在我们已经探索了配置,我们可以查看实际使用这些组件的代码。
首先,打开同一包中的 Producer
类。
它包含一个 main()
方法,其中创建了 Spring ApplicationContext
。
以下清单显示了 main
方法:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在前面的示例中,AmqpTemplate
bean 被检索并用于发送 Message
。
由于客户端代码应尽可能依赖接口,因此类型是 AmqpTemplate
而不是 RabbitTemplate
。
尽管 HelloWorldConfiguration
中创建的 bean 是 RabbitTemplate
的实例,但依赖接口意味着此代码更具可移植性(您可以独立于代码更改配置)。
由于调用了 convertAndSend()
方法,模板将委托给其 MessageConverter
实例。
在这种情况下,它使用默认的 SimpleMessageConverter
,但可以在 HelloWorldConfiguration
中为 rabbitTemplate
bean 提供不同的实现。
现在打开 Consumer
类。
它实际上共享相同的配置基类,这意味着它共享 rabbitTemplate
bean。
这就是为什么我们将该模板配置为同时具有 routingKey
(用于发送)和 queue
(用于接收)的原因。
正如我们在 AmqpTemplate
中描述的那样,您也可以将“routingKey”参数传递给发送方法,并将“queue”参数传递给接收方法。
Consumer
代码基本上是 Producer
的镜像,调用 receiveAndConvert()
而不是 convertAndSend()
。
以下清单显示了 Consumer
的 main 方法:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
如果您运行 Producer
然后运行 Consumer
,您应该会在控制台输出中看到 Received: Hello World
。
异步示例
hello-world-sync 讲解了同步的 Hello World 示例。
本节描述了一个稍微高级但功能更强大的选项。
经过一些修改,Hello World 示例可以提供异步接收的示例,也称为消息驱动 POJO。
事实上,有一个子包提供了正是这些:org.springframework.amqp.samples.helloworld.async
。
同样,我们从发送端开始。
打开 ProducerConfiguration
类,注意它创建了一个 connectionFactory
和一个 rabbitTemplate
bean。
这次,由于配置专门用于消息发送端,我们甚至不需要任何队列定义,RabbitTemplate
只有“routingKey”属性被设置。
回想一下,消息是发送到交换机而不是直接发送到队列的。
AMQP 默认交换机是一个没有名称的直连交换机。
所有队列都以其名称作为路由键绑定到该默认交换机。
这就是为什么我们只需要在这里提供路由键。
以下清单显示了 rabbitTemplate
的定义:
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于此示例演示了异步消息接收,因此生产端被设计为连续发送消息(如果它是一个每执行一次发送一条消息的模型,就像同步版本一样,那么它实际上是一个消息驱动的消费者就不那么明显了)。
负责连续发送消息的组件被定义为 ProducerConfiguration
中的一个内部类。
它被配置为每三秒运行一次。
以下清单显示了该组件:
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
您不需要理解所有细节,因为真正的焦点应该放在接收端(我们将在下一节介绍)。
但是,如果您还不熟悉 Spring 任务调度支持,您可以在 {spring-framework-docs}/integration/scheduling.html#scheduling-annotation-support-scheduled[此处]了解更多信息。
简而言之,ProducerConfiguration
中的 postProcessor
bean 将任务注册到调度器。
现在我们可以转向接收端。
为了强调消息驱动 POJO 的行为,我们从响应消息的组件开始。
该类名为 HelloWorldHandler
,如下清单所示:
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
该类是一个 POJO。
它不扩展任何基类,不实现任何接口,甚至不包含任何导入。
它通过 Spring AMQP MessageListenerAdapter
被“适配”到 MessageListener
接口。
然后,您可以在 SimpleMessageListenerContainer
上配置该适配器。
对于此示例,容器是在 ConsumerConfiguration
类中创建的。
您可以在那里看到包装在适配器中的 POJO。
以下清单显示了 listenerContainer
的定义方式:
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
SimpleMessageListenerContainer
是一个 Spring 生命周期组件,默认情况下会自动启动。
如果您查看 Consumer
类,您会看到它的 main()
方法仅仅是一个创建 ApplicationContext
的单行引导程序。
Producer
的 main()
方法也是一个单行引导程序,因为其方法被 @Scheduled
注解的组件也会自动启动。
您可以按任意顺序启动 Producer
和 Consumer
,您应该会看到消息每三秒发送和接收一次。
股票交易
股票交易示例演示了比 hello-world-sample 更高级的消息传递场景。
然而,配置非常相似,只是稍微复杂一些。
由于我们已经详细讲解了 Hello World 配置,这里我们重点介绍此示例的不同之处。
有一个服务器将市场数据(股票报价)推送到主题交换机。
然后,客户端可以通过使用路由模式(例如,app.stock.quotes.nasdaq.*
)绑定队列来订阅市场数据源。
此演示的另一个主要功能是客户端发起并由服务器处理的请求-回复“股票交易”交互。
这涉及一个由客户端在订单请求消息本身中发送的私有 replyTo
队列。
服务器的核心配置位于 org.springframework.amqp.rabbit.stocks.config.server
包中的 RabbitServerConfiguration
类中。
它扩展了 AbstractStockAppRabbitConfiguration
。
服务器和客户端共用的资源都在那里定义,包括市场数据主题交换机(其名称为“app.stock.marketdata”)以及服务器暴露用于股票交易的队列(其名称为“app.stock.request”)。
在该通用配置文件中,您还可以看到 RabbitTemplate
上配置了一个 Jackson2JsonMessageConverter
。
服务器特定的配置包括两件事。
首先,它在 RabbitTemplate
上配置市场数据交换机,这样它就不需要每次发送 Message
时都提供该交换机名称。
它在基本配置类中定义的一个抽象回调方法中完成此操作。
以下清单显示了该方法:
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次,声明股票请求队列。
在这种情况下,它不需要任何显式绑定,因为它绑定到默认的无名交换机,并以其自身名称作为路由键。
如前所述,AMQP 规范定义了这种行为。
以下清单显示了 stockRequestQueue
bean 的定义:
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
现在您已经看到了服务器 AMQP 资源的配置,导航到 src/test/java
目录下的 org.springframework.amqp.rabbit.stocks
包。
在那里,您可以看到提供了 main()
方法的实际 Server
类。
它基于 server-bootstrap.xml
配置文件创建了一个 ApplicationContext
。
在那里,您可以看到发布虚拟市场数据的计划任务。
该配置依赖于 Spring 的 task
命名空间支持。
引导配置文件还导入了一些其他文件。
最有趣的是 server-messaging.xml
,它直接位于 src/main/resources
下。
在那里,您可以看到负责处理股票交易请求的 messageListenerContainer
bean。
最后,看看 server-handlers.xml
(也位于“src/main/resources”中)中定义的 serverHandler
bean。
该 bean 是 ServerHandler
类的一个实例,是消息驱动 POJO 的一个很好的例子,它也可以发送回复消息。
请注意,它本身不与框架或任何 AMQP 概念耦合。
它接受 TradeRequest
并返回 TradeResponse
。
以下清单显示了 handleMessage
方法的定义:
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
现在我们已经看到了服务器最重要的配置和代码,我们可以转向客户端。
最好的起点可能是 org.springframework.amqp.rabbit.stocks.config.client
包中的 RabbitClientConfiguration
。
请注意,它声明了两个队列,但没有提供显式名称。
以下清单显示了两个队列的 bean 定义:
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
这些是私有队列,会自动生成唯一的名称。
第一个生成的队列被客户端用于绑定到服务器暴露的市场数据交换机。
回想一下,在 AMQP 中,消费者与队列交互,而生产者与交换机交互。
队列与交换机的“绑定”告诉代理将来自给定交换机的消息传递(或路由)到队列。
由于市场数据交换机是一个主题交换机,因此绑定可以用路由模式表示。
RabbitClientConfiguration
使用 Binding
对象来完成此操作,并且该对象是使用 BindingBuilder
流式 API 生成的。
以下清单显示了 Binding
:
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
请注意,实际值已在属性文件(src/main/resources
下的 client.properties
)中外部化,并且我们使用 Spring 的 @Value
注解来注入该值。
这通常是一个好主意。
否则,该值将被硬编码在类中,并且在不重新编译的情况下无法修改。
在这种情况下,运行多个版本的客户端同时更改用于绑定的路由模式要容易得多。
我们现在可以尝试一下。
首先运行 org.springframework.amqp.rabbit.stocks.Server
,然后运行 org.springframework.amqp.rabbit.stocks.Client
。
您应该会看到 NASDAQ
股票的虚拟报价,因为 client.properties
中与“stocks.quote.pattern”键关联的当前值为“app.stock.quotes.nasdaq.”。
现在,在保持现有 Server
和 Client
运行的同时,将该属性值更改为“app.stock.quotes.nyse.”并启动第二个 Client
实例。
您应该会看到第一个客户端仍然接收 NASDAQ 报价,而第二个客户端接收 NYSE 报价。
您可以改为更改模式以获取所有股票甚至单个股票代码。
我们探索的最后一个功能是从客户端角度的请求-回复交互。
回想一下,我们已经看到了接受 TradeRequest
对象并返回 TradeResponse
对象的 ServerHandler
。
客户端的相应代码是 org.springframework.amqp.rabbit.stocks.gateway
包中的 RabbitStockServiceGateway
。
它委托给 RabbitTemplate
来发送消息。
以下清单显示了 send
方法:
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
请注意,在发送消息之前,它设置了 replyTo
地址。
它提供了由 traderJoeQueue
bean 定义(前面所示)生成的队列。
以下清单显示了 StockServiceGateway
类本身的 @Bean
定义:
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果您不再运行服务器和客户端,请立即启动它们。 尝试以“100 TCKR”的格式发送请求。 在模拟请求“处理”的短暂人工延迟之后,您应该会看到客户端上出现确认消息。
从非 Spring 应用程序接收 JSON
Spring 应用程序在发送 JSON 时,会将 TypeId
标头设置为完全限定的类名,以帮助接收应用程序将 JSON 转换回 Java 对象。
spring-rabbit-json
示例探讨了几种从非 Spring 应用程序转换 JSON 的技术。
另请参阅 Jackson2JsonMessageConverter
以及 {spring-amqp-java-docs}/index.html?org/springframework/amqp/support/converter/DefaultClassMapper.html[DefaultClassMapper
的 Javadoc]。