diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 47b12837bd..d32643b6ff 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -25,6 +25,7 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.BatchErrorHandler; +import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ErrorHandler; @@ -58,6 +59,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private BatchErrorHandler batchErrorHandler; + private CommonErrorHandler commonErrorHandler; + private AfterRollbackProcessor afterRollbackProcessor; private RecordInterceptor recordInterceptor; @@ -127,6 +130,15 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.batchErrorHandler = batchErrorHandler; } + /** + * Set the {@link CommonErrorHandler} to use. + * @param commonErrorHandler the error handler. + * @since 2.6 + */ + public void setCommonErrorHandler(CommonErrorHandler commonErrorHandler) { + this.commonErrorHandler = commonErrorHandler; + } + /** * Set the {@link AfterRollbackProcessor} to use. * @param afterRollbackProcessor the after rollback processor @@ -171,6 +183,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { else { factory.setErrorHandler(this.errorHandler); } + map.from(this.commonErrorHandler).to(factory::setCommonErrorHandler); map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor); map.from(this.recordInterceptor).to(factory::setRecordInterceptor); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index e11a6b1386..edbe5cc097 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -30,6 +30,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.BatchErrorHandler; +import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.RecordInterceptor; @@ -68,6 +69,8 @@ class KafkaAnnotationDrivenConfiguration { private final BatchErrorHandler batchErrorHandler; + private final CommonErrorHandler commonErrorHandler; + private final AfterRollbackProcessor afterRollbackProcessor; private final RecordInterceptor recordInterceptor; @@ -79,7 +82,7 @@ class KafkaAnnotationDrivenConfiguration { ObjectProvider> kafkaTemplate, ObjectProvider> kafkaTransactionManager, ObjectProvider rebalanceListener, ObjectProvider errorHandler, - ObjectProvider batchErrorHandler, + ObjectProvider batchErrorHandler, ObjectProvider commonErrorHandler, ObjectProvider> afterRollbackProcessor, ObjectProvider> recordInterceptor) { this.properties = properties; @@ -92,6 +95,7 @@ class KafkaAnnotationDrivenConfiguration { this.rebalanceListener = rebalanceListener.getIfUnique(); this.errorHandler = errorHandler.getIfUnique(); this.batchErrorHandler = batchErrorHandler.getIfUnique(); + this.commonErrorHandler = commonErrorHandler.getIfUnique(); this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique(); this.recordInterceptor = recordInterceptor.getIfUnique(); } @@ -110,6 +114,7 @@ class KafkaAnnotationDrivenConfiguration { configurer.setRebalanceListener(this.rebalanceListener); configurer.setErrorHandler(this.errorHandler); configurer.setBatchErrorHandler(this.batchErrorHandler); + configurer.setCommonErrorHandler(this.commonErrorHandler); configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); configurer.setRecordInterceptor(this.recordInterceptor); return configurer; diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 4c6d842854..320a53ea09 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -58,6 +58,7 @@ import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.BatchErrorHandler; +import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties.AckMode; @@ -546,6 +547,17 @@ class KafkaAutoConfigurationTests { }); } + @Test + void testConcurrentKafkaListenerContainerFactoryWithCustomCommonErrorHandler() { + this.contextRunner.withBean("errorHandler", CommonErrorHandler.class, () -> mock(CommonErrorHandler.class)) + .run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(factory).hasFieldOrPropertyWithValue("commonErrorHandler", + context.getBean("errorHandler")); + }); + } + @Test void testConcurrentKafkaListenerContainerFactoryWithDefaultTransactionManager() { this.contextRunner.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test").run((context) -> {