Merge pull request #32951 from ThomasKasene

* pr/32951:
  Auto-configure BatchInterceptor on ConcurrentKafkaListenerContainerFactory

Closes gh-32951
pull/34049/head
Moritz Halbritter 2 years ago
commit 3882656d1d

@ -24,6 +24,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
@ -37,6 +38,7 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
*
* @author Gary Russell
* @author Eddú Meléndez
* @author Thomas Kåsene
* @since 1.5.0
*/
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
@ -59,6 +61,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private RecordInterceptor<Object, Object> recordInterceptor;
private BatchInterceptor<Object, Object> batchInterceptor;
/**
* Set the {@link KafkaProperties} to use.
* @param properties the properties
@ -133,6 +137,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.recordInterceptor = recordInterceptor;
}
/**
* Set the {@link BatchInterceptor} to use.
* @param batchInterceptor the batch interceptor.
*/
void setBatchInterceptor(BatchInterceptor<Object, Object> batchInterceptor) {
this.batchInterceptor = batchInterceptor;
}
/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
@ -161,6 +173,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(this.commonErrorHandler).to(factory::setCommonErrorHandler);
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
map.from(this.recordInterceptor).to(factory::setRecordInterceptor);
map.from(this.batchInterceptor).to(factory::setBatchInterceptor);
}
private void configureContainer(ContainerProperties container) {

@ -29,6 +29,7 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.RecordInterceptor;
@ -44,6 +45,7 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
*
* @author Gary Russell
* @author Eddú Meléndez
* @author Thomas Kåsene
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
@ -69,6 +71,8 @@ class KafkaAnnotationDrivenConfiguration {
private final RecordInterceptor<Object, Object> recordInterceptor;
private final BatchInterceptor<Object, Object> batchInterceptor;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
@ -78,7 +82,8 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
ObjectProvider<CommonErrorHandler> commonErrorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor,
ObjectProvider<BatchInterceptor<Object, Object>> batchInterceptor) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
@ -90,6 +95,7 @@ class KafkaAnnotationDrivenConfiguration {
this.commonErrorHandler = commonErrorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
this.recordInterceptor = recordInterceptor.getIfUnique();
this.batchInterceptor = batchInterceptor.getIfUnique();
}
@Bean
@ -107,6 +113,7 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setCommonErrorHandler(this.commonErrorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
configurer.setRecordInterceptor(this.recordInterceptor);
configurer.setBatchInterceptor(this.batchInterceptor);
return configurer;
}

@ -63,6 +63,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
@ -95,6 +96,7 @@ import static org.mockito.Mockito.never;
* @author Eddú Meléndez
* @author Nakul Mishra
* @author Tomaz Fernandes
* @author Thomas Kåsene
*/
class KafkaAutoConfigurationTests {
@ -645,6 +647,15 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomBatchInterceptor() {
this.contextRunner.withUserConfiguration(BatchInterceptorConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("batchInterceptor", context.getBean("batchInterceptor"));
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() {
this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class).run((context) -> {
@ -764,6 +775,16 @@ class KafkaAutoConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class BatchInterceptorConfiguration {
@Bean
BatchInterceptor<Object, Object> batchInterceptor() {
return (batch, consumer) -> batch;
}
}
@Configuration(proxyBeanMethods = false)
static class RebalanceListenerConfiguration {

Loading…
Cancel
Save