Getting Started to Quarkus Messaging with RabbitMQ

本指南演示了您的 Quarkus 应用程序如何利用 Quarkus 消息传递与 RabbitMQ 交互。 Unresolved directive in rabbitmq.adoc - include::{includes}/extension-status.adoc[]

Prerequisites

Unresolved directive in rabbitmq.adoc - include::{includes}/prerequisites.adoc[]

Architecture

在本指南中,我们将开发两个与 RabbitMQ 代理通信的应用程序。第一个应用程序将 quote request 发送到 RabbitMQ quote requests 交换并从 quote 队列中使用消息。第二个应用程序接收 quote request 并发送回 quote

amqp qs architecture

第一个应用程序 producer 将允许用户通过 HTTP 端点请求一些报价。对于每个报价请求,都会生成一个随机标识符并返还给用户,以将报价请求置于 pending 中。与此同时,生成的请求 id 会被发送到 quote-requests 交换。

amqp qs app screenshot

第二个应用程序 processor 接下来将从 quote-requests 队列读取内容,为报价添加随机价格,然后将其发送到名为 quotes 的交换中。

最后,producer 将读取报价并使用服务器端发送的事件将它们发送到浏览器。因此,用户将看到报价价格从 pending 实时更新为接收到的价格。

Solution

我们建议您按照下一节中的说明一步步创建应用程序。但是,您可以直接转到已完成的示例。

克隆 Git 存储库: git clone {quickstarts-clone-url},或下载 {quickstarts-archive-url}[存档]。

该解决方案位于 rabbitmq-quickstart directory 中。

Creating the Maven Project

首先,我们需要创建两个项目:producer_和 _processor

要在终端中创建 _producer_项目,请运行:

Unresolved directive in rabbitmq.adoc - include::{includes}/devtools/create-app.adoc[]

此命令创建项目结构并选择我们将使用的两个 Quarkus 扩展:

  1. 反应式消息传递 RabbitMQ 连接器

  2. Quarkus REST(以前的 RESTEasy Reactive)及其 Jackson 支持来处理 JSON 有效负载

如果您已配置 Quarkus 项目,则可以通过在项目基本目录中运行以下命令将 messaging-rabbitmq 扩展添加到您的项目: Unresolved directive in rabbitmq.adoc - include::{includes}/devtools/extension-add.adoc[] 这会将以下内容添加到您的 pom.xml

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-rabbitmq</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-rabbitmq")

要从相同目录创建 _processor_项目,请运行:

Unresolved directive in rabbitmq.adoc - include::{includes}/devtools/create-app.adoc[]

此时,您应该拥有以下结构:

.
├── rabbitmq-quickstart-processor
│  ├── README.md
│  ├── mvnw
│  ├── mvnw.cmd
│  ├── pom.xml
│  └── src
│     └── main
│        ├── docker
│        ├── java
│        └── resources
│           └── application.properties
└── rabbitmq-quickstart-producer
   ├── README.md
   ├── mvnw
   ├── mvnw.cmd
   ├── pom.xml
   └── src
      └── main
         ├── docker
         ├── java
         └── resources
            └── application.properties

在你的首选 IDE 中打开这两个项目。

The Quote object

Quote 类将用于 producerprocessor 项目。为了简单起见,我们将复制该类。在这两个项目中,创建 src/main/java/org/acme/rabbitmq/model/Quote.java 文件,其内容如下:

package org.acme.rabbitmq.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Quote {

    public String id;
    public int price;

    /**
    * Default constructor required for Jackson serializer
    */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}

发送到 RabbitMQ 队列的消息和发送到浏览器客户端的服务器发送事件中将使用 Quote 对象的 JSON 表示形式。

Quarkus 具有内置功能来处理 JSON RabbitMQ 消息。

@RegisterForReflection

@RegisterForReflection 注释指示 Quarkus 在创建本机可执行文件时保留类、其字段和方法。当我们稍后在容器中以本机可执行文件的形式运行应用程序时,这点很重要。如果没有此注释,本机编译过程将在死代码消除阶段丢弃字段和方法,这会导致运行时错误。有关 @RegisterForReflection 注释的更多详细信息,请参阅 native application tips 页面。

Sending quote request

producer 项目中找到生成的 src/main/java/org/acme/rabbitmq/producer/QuotesResource.java 文件,并将内容更新为:

package org.acme.rabbitmq.producer;

import java.util.UUID;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.mutiny.Multi;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests") Emitter<String> quoteRequestEmitter; (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" channel (which
     * maps to the "quote-requests" RabbitMQ exchange) using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); (2)
        return uuid.toString();
    }
}
<1>  注入 Reactive Messaging `Emitter`,以便向 `quote-requests`通道发送消息。
<1>  在 post 请求中,生成一个随机 UUID 并使用发送器将其发送到 RabbitMQ 队列。

此通道使用我们将添加到 application.properties 文件的配置映射到 RabbitMQ 交换。打开 src/main/resource/application.properties 文件并添加:

# Configure the outgoing RabbitMQ exchange `quote-requests`
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq
mp.messaging.outgoing.quote-requests.exchange.name=quote-requests

我们只需要指定 smallrye-rabbitmq 连接器。默认情况下,响应式消息传递将通道名 quote-requests 映射到相同的 RabbitMQ 交换名称。

Processing quote requests

现在,让我们使用报价请求并给出价格。在 processor 项目中,找到 src/main/java/org/acme/rabbitmq/processor/QuoteProcessor.java 文件并添加以下内容:

package org.acme.rabbitmq.processor;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.rabbitmq.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "quote-requests" RabbitMQ queue and giving out a random quote.
 * The result is pushed to the "quotes" RabbitMQ exchange.
 */
@ApplicationScoped
public class QuoteProcessor {

    private Random random = new Random();

    @Incoming("requests")       (1)
    @Outgoing("quotes")         (2)
    @Blocking                   (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard-working task
        Thread.sleep(1000);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
<1>  表明方法使用 `requests` 通道中的项目
<1>  表明方法返回的对象被发送到 `quotes` 通道
<1>  指示处理为 _blocking_,并且不能在调用方线程上运行。

process 方法将针对来自 quote-requests 队列的每个 RabbitMQ 消息而调用,并将 Quote 对象发送到 quotes 交换。

与前面的示例一样,我们需要在 application.properties 文件中配置连接器。打开 src/main/resources/application.properties 文件并添加:

# Configure the incoming RabbitMQ queue `quote-requests`
mp.messaging.incoming.requests.connector=smallrye-rabbitmq
mp.messaging.incoming.requests.queue.name=quote-requests
mp.messaging.incoming.requests.exchange.name=quote-requests

# Configure the outgoing RabbitMQ exchange `quotes`
mp.messaging.outgoing.quotes.connector=smallrye-rabbitmq
mp.messaging.outgoing.quotes.exchange.name=quotes

请注意,在这种情况下,我们有一个入站和一个出站连接器配置,每个都明确命名。配置属性的结构如下:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

channel-name 片段必须与 @Incoming@Outgoing 注释中设置的值相匹配:

  • quote-requests → RabbitMQ 队列,我们从其中读取报价请求

  • quotes → RabbitMQ 交换,我们向其中写入报价

Receiving quotes

回到我们的 producer 项目。让我们修改 QuotesResource 以使用报价、将其绑定到一个 HTTP 端点以向客户端发送事件:

import io.smallrye.mutiny.Multi;
//...

@Channel("quotes") Multi<Quote> quotes;     (1)

/**
 * Endpoint retrieving the "quotes" queue and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
    return quotes; (3)
}
<1>  使用 `@Channel` 限定符注入 `quotes` 通道
<1>  表明内容是使用 `Server Sent Events` 发送的
<1>  返回流(_Reactive Stream_)

我们再次需要配置 producer 项目内的传入 quotes 通道。在 application.properties 文件内添加以下内容:

# Configure the outgoing `quote-requests` queue
mp.messaging.outgoing.quote-requests.connector=smallrye-rabbitmq

# Configure the incoming `quotes` queue
mp.messaging.incoming.quotes.connector=smallrye-rabbitmq

The HTML page

最后一步,使用 SSE 读取已转换价格的 HTML 页面。

producer 项目内创建 src/main/resources/META-INF/resources/quotes.html 文件,并包含以下内容:

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Quotes</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").append(row);
        });
    });
    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
      var json = JSON.parse(event.data);
      $(`#${json.id}`).html(function(index, html) {
        return html.replace("Pending", `\$\xA0${json.price}`);
      });
    };
</script>
</html>

这里没有特别之处。它会更新每个收到的页面的报价。

Get it running

你只需要使用以下命令运行这两个应用程序:

> mvn -f rabbitmq-quickstart-producer quarkus:dev

在另一个终端中:

> mvn -f rabbitmq-quickstart-processor quarkus:dev

Quarkus 会自动启动 RabbitMQ 代理,配置应用程序,并在不同应用程序之间共享代理实例。有关更多详细信息,请参见 Dev Services for RabbitMQ

在浏览器中打开 http://localhost:8080/quotes.html,然后通过单击按钮请求一些报价。

Running in JVM or Native mode

在非开发或测试模式下运行时,你需要启动 RabbitMQ 代理。你可以按照 RabbitMQ Docker website 中的说明进行操作,或创建一个包含以下内容的 docker-compose.yaml 文件:

version: '2'

services:

  rabbit:
    image: rabbitmq:3.12-management
    ports:
      - "5672:5672"
    networks:
      - rabbitmq-quickstart-network

  producer:
    image: quarkus-quickstarts/rabbitmq-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    ports:
      - "8080:8080"
    networks:
      - rabbitmq-quickstart-network

  processor:
    image: quarkus-quickstarts/rabbitmq-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: rabbitmq-quickstart-processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      RABBITMQ_HOST: rabbit
      RABBITMQ_PORT: 5672
    networks:
      - rabbitmq-quickstart-network

networks:
  rabbitmq-quickstart-network:
    name: rabbitmq-quickstart

注意 RabbitMQ 代理位置的配置方式。rabbitmq-hostrabbitmq-port 属性(AMQP_HOSTAMQP_PORT 环境变量)配置位置。

首先,确保已停止应用程序,并使用以下命令以 JVM 模式构建这两个应用程序:

> mvn -f rabbitmq-quickstart-producer clean package
> mvn -f rabbitmq-quickstart-processor clean package

打包完成后,运行 docker compose up --build。用户界面在 [role="bare"][role="bare"]http://localhost:8080/quotes.html 上公开

要以本机方式运行应用程序,我们首先需要构建本机可执行文件:

> mvn -f rabbitmq-quickstart-producer package -Dnative  -Dquarkus.native.container-build=true
> mvn -f rabbitmq-quickstart-processor package -Dnative -Dquarkus.native.container-build=true

-Dquarkus.native.container-build=true 指示 Quarkus 构建可在容器内运行的 Linux 64 位本机可执行文件。然后,使用以下命令运行系统:

> export QUARKUS_MODE=native
> docker compose up --build

和之前一样,用户界面在 [role="bare"][role="bare"]http://localhost:8080/quotes.html 上公开

Going further

本指南展示了如何使用 Quarkus 与 RabbitMQ 交互。它利用 SmallRye Reactive Messaging 构建数据流应用程序。

如果你已使用过 Kafka,你就会意识到代码是相同的。唯一的区别是连接器配置和 JSON 映射。