Merge pull request #22973 from anshlykov

* pr/22973:
  Polish "Consider RecordFilterStrategy in Kafka auto-configuration"
  Consider RecordFilterStrategy in Kafka auto-configuration

Closes gh-22973
pull/22412/head
Stephane Nicoll 4 years ago
commit 5985c58d3d

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,6 +29,7 @@ import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
@ -45,6 +46,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private MessageConverter messageConverter;
private RecordFilterStrategy<Object, Object> recordFilterStrategy;
private KafkaTemplate<Object, Object> replyTemplate;
private KafkaAwareTransactionManager<Object, Object> transactionManager;
@ -75,6 +78,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.messageConverter = messageConverter;
}
/**
* Set the {@link RecordFilterStrategy} to use to filter incoming records.
* @param recordFilterStrategy the record filter strategy
*/
void setRecordFilterStrategy(RecordFilterStrategy<Object, Object> recordFilterStrategy) {
this.recordFilterStrategy = recordFilterStrategy;
}
/**
* Set the {@link KafkaTemplate} to use to send replies.
* @param replyTemplate the reply template
@ -151,6 +162,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
Listener properties = this.properties.getListener();
map.from(properties::getConcurrency).to(factory::setConcurrency);
map.from(this.messageConverter).to(factory::setMessageConverter);
map.from(this.recordFilterStrategy).to(factory::setRecordFilterStrategy);
map.from(this.replyTemplate).to(factory::setReplyTemplate);
if (properties.getType().equals(Listener.Type.BATCH)) {
factory.setBatchListener(true);

@ -33,6 +33,7 @@ import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
@ -53,6 +54,8 @@ class KafkaAnnotationDrivenConfiguration {
private final RecordMessageConverter messageConverter;
private final RecordFilterStrategy<Object, Object> recordFilterStrategy;
private final BatchMessageConverter batchMessageConverter;
private final KafkaTemplate<Object, Object> kafkaTemplate;
@ -71,6 +74,7 @@ class KafkaAnnotationDrivenConfiguration {
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
ObjectProvider<BatchMessageConverter> batchMessageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
@ -80,6 +84,7 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
this.batchMessageConverter = batchMessageConverter
.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
this.kafkaTemplate = kafkaTemplate.getIfUnique();
@ -99,6 +104,7 @@ class KafkaAnnotationDrivenConfiguration {
MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH))
? this.batchMessageConverter : this.messageConverter;
configurer.setMessageConverter(messageConverterToUse);
configurer.setRecordFilterStrategy(this.recordFilterStrategy);
configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager);
configurer.setRebalanceListener(this.rebalanceListener);

@ -62,6 +62,7 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
@ -464,6 +465,25 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithDefaultRecordFilterStrategy() {
this.contextRunner.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("recordFilterStrategy", null);
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomRecordFilterStrategy() {
this.contextRunner.withUserConfiguration(RecordFilterStrategyConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("recordFilterStrategy",
context.getBean("recordFilterStrategy"));
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() {
this.contextRunner.withUserConfiguration(ErrorHandlerConfiguration.class).run((context) -> {
@ -608,6 +628,16 @@ class KafkaAutoConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class RecordFilterStrategyConfiguration {
@Bean
RecordFilterStrategy<Object, Object> recordFilterStrategy() {
return (record) -> false;
}
}
@Configuration(proxyBeanMethods = false)
static class ErrorHandlerConfiguration {

@ -5725,7 +5725,7 @@ The following component creates a listener endpoint on the `someTopic` topic:
----
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the container factory.
Similarly, if a `ErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory.
Similarly, if a `RecordFilterStrategy`, `ErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory.
Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean is associated to the default factory.
If only a `RecordMessageConverter` bean is present for a batch listener, it is wrapped in a `BatchMessageConverter`.

Loading…
Cancel
Save