Message Listener Containers
Two MessageListenerContainer implementations are provided:

- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread.
The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it is invoked before calling the listener, allowing inspection or modification of the record.
If the interceptor returns null, the listener is not called.
Starting with version 2.7, it has additional methods which are called after the listener exits (normally, or by throwing an exception).
Also, starting with version 2.7, there is now a BatchInterceptor, providing similar functionality for Batch Listeners.
In addition, the ConsumerAwareRecordInterceptor (and BatchInterceptor) provide access to the Consumer<?, ?>.
This might be used, for example, to access the consumer metrics in the interceptor.
You should not execute any methods that affect the consumer's positions or committed offsets in these interceptors; the container needs to manage such information.
If the interceptor mutates the record (by creating a new one), the topic, partition, and offset must remain the same to avoid unexpected side effects such as record loss.
The CompositeRecordInterceptor and CompositeBatchInterceptor can be used to invoke multiple interceptors.
By default, starting with version 2.8, when using transactions, the interceptor is invoked before the transaction has started.
You can set the listener container's interceptBeforeTx property to false to invoke the interceptor after the transaction has started instead.
Starting with version 2.9, this applies to any transaction manager, not just a KafkaAwareTransactionManager.
This allows, for example, the interceptor to participate in a JDBC transaction started by the container.
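The veto behavior described earlier (a null return from the interceptor skips the listener) can be sketched with a plain stand-in; the dispatch helper below is hypothetical and uses generic functional interfaces rather than the real spring-kafka types:

```java
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Stand-in for the interceptor contract (hypothetical helper, not
// spring-kafka's actual dispatch code).
public class InterceptorContract {

    // Runs the interceptor before the listener; a null return vetoes delivery.
    static <R> boolean dispatch(R record, UnaryOperator<R> interceptor, Consumer<R> listener) {
        R intercepted = interceptor.apply(record); // may inspect or replace the record
        if (intercepted == null) {
            return false; // interceptor returned null: listener is not called
        }
        listener.accept(intercepted); // listener sees the (possibly modified) record
        return true;
    }
}
```

A real RecordInterceptor additionally must keep topic, partition, and offset unchanged when it replaces the record, as noted above.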
Starting with versions 2.3.8 and 2.4.6, the ConcurrentMessageListenerContainer supports Static Membership when the concurrency is greater than one.
The group.instance.id is suffixed with -n, with n starting at 1.
This, together with an increased session.timeout.ms, can be used to reduce rebalance events, for example, when application instances are restarted.
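The suffixing scheme can be illustrated with a small sketch (a hypothetical helper for illustration; the container applies this naming internally):

```java
// Illustrates the group.instance.id suffixing described above: child
// container n (starting at 1) gets the configured id with "-n" appended.
// Hypothetical helper, not spring-kafka code.
public class StaticMembershipIds {

    static String instanceIdFor(String configuredId, int n) {
        return configuredId + "-" + n;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 3; n++) {
            System.out.println(instanceIdFor("payments-listener", n));
        }
    }
}
```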
Using KafkaMessageListenerContainer
The following constructor is available:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                                     ContainerProperties containerProperties)
It receives a ConsumerFactory and information about topics and partitions, as well as other configuration, in a ContainerProperties object.
ContainerProperties has the following constructors:

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)
The first constructor takes an array of TopicPartitionOffset arguments to explicitly instruct the container about which partitions to use (using the consumer assign() method) and with an optional initial offset.
A positive value is an absolute offset by default.
A negative value is relative to the current last offset within a partition by default.
A constructor for TopicPartitionOffset that takes an additional boolean argument is provided.
If this is true, the initial offsets (positive or negative) are relative to the current position for this consumer.
The offsets are applied when the container is started.
The second constructor takes an array of topics, and Kafka allocates the partitions based on the group.id property, distributing partitions across the group.
The third uses a regex Pattern to select the topics.
To assign a MessageListener to a container, you can use the ContainerProps.setMessageListener method when creating the container.
The following example shows how to do so:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
Note that when creating a DefaultKafkaConsumerFactory, using the constructor that just takes in the properties as above means that key and value Deserializer classes are picked up from configuration.
Alternatively, Deserializer instances may be passed to the DefaultKafkaConsumerFactory constructor for key and/or value, in which case all Consumers share the same instances.
Another option is to provide Supplier<Deserializer>s (starting with version 2.3) that are used to obtain separate Deserializer instances for each Consumer:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, CustomValue> container =
        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
Refer to the Javadoc for ContainerProperties for more information about the various properties that you can set.
Since version 2.1.1, a new property called logContainerConfig is available.
When true and INFO logging is enabled, each listener container writes a log message summarizing its configuration properties.
By default, logging of topic offset commits is performed at the DEBUG logging level.
Starting with version 2.1.2, a property in ContainerProperties called commitLogLevel lets you specify the log level for these messages.
For example, to change the log level to INFO, you can use containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);.
Starting with version 2.2, a new container property called missingTopicsFatal has been added (default: false since 2.3.4).
When true, this prevents the container from starting if any of the configured topics are not present on the broker.
It does not apply if the container is configured to listen to a topic pattern (regex).
Previously, the container threads looped within the consumer.poll() method waiting for the topic to appear while logging many messages.
Aside from the logs, there was no indication that there was a problem.
As of version 2.8, a new container property authExceptionRetryInterval has been introduced.
This causes the container to retry fetching messages after getting any AuthenticationException or AuthorizationException from the KafkaConsumer.
This can happen when, for example, the configured user is denied access to read a certain topic or credentials are incorrect.
Defining authExceptionRetryInterval allows the container to recover when proper permissions are granted.

By default, no interval is configured; authentication and authorization errors are considered fatal, which causes the container to stop.
Starting with version 2.8, when creating the consumer factory, if you provide deserializers as objects (in the constructor or via the setters), the factory invokes the configure() method to configure them with the configuration properties.
Using ConcurrentMessageListenerContainer
The single constructor is similar to the KafkaListenerContainer constructor.
The following listing shows the constructor's signature:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                                          ContainerProperties containerProperties)
It also has a concurrency property.
For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.
For the first constructor, Kafka distributes the partitions across the consumers using its group management capabilities.
When listening to multiple topics, the default partition distribution may not be what you expect.
For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle.
This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc).
For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers.
Then, each consumer is assigned one topic or partition.
To change the PartitionAssignor, you can set the partition.assignment.strategy consumer property (ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) in the properties provided to the DefaultKafkaConsumerFactory.
When using Spring Boot, you can set the strategy as follows:
spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
When the container properties are configured with TopicPartitionOffsets, the ConcurrentMessageListenerContainer distributes the TopicPartitionOffset instances across the delegate KafkaMessageListenerContainer instances.
If, say, six TopicPartitionOffset instances are provided and the concurrency is 3, each container gets two partitions.
For five TopicPartitionOffset instances, two containers get two partitions, and the third gets one.
If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.
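The arithmetic above can be sketched as follows; this is an illustrative round-robin model of the described behavior, not the actual spring-kafka distribution code:

```java
import java.util.ArrayList;
import java.util.List;

// Models the distribution described above: N TopicPartitionOffsets spread
// over `concurrency` delegate containers, with concurrency adjusted down
// when it exceeds N. Illustrative only.
public class PartitionDistribution {

    static List<List<String>> distribute(List<String> topicPartitions, int concurrency) {
        int containers = Math.min(concurrency, topicPartitions.size()); // adjust down
        List<List<String>> assignments = new ArrayList<>();
        for (int i = 0; i < containers; i++) {
            assignments.add(new ArrayList<>());
        }
        for (int i = 0; i < topicPartitions.size(); i++) {
            assignments.get(i % containers).add(topicPartitions.get(i)); // round-robin
        }
        return assignments;
    }
}
```

With six inputs and concurrency 3, each of the three lists holds two entries; with five inputs, the sizes are 2, 2, and 1, matching the text.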
Starting with version 1.3, the MessageListenerContainer provides access to the metrics of the underlying KafkaConsumer.
In the case of ConcurrentMessageListenerContainer, the metrics() method returns the metrics for all the target KafkaMessageListenerContainer instances.
The metrics are grouped into the Map<MetricName, ? extends Metric> by the client-id provided for the underlying KafkaConsumer.
Starting with version 2.3, the ContainerProperties provides an idleBetweenPolls option to let the main loop in the listener container sleep between KafkaConsumer.poll() calls.
The actual sleep interval is the minimum of the provided option and the difference between the max.poll.interval.ms consumer config and the current records batch processing time.
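That selection rule amounts to a simple minimum, sketched below; this is an assumed reading of the rule in a hypothetical helper, not the container's source:

```java
// Picks the sleep interval as described above: the smaller of the configured
// idleBetweenPolls and the headroom left in max.poll.interval.ms after the
// time already spent processing the current batch of records.
public class IdleBetweenPolls {

    static long sleepInterval(long idleBetweenPollsMs, long maxPollIntervalMs, long batchProcessingMs) {
        return Math.min(idleBetweenPollsMs, maxPollIntervalMs - batchProcessingMs);
    }
}
```

Capping at the remaining max.poll.interval.ms headroom keeps the consumer from being kicked out of the group for polling too slowly.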
Committing Offsets
Several options are provided for committing offsets.
If the enable.auto.commit consumer property is true, Kafka auto-commits the offsets according to its configuration.
If it is false, the containers support several AckMode settings (described in the next list).
The default AckMode is BATCH.
Starting with version 2.3, the framework sets enable.auto.commit to false unless explicitly set in the configuration.
Previously, the Kafka default (true) was used if the property was not set.
The consumer poll() method returns one or more ConsumerRecords.
The MessageListener is called for each record.
The following list describes the action taken by the container for each AckMode (when transactions are not being used):
- RECORD: Commit the offset when the listener returns after processing the record.
- BATCH: Commit the offset when all the records returned by the poll() have been processed.
- TIME: Commit the offset when all the records returned by the poll() have been processed, as long as the ackTime since the last commit has been exceeded.
- COUNT: Commit the offset when all the records returned by the poll() have been processed, as long as ackCount records have been received since the last commit.
- COUNT_TIME: Similar to TIME and COUNT, but the commit is performed if either condition is true.
- MANUAL: The message listener is responsible to acknowledge() the Acknowledgment. After that, the same semantics as BATCH are applied.
- MANUAL_IMMEDIATE: Commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.
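The time- and count-based modes in the list above can be sketched as a commit decision; this is an illustrative model of the described semantics, not the container's actual code:

```java
// Models the TIME, COUNT, and COUNT_TIME decisions described above. The
// container evaluates these after the records from a poll() are processed.
public class CommitDecision {

    static boolean shouldCommit(String ackMode, long millisSinceLastCommit, long ackTime,
                                int recordsSinceLastCommit, int ackCount) {
        boolean timeExceeded = millisSinceLastCommit >= ackTime;
        boolean countReached = recordsSinceLastCommit >= ackCount;
        switch (ackMode) {
            case "TIME":
                return timeExceeded;
            case "COUNT":
                return countReached;
            case "COUNT_TIME":
                return timeExceeded || countReached; // either condition suffices
            default:
                return true; // BATCH: commit after each processed poll
        }
    }
}
```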
When using transactions, the offset(s) are sent to the transaction and the semantics are equivalent to RECORD or BATCH, depending on the listener type (record or batch).
Depending on the syncCommits container property, the commitSync() or commitAsync() method on the consumer is used.
syncCommits is true by default; also see setSyncCommitTimeout.
See setCommitCallback to get the results of asynchronous commits; the default callback is the LoggingCommitCallback, which logs errors (and successes at debug level).
Because the listener container has its own mechanism for committing offsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to be false.
Starting with version 2.3, it unconditionally sets it to false unless specifically set in the consumer factory or the container's consumer property overrides.
The Acknowledgment has the following method:
public interface Acknowledgment {
void acknowledge();
}
This method gives the listener control over when offsets are committed.
Starting with version 2.3, the Acknowledgment interface has two additional methods, nack(long sleep) and nack(int index, long sleep).
The first one is used with a record listener, the second with a batch listener.
Calling the wrong method for your listener type throws an IllegalStateException.
nack() can only be called on the consumer thread that invokes your listener.
nack() is not allowed when using Out of Order Commits.
With a record listener, when nack() is called, any pending offsets are committed, the remaining records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next poll().
The consumer can be paused before redelivery by setting the sleep argument.
This is similar functionality to throwing an exception when the container is configured with a DefaultErrorHandler.
nack() pauses the entire listener for the specified sleep duration, including all assigned partitions.
When using a batch listener, you can specify the index within the batch where the failure occurred.
When nack() is called, offsets are committed for records before the index, and seeks are performed on the partitions for the failed and discarded records so that they are redelivered on the next poll().
See Container Error Handlers for more information.
The consumer is paused during the sleep so that the container continues to poll the broker to keep the consumer alive.
The actual sleep time, and its resolution, depends on the container's pollTimeout, which defaults to 5 seconds.
The minimum sleep time is equal to the pollTimeout, and all sleep times are a multiple of it.
For small sleep times, or to increase its accuracy, consider reducing the container's pollTimeout.
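One plausible reading of that rounding rule is sketched below, as a hypothetical helper; the exact resolution depends on the container's poll loop:

```java
// Rounds a requested nack() sleep up to a whole number of pollTimeout
// periods, with a minimum of one period, as described above.
// Illustrative only; not spring-kafka code.
public class NackSleep {

    static long effectiveSleep(long requestedMs, long pollTimeoutMs) {
        long periods = (requestedMs + pollTimeoutMs - 1) / pollTimeoutMs; // ceiling division
        return Math.max(1, periods) * pollTimeoutMs;
    }
}
```

Under this reading, with the default 5-second pollTimeout a requested sleep of 1 ms still pauses for about 5 seconds, which is why reducing pollTimeout improves accuracy for short sleeps.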
Starting with version 3.0.10, batch listeners can commit the offsets of parts of the batch, using acknowledge(index) on the Acknowledgment argument.
When this method is called, the offset of the record at the index (as well as all previous records) will be committed.
Calling acknowledge() after a partial batch commit is performed will commit the offsets of the remainder of the batch.
The following limitations apply:
- AckMode.MANUAL_IMMEDIATE is required
- The method must be called on the listener thread
- The listener must consume a List rather than the raw ConsumerRecords
- The index must be in the range of the list's elements
- The index must be larger than that used in a previous call
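The bookkeeping these rules imply can be sketched as follows; this is an illustrative model of the described semantics, not the spring-kafka Acknowledgment implementation:

```java
import java.util.List;

// Tracks partial-batch acknowledgment as described above: acknowledge(index)
// commits the record at that index and everything before it; a later plain
// acknowledge() commits the remainder of the batch.
public class PartialAck {

    private final List<Long> batchOffsets;
    private int committedThrough = -1; // index of the last committed record

    PartialAck(List<Long> batchOffsets) {
        this.batchOffsets = batchOffsets;
    }

    void acknowledge(int index) {
        if (index < 0 || index >= batchOffsets.size()) {
            throw new IllegalArgumentException("index out of range");
        }
        if (index <= committedThrough) {
            throw new IllegalStateException("index must be larger than in the previous call");
        }
        committedThrough = index;
    }

    void acknowledge() {
        committedThrough = batchOffsets.size() - 1; // commit the rest of the batch
    }

    long lastCommittedOffset() {
        return committedThrough < 0 ? -1 : batchOffsets.get(committedThrough);
    }
}
```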
These restrictions are enforced, and the method throws an IllegalArgumentException or IllegalStateException, depending on the violation.
Listener Container Auto Startup
The listener containers implement SmartLifecycle, and autoStartup is true by default.
The containers are started in a late phase (Integer.MAX_VALUE - 100).
Other components that implement SmartLifecycle, to handle data from listeners, should be started in an earlier phase.
The - 100 leaves room for later phases to enable components to be auto-started after the containers.