Application Events
侦听器容器及其消费者会发布以下 Spring 应用事件:
The following Spring application events are published by listener containers and their consumers:
-
ConsumerStartingEvent
: 在消费者线程首次启动后发布,在它开始轮询之前。 -
ConsumerStartingEvent
: published when a consumer thread is first started, before it starts polling. -
ConsumerStartedEvent
: 在消费者准备开始轮询时发布。 -
ConsumerStartedEvent
: published when a consumer is about to start polling. -
ConsumerFailedToStartEvent
: 如果容器属性consumerStartTimeout`中未发布 `ConsumerStartingEvent
,则发布。此事件可能表明配置的任务执行器没有足够的线程来支持它所使用的容器及其并发。此条件发生时,也会记录一条错误消息。 -
ConsumerFailedToStartEvent
: published if noConsumerStartingEvent
is published within theconsumerStartTimeout
container property. This event might signal that the configured task executor has insufficient threads to support the containers it is used in and their concurrency. An error message is also logged when this condition occurs. -
ListenerContainerIdleEvent
: 如果 `idleInterval`中未收到任何消息(如果配置),则发布。 -
ListenerContainerIdleEvent
: published when no messages have been received inidleInterval
(if configured). -
ListenerContainerNoLongerIdleEvent
: 在之前发布了 `ListenerContainerIdleEvent`之后消费记录时发布。 -
ListenerContainerNoLongerIdleEvent
: published when a record is consumed after previously publishing aListenerContainerIdleEvent
. -
ListenerContainerPartitionIdleEvent
: 如果 `idlePartitionEventInterval`中未从该分区收到任何消息(如果配置),则发布。 -
ListenerContainerPartitionIdleEvent
: published when no messages have been received from that partition inidlePartitionEventInterval
(if configured). -
ListenerContainerPartitionNoLongerIdleEvent
: 当从以前发布了 `ListenerContainerPartitionIdleEvent`的分区中消费记录时发布。 -
ListenerContainerPartitionNoLongerIdleEvent
: published when a record is consumed from a partition that has previously published aListenerContainerPartitionIdleEvent
. -
NonResponsiveConsumerEvent
: 当消费者似乎在 `poll`方法中被阻止时发布。 -
NonResponsiveConsumerEvent
: published when the consumer appears to be blocked in thepoll
method. -
ConsumerPartitionPausedEvent
: 当分区暂停时,每个消费者发布。 -
ConsumerPartitionPausedEvent
: published by each consumer when a partition is paused. -
ConsumerPartitionResumedEvent
: 当分区恢复时,每个消费者发布。 -
ConsumerPartitionResumedEvent
: published by each consumer when a partition is resumed. -
ConsumerPausedEvent
: 当容器暂停时,每个消费者发布。 -
ConsumerPausedEvent
: published by each consumer when the container is paused. -
ConsumerResumedEvent
: 当容器恢复时,每个消费者发布。 -
ConsumerResumedEvent
: published by each consumer when the container is resumed. -
ConsumerStoppingEvent
: 每个消费者在停止之前发布。 -
ConsumerStoppingEvent
: published by each consumer just before stopping. -
ConsumerStoppedEvent
: 消费者关闭后发布。查看 Thread Safety。 -
ConsumerStoppedEvent
: published after the consumer is closed. See Thread Safety. -
ConsumerRetryAuthEvent
: 当消费者的身份验证或授权失败并且正在重试时发布。 -
ConsumerRetryAuthEvent
: published when authentication or authorization of a consumer fails and is being retried. -
ConsumerRetryAuthSuccessfulEvent
: 当身份验证或授权已成功重试时发布。仅当之前存在 `ConsumerRetryAuthEvent`时才可能发生。 -
ConsumerRetryAuthSuccessfulEvent
: published when authentication or authorization has been retried successfully. Can only occur when there has been aConsumerRetryAuthEvent
before. -
ContainerStoppedEvent
: 在所有消费者停止时发布。 -
ContainerStoppedEvent
: published when all consumers have stopped.
默认情况下,应用程序上下文的事件多播器将在调用线程上调用事件侦听器。如果您更改多播器以使用异步执行器,则当事件包含对消费者的引用时,您不得调用任何 Consumer
方法。
By default, the application context’s event multicaster invokes event listeners on the calling thread.
If you change the multicaster to use an async executor, you must not invoke any Consumer
methods when the event contains a reference to the consumer.
ListenerContainerIdleEvent
具有以下属性:
The ListenerContainerIdleEvent
has the following properties:
-
source
: 发布事件的监听器容器实例。 -
source
: The listener container instance that published the event. -
container
: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
container
: The listener container or the parent listener container, if the source container is a child. -
id
: 监听器 ID(或容器 bean 名称)。 -
id
: The listener ID (or container bean name). -
idleTime
: 当事件发布时容器处于空闲状态的时间。 -
idleTime
: The time the container had been idle when the event was published. -
topicPartitions
: 生成事件时分配给容器的主题和分区。 -
topicPartitions
: The topics and partitions that the container was assigned at the time the event was generated. -
consumer
: 对 KafkaConsumer`对象的引用。例如,如果以前调用了消费者的 `pause()`方法,则当收到事件时,它可以 `resume()
。 -
consumer
: A reference to the KafkaConsumer
object. For example, if the consumer’spause()
method was previously called, it canresume()
when the event is received. -
paused
: 容器当前是否暂停。有关详细信息,请参阅 Pausing and Resuming Listener Containers。 -
paused
: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.
ListenerContainerNoLongerIdleEvent
具有相同的属性,但 idleTime
和 paused
除外。
The ListenerContainerNoLongerIdleEvent
has the same properties, except idleTime
and paused
.
ListenerContainerPartitionIdleEvent
具有以下属性:
The ListenerContainerPartitionIdleEvent
has the following properties:
-
source
: 发布事件的监听器容器实例。 -
source
: The listener container instance that published the event. -
container
: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
container
: The listener container or the parent listener container, if the source container is a child. -
id
: 监听器 ID(或容器 bean 名称)。 -
id
: The listener ID (or container bean name). -
idleTime
: 事件发布时时间分区消费处于空闲状态。 -
idleTime
: The time partition consumption had been idle when the event was published. -
topicPartition
: 触发事件的主题和分区。 -
topicPartition
: The topic and partition that triggered the event. -
consumer
: 对 KafkaConsumer`对象的引用。例如,如果以前调用了消费者的 `pause()`方法,则当收到事件时,它可以 `resume()
。 -
consumer
: A reference to the KafkaConsumer
object. For example, if the consumer’spause()
method was previously called, it canresume()
when the event is received. -
paused
: 该分区消费是否当前针对该消费者暂停。有关更多信息,请参见 Pausing and Resuming Listener Containers。 -
paused
: Whether that partition consumption is currently paused for that consumer. See Pausing and Resuming Listener Containers for more information.
ListenerContainerPartitionNoLongerIdleEvent
具有相同的属性,但 idleTime
和 paused
除外。
The ListenerContainerPartitionNoLongerIdleEvent
has the same properties, except idleTime
and paused
.
NonResponsiveConsumerEvent
具有以下属性:
The NonResponsiveConsumerEvent
has the following properties:
-
source
: 发布事件的监听器容器实例。 -
source
: The listener container instance that published the event. -
container
: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
container
: The listener container or the parent listener container, if the source container is a child. -
id
: 监听器 ID(或容器 bean 名称)。 -
id
: The listener ID (or container bean name). -
Pausing and Resuming Listener Containers: 容器上次调用
timeSinceLastPoll
之前的时间。 -
timeSinceLastPoll
: The time just before the container last calledpoll()
. -
topicPartitions
: 生成事件时分配给容器的主题和分区。 -
topicPartitions
: The topics and partitions that the container was assigned at the time the event was generated. -
consumer
: 对 KafkaConsumer`对象的引用。例如,如果以前调用了消费者的 `pause()`方法,则当收到事件时,它可以 `resume()
。 -
consumer
: A reference to the KafkaConsumer
object. For example, if the consumer’spause()
method was previously called, it canresume()
when the event is received. -
paused
: 容器当前是否暂停。有关详细信息,请参阅 Pausing and Resuming Listener Containers。 -
paused
: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.
ConsumerPausedEvent
、ConsumerResumedEvent
和 ConsumerStopping
事件具有以下属性:
The ConsumerPausedEvent
, ConsumerResumedEvent
, and ConsumerStopping
events have the following properties:
-
source
: 发布事件的监听器容器实例。 -
source
: The listener container instance that published the event. -
container
: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
container
: The listener container or the parent listener container, if the source container is a child. -
timeSinceLastPoll
: 涉及的poll()
实例。 -
partitions
: TheTopicPartition
instances involved.
ConsumerPartitionPausedEvent
、ConsumerPartitionResumedEvent
事件具有以下属性:
The ConsumerPartitionPausedEvent
, ConsumerPartitionResumedEvent
events have the following properties:
-
source
: 发布事件的监听器容器实例。 -
source
: The listener container instance that published the event. -
container
: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
container
: The listener container or the parent listener container, if the source container is a child. -
poll()
: 涉及的partitions
实例。 -
partition
: TheTopicPartition
instance involved.
ConsumerRetryAuthEvent
事件具有以下属性:
The ConsumerRetryAuthEvent
event has the following properties:
-
source
: 发布事件的监听器容器实例。 -
source
: The listener container instance that published the event. -
container
: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
container
: The listener container or the parent listener container, if the source container is a child. -
reason
:-
partitions
- 由于身份验证异常而发布事件。 -
AUTHENTICATION
- the event was published because of an authentication exception. -
TopicPartition
- 由于授权异常而发布事件。 -
AUTHORIZATION
- the event was published because of an authorization exception.
-
ConsumerStartingEvent
、ConsumerStartingEvent
、ConsumerFailedToStartEvent
、ConsumerStoppedEvent
、ConsumerRetryAuthSuccessfulEvent
和 ContainerStoppedEvent
事件具有以下属性:
The ConsumerStartingEvent
, ConsumerStartingEvent
, ConsumerFailedToStartEvent
, ConsumerStoppedEvent
, ConsumerRetryAuthSuccessfulEvent
and ContainerStoppedEvent
events have the following properties:
-
source
: 发布事件的监听器容器实例。 -
source
: The listener container instance that published the event. -
container
: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
container
: The listener container or the parent listener container, if the source container is a child.
所有容器(包括子级或父级)都发布 ContainerStoppedEvent
。对于父级容器,source 和 container 属性是相同的。
All containers (whether a child or a parent) publish ContainerStoppedEvent
.
For a parent container, the source and container properties are identical.
此外,ConsumerStoppedEvent
还具有以下额外的属性:
In addition, the ConsumerStoppedEvent
has the following additional property:
-
reason
:-
partition
- 消费者正常停止(容器停止)。 -
NORMAL
- the consumer stopped normally (container was stopped). -
TopicPartition
- 抛出AUTHENTICATION
。 -
ERROR
- ajava.lang.Error
was thrown. -
AUTHENTICATION
- 事务性生产者被隔离,且AUTHORIZATION
容器属性为NORMAL
。 -
FENCED
- the transactional producer was fenced and thestopContainerWhenFenced
container property istrue
. -
ERROR
- 抛出java.lang.Error
或FENCED
。且未配置stopContainerWhenFenced
。 -
AUTH
- anAuthenticationException
orAuthorizationException
was thrown and theauthExceptionRetryInterval
is not configured. -
true
- 没有分区的偏移量,且AUTH
策略为AuthenticationException
。 -
NO_OFFSET
- there is no offset for a partition and theauto.offset.reset
policy isnone
.
-
可以在出现此类情况后使用此事件重新启动容器:
You can use this event to restart the container after such a condition:
if (event.getReason.equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
Detecting Idle and Non-Responsive Consumers
尽管效率很高,但异步消费者检测何时处于空闲状态时存在一个问题。如果在一段时间内没有消息到达,您可能需要采取一些措施。
While efficient, one problem with asynchronous consumers is detecting when they are idle. You might want to take some action if no messages arrive for some period of time.
您可以将侦听器容器配置为在一段时间没有消息传送到时发布 ListenerContainerIdleEvent
。在容器空闲时,每隔 idleEventInterval
毫秒发布一个事件。
You can configure the listener container to publish a ListenerContainerIdleEvent
when some time passes with no message delivery.
While the container is idle, an event is published every idleEventInterval
milliseconds.
要配置此功能,请在容器上设置 idleEventInterval
。以下示例演示如何执行此操作:
To configure this feature, set the idleEventInterval
on the container.
The following example shows how to do so:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下示例展示了如何为 @KafkaListener
设置 idleEventInterval
:
The following example shows how to set the idleEventInterval
for a @KafkaListener
:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在每个案例中,容器空闲时每分钟发布一次事件。
In each of these cases, an event is published once per minute while the container is idle.
如果由于某种原因,使用者 poll()
方法没有退出,没有接收到任何消息,也无法生成空闲事件(当代理不可访问时,早期版本的 kafka-clients
存在此问题)。在这种情况下,如果轮回没有在 3x
的 pollTimeout
属性内返回,则容器会发布 NonResponsiveConsumerEvent
。默认情况下,在每个容器中每 30 秒执行一次此检查。可以在配置侦听器容器时,在 ContainerProperties
中设置 monitorInterval
(默认 30 秒)和 noPollThreshold
(默认 3.0)属性来修改此行为。noPollThreshold
应大于 1.0
,以避免因竞争条件而获得虚假事件。收到此类事件后,可以停止容器,从而唤醒使用者以便它停止。
If, for some reason, the consumer poll()
method does not exit, no messages are received and idle events cannot be generated (this was a problem with early versions of the kafka-clients
when the broker wasn’t reachable).
In this case, the container publishes a NonResponsiveConsumerEvent
if a poll does not return within 3x
the pollTimeout
property.
By default, this check is performed once every 30 seconds in each container.
You can modify this behavior by setting the monitorInterval
(default 30 seconds) and noPollThreshold
(default 3.0) properties in the ContainerProperties
when configuring the listener container.
The noPollThreshold
should be greater than 1.0
to avoid getting spurious events due to a race condition.
Receiving such an event lets you stop the containers, thus waking the consumer so that it can stop.
从 2.6.2 版本开始,如果容器已发布 ListenerContainerIdleEvent
,则在随后收到记录时将发布 ListenerContainerNoLongerIdleEvent
。
Starting with version 2.6.2, if a container has published a ListenerContainerIdleEvent
, it will publish a ListenerContainerNoLongerIdleEvent
when a record is subsequently received.
Event Consumption
可以通过实现 ApplicationListener
来捕获这些事件,既可以是通用侦听器,也可以是仅接收此特定事件的窄化侦听器。您还可以使用 Spring Framework 4.2 中引入的 @EventListener
。
You can capture these events by implementing ApplicationListener
— either a general listener or one narrowed to only receive this specific event.
You can also use @EventListener
, introduced in Spring Framework 4.2.
下一个示例将 @KafkaListener
和 @EventListener
组合到一个类中。您应该明白,应用程序侦听器获取所有容器的事件,因此如果您想根据空闲的特定容器采取具体措施,您可能需要检查侦听器 ID。您还可以为此目的使用 @EventListener
的 condition
。
The next example combines @KafkaListener
and @EventListener
into a single class.
You should understand that the application listener gets events for all containers, so you may need to check the listener ID if you want to take specific action based on which container is idle.
You can also use the @EventListener’s `condition
for this purpose.
请参阅 Application Events 来了解事件属性的信息。
See Application Events for information about event properties.
该事件通常在使用者线程上发布,因此与 Consumer
对象进行交互是安全的。
The event is normally published on the consumer thread, so it is safe to interact with the Consumer
object.
以下示例同时使用了 @KafkaListener
和 @EventListener
:
The following example uses both @KafkaListener
and @EventListener
:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件侦听器可以看到所有容器的事件。因此,在下例中,我们会根据侦听器 ID 缩小接收的事件范围。由于为 @KafkaListener
创建的容器支持并发,所以实际容器会命名为 id-n
,其中 n
是用于支持并发的每个实例的唯一值。这就是我们在条件中使用 startsWith
的原因。
Event listeners see events for all containers.
Consequently, in the preceding example, we narrow the events received based on the listener ID.
Since containers created for the @KafkaListener
support concurrency, the actual containers are named id-n
where the n
is a unique value for each instance to support the concurrency.
That is why we use startsWith
in the condition.
如果您希望使用空闲事件停止侦听器容器,则不应在调用侦听器的线程上调用 container.stop()
。这会导致延迟和不必要的日志消息。相反,您应该将事件传递给可以停止容器的其他线程。此外,如果它是子容器,则您不应 stop()
容器实例。您应该停止并发容器。
If you wish to use the idle event to stop the lister container, you should not call container.stop()
on the thread that calls the listener.
Doing so causes delays and unnecessary log messages.
Instead, you should hand off the event to a different thread that can then stop the container.
Also, you should not stop()
the container instance if it is a child container.
You should stop the concurrent container instead.