Apache Pulsar Reference Guide
本参考指南展示了 Quarkus 应用程序如何使用 Quarkus Messaging 与 Apache Pulsar 交互。
Introduction
Apache Pulsar是一个开源分布式消息和流平台,专为云而构建。它提供了一个针对具有分层存储功能的服务器消息的多租户高性能解决方案。
Pulsar 实现了发布订阅模式:
-
生产者将消息发布到 topics。
-
消费者为这些主题创建 subscriptions,以接收和处理传入消息,并在处理完成后向代理发送 acknowledgments。
-
当创建订阅时,Pulsar 会保留所有消息,即使消费者断开连接。只有当消费者确认所有这些消息都已成功处理时,才会丢弃保留的消息。
Pulsar 集群包括:
-
一个或多个 brokers,属于无状态组件。
-
一个 metadata store,用于维护专题元数据、架构、协调和集群配置。
-
一组 bookies,用于持久化存储消息。
Quarkus Extension for Apache Pulsar
Quarkus 通过 SmallRye Reactive Messaging 框架支持 Apache Pulsar。基于 Eclipse MicroProfile Reactive Messaging 规范 3.0,它提出了桥接 CDI 和事件驱动的灵活编程模型。
本指南深入介绍了 Apache Pulsar 和 SmallRye Reactive Messaging 框架。快速入门,请参阅 Getting Started to Quarkus Messaging with Apache Pulsar。 |
你可以在项目基本目录中运行以下命令向你的项目添加 messaging-pulsar
扩展:
Unresolved directive in pulsar.adoc - include::{includes}/devtools/extension-add.adoc[]
这会将以下内容添加到构建文件中:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-pulsar</artifactId>
</dependency>
implementation("io.quarkus:quarkus-messaging-pulsar")
该扩展包括 |
Configuring SmallRye Pulsar Connector
因为 SmallRye Reactive Messaging 框架支持不同的消息传递后端,例如 Apache Kafka、Apache Pulsar、AMQP、Apache Camel、JMS、MQTT 等,所以它采用通用词汇:
-
应用程序发送并接收 messages。
Message
封装一个 payload,可以用一些 metadata 扩展。这应该与一个 PulsarMessage
区分,后者由 value 和 key 组成。使用 Pulsar 连接器,一个 Reactive Messaging message 对应于一个 Pulsar message。 -
消息在 channels 上传输。应用程序组件连接到通道以发布和使用消息。Pulsar 连接器将 channels 映射到 Pulsar topics。
-
通道使用 connectors 连接到消息后端。连接器配置为将传入消息映射到一个特定通道(应用程序使用),并收集发送到特定通道的传出消息。每个连接器都专用于一个特定消息传递技术。例如,处理 Pulsar 的连接器名为
smallrye-pulsar
。
具有传入通道的 Pulsar 连接器的最小配置如下:
%prod.pulsar.client.serviceUrl=pulsar:6650 1
mp.messaging.incoming.prices.connector=smallrye-pulsar 2
<1> 为生产配置文件配置 Pulsar 代理服务 URL。你可以使用 `mp.messaging.incoming.$channel.serviceUrl` 属性全局配置或按通道配置。在开发模式下和运行测试时,<<pulsar-dev-services>> 自动启动一个 Pulsar 代理。 <1> 配置连接器以管理价格通道。默认情况下,_topic_ 名称与通道名称相同。
你可以配置主题属性来覆盖它。
|
Connector auto-attachment
如果你的类路径上有一个连接器,则可以省略
|
有关更多配置选项,请参见 Configuring Pulsar clients。
Receiving messages from Pulsar
Pulsar 连接器使用 Pulsar 客户端连接到 Pulsar 代理,并创建消费者以接收来自 Pulsar 代理的消息,并且将每个 Pulsar Message`映射到反应式消息传递 `Message
。
Example
假设您有一个正在运行且可以使用 `pulsar:6650`地址访问的 Pulsar 代理。按如下方式配置您的应用程序以接收 `prices`通道上的 Pulsar 消息:
mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest (2)
-
配置 Pulsar 代理服务 URL。
-
确保消费者订阅开始接收来自 `Earliest`位置的消息。
您不需要设置 Pulsar 主题,也不需要设置消费者名称。默认情况下,连接器使用通道名称 ( |
在 Pulsar 中,消费者需要为主题订阅提供 |
然后,您的应用程序可以直接接收 `double`有效负载:
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PriceConsumer {
@Incoming("prices")
public void consume(double price) {
// process your price.
}
}
或者,您可以检索反应式消息传递类型 Message<Double>
:
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
// access record metadata
var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow();
// process the message payload.
double price = msg.getPayload();
// Acknowledge the incoming message (acknowledge the Pulsar message back to the broker)
return msg.ack();
}
反应式消息传递 `Message`类型允许使用的方法访问传入的消息元数据,并手动处理确认。
如果您想直接访问 Pulsar 消息对象,请使用:
@Incoming("prices")
public void consume(org.apache.pulsar.client.api.Message<Double> msg) {
String key = msg.getKey();
String value = msg.getValue();
String topic = msg.topicName();
// ...
}
`org.apache.pulsar.client.api.Message`由底层 Pulsar 客户端提供,可以直接在消费者方法中使用。
或者,您的应用程序可以在您的 bean 中注入一个 Multi
,使用通道名称进行标识,并订阅它的事件,如下例所示:
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;
@Path("/prices")
public class PriceResource {
@Inject
@Channel("prices")
Multi<Double> prices;
@GET
@Path("/prices")
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<Double> stream() {
return prices;
}
}
使用 |
可以将以下类型注入为通道:
@Inject @Channel("prices") Multi<Double> streamOfPayloads;
@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;
@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;
@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;
与前面的 Message`示例一样,如果注入的通道接收有效负载 (`Multi<T>
),则会自动确认消息,并支持多个订阅者。如果已注入的通道接收 Message (Multi<Message<T>>
),您将负责确认和广播。
Blocking processing
响应式消息传递在 I/O 线程中调用你的方法。有关此主题的更多详细信息,请查看 Quarkus Reactive Architecture documentation。但是,你通常需要将响应式消息传递与数据库交互等阻塞式处理结合使用。为此,你需要使用 @Blocking
注解,指示处理为 blocking 且不应该在调用者线程上运行。
例如,以下代码说明了如何使用带有 Panache 的 Hibernate 将传入有效负载存储到数据库:
import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
@ApplicationScoped
public class PriceStorage {
@Incoming("prices")
@Transactional
public void store(int priceInUsd) {
Price price = new Price();
price.value = priceInUsd;
price.persist();
}
}
有 2 个
它们具有相同的效果。因此,可以同时使用。第一个提供更精细的调整,例如要使用的工作器池以及是否保留顺序。第二个也与 Quarkus 的其他响应式特性结合使用,使用默认工作器池并保留顺序。
可以在 SmallRye Reactive Messaging – Handling blocking execution 中找到关于 |
@RunOnVirtualThread
有关在 Java {@s7} 上运行阻塞处理,请参阅 {@s8}。 |
@Transactional
如果用 {@s9} 对您的方法进行了注解,则它将自动被视为`{@s11}`,即使没有用 {@s10} 对方法进行注解。 |
Pulsar Subscription Types
可以灵活地使用 Pulsar *subscriptionType*消费者配置来实现不同的消息传递场景,例如发布-订阅或排队。
-
*Exclusive*订阅类型允许为“扇出式发布-订阅消息传递”指定 unique subscription name。这是默认订阅类型。
-
Shared、*Key_Shared*或 *Failover*订阅类型允许多个消费者共享 same subscription name,以便在消费者之间实现“消息排队”。
如果未提供订阅名称,Quarkus 将生成一个唯一 ID。
Deserialization and Pulsar Schema
Pulsar 连接器允许配置 Pulsar 消费者基础架构的模式配置。有关更多信息,请参阅 Pulsar Schema Configuration & Auto Schema Discovery。
Acknowledgement Strategies
当由 Pulsar 消息产生的消息 acknowledged*时,连接器会向 Pulsar 代理发送 acknowledgement request。所有反应式消息传递消息都需要 *acknowledged,这在大多数情况下都是自动处理的。可以使用以下两种策略向 Pulsar 代理发送确认请求:
-
*Individual acknowledgement*是默认策略,即向代理发送每个消息的确认请求。
-
*Cumulative acknowledgement*使用 `ack-strategy=cumulative`配置,消费器仅确认收到的最后一条消息。流中直到(且包括)所提供的消息的所有消息都不会重新传递给该消费器。
默认情况下,Pulsar 消费器不会等待来自代理的确认确认来验证确认。你可以使用 `ackReceiptEnabled=true`启用此功能。 |
Failure Handling Strategies
如果由 Pulsar 消息生成的消息是 nacked,则会应用错误策略。Quarkus Pulsar 扩展支持 4 个策略:
-
nack
*(default)*向代理发送 negative acknowledgment,触发代理将此消息重新传递给消费器。可以使用 `negativeAckRedeliveryDelayMicros`和 `negativeAck.redeliveryBackoff`属性进一步配置否定确认。 -
`fail`使应用程序失败,不再处理任何消息。
-
`ignore`记录错误,但将应用确认策略并且处理将继续。
-
`continue`记录错误,但处理继续进行,不应用确认或否定确认。此策略可与 Acknowledgement timeout配置结合使用。
-
`reconsume-later`使用 `reconsumeLater`API 将消息发送至 retry letter topic以便在延迟后重新使用。可以使用 `reconsumeLater.delay`属性配置延迟,默认值为 3 秒。可以通过将 `io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata`实例添加到错误元数据来配置每个消息的自定义延迟或属性。
Acknowledgement timeout
与否定确认类似,对于 acknowledgement timeout机制,Pulsar 客户端跟踪未确认的消息,针对给定的 ackTimeout*时间段,并向代理发送 *redeliver unacknowledged messages request,因此代理将未确认的消息重新发送给消费器。
要配置超时和重新传递退避机制,可以设置 `ackTimeoutMillis`和 `ackTimeout.redeliveryBackoff`属性。`ackTimeout.redeliveryBackoff`值接受以毫秒为单位的最小延迟、以毫秒为单位的最大延迟和乘数的逗号分隔值:
mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2
Reconsume later and retry letter topic
retry letter topic将未成功使用的消息推送到拒信主题中并继续消息使用。请注意,拒信主题可以用于不同的消息重新传递方法,例如确认超时、否定确认或重试信主题。
mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.reconsumeLater.delay=5000
mp.messaging.incoming.data.retryEnable=true
mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2
Dead-letter topic
dead letter topic将未成功使用的消息推送到拒信主题中,并继续消息使用。请注意,拒信主题可以用于不同的消息重新传递方法,例如确认超时、否定确认或重试信主题。
mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared
用于重新传递的 *Negative acknowledgment*或 *acknowledgment timeout*方法将重新传递包含至少一条未处理消息的消息批。有关更多信息,请参见 Producer Batching。
Receiving Pulsar Messages in Batches
默认情况下,传入的方法单独接收每个 Pulsar 消息。你可以使用 `batchReceive=true`属性,或在消费器配置中设置 `batchReceivePolicy`来启用批模式。
@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<org.apache.pulsar.client.api.Messages<Double>> messages) {
for (org.apache.pulsar.client.api.Message<Double> msg : messages.getPayload()) {
String key = msg.getKey();
String topic = msg.getTopicName();
long timestamp = msg.getEventTime();
//... process messages
}
// ack will commit the latest offsets (per partition) of the batch.
return messages.ack();
}
@Incoming("prices")
public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
//... process messages
}
}
或者你可以直接接收有效负载列表到消费方法:
@Incoming("prices")
public void consume(List<Double> prices) {
for (double price : prices) {
// process price
}
}
Quarkus 会自动侦测传入通道的批类型并自动设置批配置。你可以使用 `mp.messaging.incoming.$channel.batchReceive`属性明确配置批模式。 |
Sending messages to Pulsar
Pulsar 连接器可以将响应式消息传递 Message
写为 Pulsar 消息。
Example
假设你有一个正在运行且可以使用 `pulsar:6650`地址访问的 Pulsar 代理。按照以下方式配置你的应用程序,以将 `prices`通道中的消息写入 Pulsar 消息:
mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 (1)
-
配置 Pulsar 代理服务 URL。
无需设置 Pulsar 主题或生产者名称。默认情况下,连接器使用频道名称 ( |
然后,应用程序必须向 prices
频道发送 Message<Double>
。它可以使用 double
有效负载,如下面的代码片段所示:
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class PulsarPriceProducer {
private final Random random = new Random();
@Outgoing("prices-out")
public Multi<Double> generate() {
// Build an infinite stream of random prices
// It emits a price every second
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> random.nextDouble());
}
}
请注意,generate 方法返回 Multi<Double>
,它实现了 Flow.Publisher
接口。此发布者将由框架用于生成消息并将其发送到已配置的 Pulsar 主题。
除了返回有效负载,还可以返回 io.smallrye.reactive.messaging.pulsar.OutgoingMessage
来发送 Pulsar 消息:
@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}
有效负载可以封装在 `org.eclipse.microprofile.reactive.messaging.Message`中,以更好地控制写入记录:
@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> Message.of(random.nextDouble())
.addMetadata(PulsarOutgoingMessageMetadata.builder()
.withKey("my-key")
.withProperties(Map.of("property-key", "value"))
.build()));
}
发送 Messages
时,可以添加 io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata
的实例来影响消息写入 Pulsar 的方式。
除了返回 Flow.Publisher
的方法签名之外,传出方法还可以返回单个消息。在这种情况下,生产者将使用此方法作为生成器来创建无限流。
@Outgoing("prices-out") T generate(); // T excluding void
@Outgoing("prices-out") Message<T> generate();
@Outgoing("prices-out") Uni<T> generate();
@Outgoing("prices-out") Uni<Message<T>> generate();
@Outgoing("prices-out") CompletionStage<T> generate();
@Outgoing("prices-out") CompletionStage<Message<T>> generate();
Serialization and Pulsar Schema
Pulsar 连接器允许为底层 Pulsar 生产者配置架构配置。有关更多信息,请参见 Pulsar Schema Configuration & Auto Schema Discovery。
Sending key/value pairs
为了将 Kev/Value 对发送到 Pulsar,可以使用 KeyValue 架构配置 Pulsar 生产者架构。
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.common.annotation.Identifier;
@ApplicationScoped
public class PulsarKeyValueExample {
@Identifier("out")
@Produces
Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);
@Incoming("in")
@Outgoing("out")
public KeyValue<String, Long> process(long in) {
return new KeyValue<>("my-key", in);
}
}
如果需要更多控制来书写记录,请使用 PulsarOutgoingMessageMetadata
。
Acknowledgement
在从生产者接收到消息后,Pulsar 代理将 MessageId
分配给消息并将其发送回生产者,确认消息已发布。
默认情况下,连接器会等待 Pulsar 确认记录才能继续处理(确认接收的 Message
)。可以通过将 waitForWriteCompletion
属性设置为 false
来禁用此功能。
如果无法写入记录,则会 nacked
消息。
Pulsar 客户端会自动在发生故障时重试发送消息,直到达到 send timeout。可以将 send timeout 与 sendTimeoutMs
属性进行配置,默认值为 30 秒。
Back-pressure and inflight records
Pulsar 出站连接器处理背压,监控等待写入 Pulsar 代理的待处理消息数。待处理消息数使用 maxPendingMessages
属性进行配置,默认为 1000。
连接器只会同时发送该数量的消息。除非至少有一个待处理的消息得到代理确认,否则不会发送其他消息。然后,当代理的一个待处理消息得到确认时,连接器将向 Pulsar 写入一条新消息。
还可以通过将 maxPendingMessages
设置为 0
来删除待处理消息的限制。请注意,Pulsar 还允许使用 maxPendingMessagesAcrossPartitions
配置每个分区中待处理的消息数。
Pulsar Transactions and Exactly-Once Processing
Pulsar transactions 允许事件流应用程序在一次原子操作中消费、处理和生成消息。
事务允许一个或多个生产者向多个主题发送一批消息,其中一批消息中的所有消息最终对任何消费者可见,或对消费者不可见。
为了使用,事务支持需要在代理配置中使用 transactionCoordinatorEnabled=true
和 systemTopicEnabled=true
代理配置激活。
在客户端上,也需要在 PulsarClient
配置中启用事务支持:
mp.messaging.outgoing.tx-producer.enableTransaction=true
Pulsar 连接器提供 PulsarTransactions
自定义发射器,用于在事务中写入记录。
它可以作为常规发射器 @Channel
使用:
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;
@ApplicationScoped
public class PulsarTransactionalProducer {
@Inject
@Channel("tx-out-example")
PulsarTransactions<OutgoingMessage<Integer>> txProducer;
@Inject
@Channel("other-producer")
PulsarTransactions<String> producer;
@Incoming("in")
public Uni<Void> emitInTransaction(Message<Integer> in) {
return txProducer.withTransaction(emitter -> {
emitter.send(OutgoingMessage.of("a", 1));
emitter.send(OutgoingMessage.of("b", 2));
emitter.send(OutgoingMessage.of("c", 3));
producer.send(emitter, "4");
producer.send(emitter, "5");
producer.send(emitter, "6");
return Uni.createFrom().completionStage(in::ack);
});
}
}
给定 withTransaction
方法的函数接收一个 TransactionalEmitter
来生成记录,并返回一个 Uni
来提供事务的结果。如果处理成功完成,则刷新生产者并提交事务。如果处理抛出异常、返回一个失败的 Uni
或标记 TransactionalEmitter
要中止,则中止事务。
多个事务性生产者可以参与单一事务。这确保在事务启动后发送所有消息,并在提交事务之前,刷新所有参与的生产者。 |
如果这种方法是在 Vert.x 上下文中调用的,则处理函数也在该上下文中调用。否则,它在生产者的发送线程中调用。
Exactly-Once Processing
Pulsar 事务 API 还可以与生成的消息一起管理事务中的消费者偏移。这反过来又支持以消费-转换-生成模式(也称为恰好一次处理)将消费者与事务性生产者耦合在一起。这意味着应用程序消费消息,处理它们,将结果发布到主题中,并在事务中提交已消费消息的偏移。
PulsarTransactions
发射器还提供了一种方法,可以在事务中对传入的 Pulsar 消息应用恰好一次处理。
以下示例包括事务中的批处理 Pulsar 消息。
mp.messaging.outgoing.tx-out-example.enableTransaction=true
# ...
mp.messaging.incoming.in-channel.enableTransaction=true
mp.messaging.incoming.in-channel.batchReceive=true
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.pulsar.client.api.Messages;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;
@ApplicationScoped
public class PulsarExactlyOnceProcessor {
@Inject
@Channel("tx-out-example")
PulsarTransactions<Integer> txProducer;
@Incoming("in-channel")
public Uni<Void> emitInTransaction(Message<Messages<Integer>> batch) {
return txProducer.withTransactionAndAck(batch, emitter -> {
for (org.apache.pulsar.client.api.Message<Integer> record : batch.getPayload()) {
emitter.send(PulsarMessage.of(record.getValue() + 1, record.getKey()));
}
return Uni.createFrom().voidItem();
});
}
}
如果处理成功完成,则消息在事务中确认,并提交事务。
在使用恰好一次处理时,只能单独确认消息,而不能累积确认消息。
如果需要中止处理,则消息被拒绝。可以采用一种故障策略来重试处理或简单地停止故障。请注意,如果事务失败并被中止,则从 withTransaction
返回的 Uni
将产生故障。
应用程序可以选择处理错误情况,但为了使消息消耗继续进行,从 @Incoming
方法返回的 Uni
不能导致故障。PulsarTransactions#withTransactionAndAck
方法将确认和拒绝消息,但不会停止反应式流。忽略故障只是将消费者重置为上次提交的偏移,并从那里恢复处理。
为了避免故障情况下出现重复项,建议在代理端启用消息重复消除和批次索引级别确认:
quarkus.pulsar.devservices.broker-config.brokerDeduplicationEnabled=true
quarkus.pulsar.devservices.broker-config.brokerDeduplicationEntriesInterval=1000
quarkus.pulsar.devservices.broker-config.brokerDeduplicationSnapshotIntervalSeconds=3000
quarkus.pulsar.devservices.broker-config.acknowledgmentAtBatchIndexLevelEnabled=3000
mp.messaging.incoming.data.batchIndexAckEnabled=true
Pulsar Schema Configuration & Auto Schema Discovery
Pulsar 消息以非结构化字节数组形式连同有效负载存储。Pulsar schema 定义了如何将结构化数据序列化到原始消息字节中。schema 应用于生产者和消费者,以使用强制数据结构来写入和读取。在将数据发布到主题之前将其序列化为原始字节,并在将原始字节传递给消费者之前对其进行反序列化。
Pulsar 使用模式注册表作为一个存储已注册模式信息的中央存储库,通过代理,它使生产者/消费者能够协调主题消息的模式。默认情况下,Apache BookKeeper 用于存储模式。
Pulsar API 为许多 primitive types和 complex types提供内置架构信息,例如 Key/Value、Avro 和 Protobuf。
Pulsar 连接器允许使用 `schema`属性指定架构作为原始类型:
mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32
mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE
如果 `schema`属性的值与 Schema Type匹配,将会使用该类型创建一个简单的架构,并用于该频道。
Pulsar 连接器允许通过 CDI 提供 `Schema`bean,并用 `@Identifier`限定符标识,来配置复杂的架构类型。
例如,以下 bean 提供 JSON 架构和 Key/Value 架构:
package pulsar.configuration;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import io.smallrye.common.annotation.Identifier;
@ApplicationScoped
public class PulsarSchemaProvider {
@Produces
@Identifier("user-schema")
Schema<User> userSchema = Schema.JSON(User.class);
@Produces
@Identifier("a-channel")
Schema<KeyValue<Integer, User>> keyValueSchema() {
return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
}
public static class User {
String name;
int age;
}
}
要为定义好的架构配置入站频道`users`,你需要将 `schema`属性设置为架构 `user-schema`的标识符:
mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema
如果找不到 `schema`属性,连接器会查找用频道名称标识的 `Schema`bean。例如,出站频道 `a-channel`将使用 Key/Value 架构。
mp.messaging.outgoing.a-channel.connector=smallrye-pulsar
如果没有提供架构信息,入站频道将使用 Schema.AUTO_CONSUME()
,而连接器将使用 `Schema.AUTO_PRODUCE_BYTES()`架构。
Auto Schema Discovery
当使用 Quarkus 消息传递 Pulsar (io.quarkus:quarkus-messaging-pulsar
) 时,Quarkus 经常会自动检测配置正确的 Pulsar 架构。此自动检测基于 @Incoming`和 `@Outgoing`方法的声明,以及注入的 `@Channel
。
例如,如果您声明
@Outgoing("generated-price")
public Multi<Integer> generate() {
...
}
你的配置表明 generated-price`频道使用 `smallrye-pulsar`连接器时,Quarkus 会自动将 `generated-price`频道中 `schema`的属性设置为 Pulsar 架构 `INT32
。
类似地,如果您声明
@Incoming("my-pulsar-consumer")
public void consume(org.apache.pulsar.api.client.Message<byte[]> record) {
...
}
你的配置表明 `my-pulsar-consumer`频道使用 `smallrye-pulsar`连接器时,Quarkus 会自动将 `schema`的属性设置为 Pulsar `BYTES`架构。
最后,如果您声明
@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;
你的配置表明 `price-create`频道使用 `smallrye-pulsar`连接器时,Quarkus 会自动将 `schema`设置为 Pulsar `INT64`架构。
Pulsar 架构自动检测支持的类型全套集是:
-
short
andjava.lang.Short
-
int
andjava.lang.Integer
-
long
andjava.lang.Long
-
float
andjava.lang.Float
-
double
andjava.lang.Double
-
byte[]
-
java.time.Instant
-
java.sql.Timestamp
-
java.time.LocalDate
-
java.time.LocalTime
-
java.time.LocalDateTime
-
java.nio.ByteBuffer
-
从 Avro 架构生成的类,以及 Avro
GenericRecord
,将使用 `AVRO`架构类型进行配置 -
从 Protobuf 架构生成的类,将使用 `PROTOBUF`架构类型进行配置
-
其他类将使用 `JSON`架构类型进行自动配置
请注意,`JSON`架构类型强制进行架构验证。 |
除那些 Pulsar 提供的架构外,Quarkus 还提供以下架构实现 without enforcing validation:
-
`io.vertx.core.buffer.Buffer`将使用`io.quarkus.pulsar.schema.BufferSchema`架构进行配置
-
io.vertx.core.json.JsonObject
将使用io.quarkus.pulsar.schema.JsonObjectSchema
架构进行配置 -
io.vertx.core.json.JsonArray
将使用io.quarkus.pulsar.schema.JsonArraySchema
架构进行配置 -
对于无架构的 Json 序列化,如果
schema
配置设置为ObjectMapper<fully_qualified_name_of_the_bean>
,Schema 将使用 JacksonObjectMapper
生成,而不强制实施 Pulsar Schema 验证。io.quarkus.pulsar.schema.ObjectMapperSchema
可用于显式配置 JSON 架构而不进行验证。
如果通过配置设置了 schema
,它将不会被自动检测替换。
如果您在序列化器自动检测方面遇到了任何问题,可以通过设置 quarkus.messaging.pulsar.serializer-autodetection.enabled=false
完全关闭它。如果您发现您需要这样做,请在 Quarkus issue tracker 中提交一个漏洞,以便我们修复您遇到的任何问题。
Dev Services for Pulsar
借助 Quarkus Messaging Pulsar 扩展(quarkus-messaging-pulsar
),针对 Pulsar 的 Dev Services 会在开发模式和运行测试时自动启动一个 Pulsar 代理。因此,你无需手动启动代理。应用程序会自动配置。
Enabling / Disabling Dev Services for Pulsar
除非满足以下条件,否则会自动启用针对 Pulsar 的 Dev Services:
-
将
quarkus.pulsar.devservices.enabled
设置为false
-
the
pulsar.client.serviceUrl
is configured -
所有的 Reactive Messaging Pulsar 频道都有
serviceUrl
属性设置
用于 Pulsar 的 Dev Services 依赖于 Docker 来启动代理。如果您的环境不支持 Docker,您将需要手动启动代理或连接到已运行的代理。您可以使用 pulsar.client.
配置代理地址。
Shared broker
大多数情况下,您需要在应用程序之间共享代理。用于 Pulsar 的 Dev Services 为在 dev 模式下运行您的多个 Quarkus 应用程序实施了一个 service discovery 机制,以共享一个代理。
用于 Pulsar 的 Dev Services 使用 |
如果您需要多个(共享)代理,您可以配置 quarkus.pulsar.devservices.service-name
属性并指出代理名称。它会寻找具有相同值的一个容器,或者如果找不到容器,则会启动一个新容器。默认服务名称是 pulsar
。
在 dev 模式下默认启用共享,但在测试模式下禁用共享。您可以使用 quarkus.pulsar.devservices.shared=false
禁用共享。
Setting the port
默认情况下,用于 Pulsar 的 Dev Services 会选择一个随机端口并配置应用程序。您可以通过配置 quarkus.pulsar.devservices.port
属性来设置端口。
请注意,Pulsar 通告地址会使用所选端口自动配置。
Configuring the image
用于 Pulsar 的 Dev Services 支持 official Apache Pulsar image。
可以配置自定义镜像名称,如下所示:
quarkus.pulsar.devservices.image-name=datastax/lunastreaming-all:2.10_4.7
Configuring the Pulsar broker
您可以使用自定义代理配置配置用于 Pulsar 的 Dev Services。
下面的示例启用事务支持:
quarkus.pulsar.devservices.broker-config.transaction-coordinator-enabled=true
quarkus.pulsar.devservices.broker-config.system-topic-enabled=true
Configuring Pulsar clients
Pulsar 客户端、使用者和生成器可以高度定制,以配置 Pulsar 客户端应用程序的行为。
Pulsar 连接器会分别为每个通道创建 Pulsar 客户端和使用者或生成器,每个都采用合理的默认值以简化其配置。尽管创建过程已得到处理,但仍然可以通过 Pulsar 通道来配置所有可用的配置选项。
虽然创建 PulsarClient
、PulsarConsumer
或 PulsarProducer
的习惯方法是通过构建器 API,但其实质是,这些 API 每次都构建一个配置对象,以传递给实现。那些是 ClientConfigurationData、 ConsumerConfigurationData和 ProducerConfigurationData。
Pulsar Connector 允许直接对此类配置对象接收属性。例如,用于 PulsarClient
的代理身份验证信息是使用 authPluginClassName
和 authParams
属性接收的。为了配置传入通道 data
的身份验证:
mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}
请注意,Pulsar 使用者属性 subscriptionInitialPosition
也已使用 enum 值 SubscriptionInitialPosition.Earliest
标识的 Earliest
值进行配置。
这种方法涵盖了大多数配置情况。但是,诸如 CryptoKeyReader
、ServiceUrlProvider
等非序列化对象无法通过这种方式进行配置。Pulsar Connector 允许考虑 Pulsar 配置数据对象的实例 –ClientConfigurationData
、ConsumerConfigurationData
、ProducerConfigurationData
:
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
class PulsarConfig {
@Produces
@Identifier("my-consumer-options")
public ConsumerConfigurationData<String> getConsumerConfig() {
ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
data.setAckReceiptEnabled(true);
data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
//...
.build());
return data;
}
}
检索此实例并使用它来配置连接器使用的客户端。您需要使用 client-configuration
、consumer-configuration
或 producer-configuration
属性指示客户端的名称:
mp.messaging.incoming.prices.consumer-configuration=my-consumer-options
如果没有配置 [client|consumer|producer]-configuration
,连接器将查找带有通道名称的标识的实例:
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
class PulsarConfig {
@Produces
@Identifier("prices")
public ClientConfigurationData getClientConfig() {
ClientConfigurationData data = new ClientConfigurationData();
data.setEnableTransaction(true);
data.setServiceUrlProvider(AutoClusterFailover.builder()
// ...
.build());
return data;
}
}
您还可以提供包含按键排列的配置值的 Map<String, Object>
:
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import java.util.Map;
class PulsarConfig {
@Produces
@Identifier("prices")
public Map<String, Object> getProducerConfig() {
return Map.of(
"batcherBuilder", BatcherBuilder.KEY_BASED,
"sendTimeoutMs", 3000,
"customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
}
}
不同的配置源按从最低到最高重要性的顺序加载,如下所示:
-
使用默认配置标识符
default-pulsar-client
、default-pulsar-consumer
、default-pulsar-producer
生成的Map<String, Object>
配置地图。 -
在配置或通道名称中使用标识符生成的
Map<String, Object>
配置地图 -
在通道配置或通道名称中使用标识符生成的
[Client|Producer|Consuemr]ConfigurationData
对象 -
使用
[Client|Producer|Consuemr]ConfigurationData
字段名称命名的通道配置属性。
有关配置选项的详尽列表,请参阅 Configuration Reference。
Configuring Pulsar Authentication
Pulsar 提供了一个可插入式验证框架,Pulsar 代理/代理使用这个机制来验证客户端。
客户端可以在 application.properties
文件中使用 authPluginClassName
和 authParams
属性进行配置:
pulsar.client.serviceUrl=pulsar://pulsar:6650
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
pulsar.client.authParams={"userId":"superuser","password":"admin"}
或者以编程方式:
import java.util.Map;
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
class PulsarConfig {
@Produces
@Identifier("prices")
public ClientConfigurationData config() {
var data = new ClientConfigurationData();
var auth = new AuthenticationBasic();
auth.configure(Map.of("userId", "superuser", "password", "admin"));
data.setAuthentication(auth);
return data;
}
}
Configuring access to Datastax Luna Streaming
Luna Streaming 是 Apache Pulsar 的一个生产准备发布版本,带有 DataStax 的工具和支持。在创建 DataStax Luna Pulsar 租户后,请注意自动生成的令牌,并配置令牌验证:
pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ....
确保事先创建主题或在命名空间配置中启用 Auto Topic Creation。
请注意,主题配置需要引用主题的全名:
mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices
Configuring access to StreamNative Cloud
StreamNative Cloud 是一个完全托管的 Pulsar-as-a-Service,可用于不同的部署选项,无论它是完全托管的、在由 StreamNative 管理的公共云中托管的或在 Kubernetes 上自托管的。
StreamNative Pulsar 集群使用 Oauth2 身份验证,因此您需要确保存在 service account,其中包含应用程序正在使用的必需的 permissions to the Pulsar namespace/topic。
接下来,您需要下载服务帐户的 Key file(充当 private key)并记下群集的 issuer URL(通常为 https://auth.streamnative.cloud/
)和 audience(例如 urn:sn:pulsar:o-rf3ol:redhat
)。Admin 部分中的 Pulsar Clients 页面 StreamNative Cloud 控制台可帮助您完成此过程。
使用 Pulsar Oauth2 验证配置您的应用程序:
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}
请注意,pulsar.client.authParams
配置包含一个包含 issuerUrl
、audience
和 privateKey
的 Json 字符串,该字符串采用 data:application/json;base64,<base64-encoded-key-file>
格式。
或者,您可以以编程方式配置身份验证:
package org.acme.pulsar;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
@ApplicationScoped
public class PulsarAuth {
@ConfigProperty(name = "pulsar.issuerUrl")
String issuerUrl;
@ConfigProperty(name = "pulsar.credentials")
String credentials;
@ConfigProperty(name = "pulsar.audience")
String audience;
@Produces
@Identifier("pulsar-auth")
public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
var data = new ClientConfigurationData();
data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
return data;
}
}
这假设密钥文件作为资源包含在应用程序类路径中,那么配置就像下面这样:
mp.messaging.incoming.prices.client-configuration=pulsar-auth
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json
请注意,使用 pulsar-auth
标识的客户端配置的通道需要设置 client-configuration
属性。
Health Checks
Quarkus 扩展报告由 Pulsar 连接器管理的每个通道的启动、准备和活动状态。运行状况检查依赖 Pulsar 客户端来验证是否已与代理建立连接。
Startup 和 Readiness 探针用于入站和出站通道均报告 OK,当时与代理的连接已建立。
用于入站和出站通道的 Liveness 探针报告 OK,当时已与代理建立连接,并且尚未捕获到任何故障。
请注意,消息处理在 nacks 中失败,该消息将由失败策略处理。失败策略负责报告故障并影响活动性检查的结果。fail
失败策略将报告故障,因此活动性检查也将报告故障。
Configuration Reference
以下是 Pulsar 连接器通道、使用者、生产者和客户端的配置属性列表。有关如何配置 Pulsar 客户端的更多信息,请参见 Pulsar Client Configuration。
Incoming channel configuration (receiving from Pulsar)
使用以下内容配置下列属性:
mp.messaging.incoming.your-channel-name.attribute=value
Unresolved directive in pulsar.adoc - include::{includes}/smallrye-pulsar-incoming.adoc[]
还可以配置底层 Pulsar 使用者支持的属性。
这些属性还可以使用 pulsar.consumer
前缀进行全局配置:
pulsar.consumer.subscriptionInitialPosition=Earliest
Unresolved directive in pulsar.adoc - include::{includes}/smallrye-pulsar-consumer.adoc[]
Outgoing channel configuration (publishing to Pulsar)
Unresolved directive in pulsar.adoc - include::{includes}/smallrye-pulsar-outgoing.adoc[]
还可以配置底层 Pulsar 生产者支持的属性。
这些属性还可以使用 pulsar.producer
前缀进行全局配置:
pulsar.producer.batchingEnabled=false
Unresolved directive in pulsar.adoc - include::{includes}/smallrye-pulsar-producer.adoc[]
Pulsar Client Configuration
以下是底层 `PulsarClient`的配置参考。可以使用通道属性配置这些选项:
mp.messaging.incoming.your-channel-name.numIoThreads=4
或使用 pulsar.client
前缀进行全局配置:
pulsar.client.serviceUrl=pulsar://pulsar:6650
Unresolved directive in pulsar.adoc - include::{includes}/smallrye-pulsar-client.adoc[]
在配置文件中不可配置的配置属性(不可序列化)在列 `Config file`中标明。
Going further
本指南展示了你可以使用 Quarkus 与 Pulsar 进行交互的方式。它利用 Quarkus Messaging 构建数据流应用程序。
如果您想进一步了解,请查看 SmallRye Reactive Messaging的文档,这是Quarkus中使用的实现。