diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index e1f104b000..861e501ebe 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2018 the original author or authors. + * Copyright 2012-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ErrorHandler; -import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.transaction.KafkaAwareTransactionManager; /** @@ -40,7 +40,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private KafkaProperties properties; - private RecordMessageConverter messageConverter; + private MessageConverter messageConverter; private KafkaTemplate replyTemplate; @@ -59,10 +59,10 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { } /** - * Set the {@link RecordMessageConverter} to use. + * Set the {@link MessageConverter} to use. * @param messageConverter the message converter */ - void setMessageConverter(RecordMessageConverter messageConverter) { + void setMessageConverter(MessageConverter messageConverter) { this.messageConverter = messageConverter; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index e6d4af48bc..00fb536643 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2018 the original author or authors. + * Copyright 2012-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.kafka; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; @@ -28,6 +29,9 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.ErrorHandler; +import org.springframework.kafka.support.converter.BatchMessageConverter; +import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; +import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.transaction.KafkaAwareTransactionManager; @@ -46,6 +50,8 @@ class KafkaAnnotationDrivenConfiguration { private final RecordMessageConverter messageConverter; + private final BatchMessageConverter batchMessageConverter; + private final KafkaTemplate kafkaTemplate; private final KafkaAwareTransactionManager transactionManager; @@ -56,12 +62,15 @@ class KafkaAnnotationDrivenConfiguration { KafkaAnnotationDrivenConfiguration(KafkaProperties properties, ObjectProvider messageConverter, + ObjectProvider batchMessageConverter, ObjectProvider> kafkaTemplate, ObjectProvider> kafkaTransactionManager, ObjectProvider errorHandler, ObjectProvider> afterRollbackProcessor) { this.properties = properties; this.messageConverter = messageConverter.getIfUnique(); + this.batchMessageConverter = batchMessageConverter.getIfUnique( + () -> new BatchMessagingMessageConverter(this.messageConverter)); this.kafkaTemplate = kafkaTemplate.getIfUnique(); this.transactionManager = kafkaTransactionManager.getIfUnique(); this.errorHandler = errorHandler.getIfUnique(); @@ -73,7 +82,9 @@ class KafkaAnnotationDrivenConfiguration { public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() { ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer(); configurer.setKafkaProperties(this.properties); - configurer.setMessageConverter(this.messageConverter); + MessageConverter messageConverterToUse = (this.properties.getListener().getType() + .equals(Type.BATCH)) ? this.batchMessageConverter : this.messageConverter; + configurer.setMessageConverter(messageConverterToUse); configurer.setReplyTemplate(this.kafkaTemplate); configurer.setTransactionManager(this.transactionManager); configurer.setErrorHandler(this.errorHandler); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 2332a6b255..53dcd1f0c8 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2018 the original author or authors. + * Copyright 2012-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,6 +56,8 @@ import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.SeekToCurrentErrorHandler; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; +import org.springframework.kafka.support.converter.BatchMessageConverter; +import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.test.utils.KafkaTestUtils; @@ -503,7 +505,7 @@ public class KafkaAutoConfigurationTests { } @Test - public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverters() { + public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverter() { this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class) .run((context) -> { ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context @@ -513,6 +515,51 @@ public class KafkaAutoConfigurationTests { }); } + @Test + public void testConcurrentKafkaListenerContainerFactoryInBatchModeWithCustomMessageConverter() { + this.contextRunner + .withUserConfiguration(BatchMessageConverterConfiguration.class, + MessageConverterConfiguration.class) + .withPropertyValues("spring.kafka.listener.type=batch").run((context) -> { + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue( + "messageConverter", + context.getBean("myBatchMessageConverter")); + }); + } + + @Test + public void testConcurrentKafkaListenerContainerFactoryInBatchModeWrapsCustomMessageConverter() { + this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class) + .withPropertyValues("spring.kafka.listener.type=batch").run((context) -> { + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + Object messageConverter = ReflectionTestUtils + .getField(kafkaListenerContainerFactory, "messageConverter"); + assertThat(messageConverter) + .isInstanceOf(BatchMessagingMessageConverter.class); + assertThat(((BatchMessageConverter) messageConverter) + .getRecordMessageConverter()) + .isSameAs(context.getBean("myMessageConverter")); + }); + } + + @Test + public void testConcurrentKafkaListenerContainerFactoryInBatchModeWithNoMessageConverter() { + this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch") + .run((context) -> { + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + Object messageConverter = ReflectionTestUtils + .getField(kafkaListenerContainerFactory, "messageConverter"); + assertThat(messageConverter) + .isInstanceOf(BatchMessagingMessageConverter.class); + assertThat(((BatchMessageConverter) messageConverter) + .getRecordMessageConverter()).isNull(); + }); + } + @Test public void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() { this.contextRunner.withUserConfiguration(ErrorHandlerConfiguration.class) @@ -583,6 +630,16 @@ public class KafkaAutoConfigurationTests { } + @Configuration + protected static class BatchMessageConverterConfiguration { + + @Bean + public BatchMessageConverter myBatchMessageConverter() { + return mock(BatchMessageConverter.class); + } + + } + @Configuration protected static class ErrorHandlerConfiguration { diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index 4d85bdf27b..6b52940e44 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -5880,9 +5880,12 @@ The following component creates a listener endpoint on the `someTopic` topic: ---- If a `KafkaTransactionManager` bean is defined, it is automatically associated to the -container factory. Similarly, if a `RecordMessageConverter`, `ErrorHandler` or -`AfterRollbackProcessor` bean is defined, it is automatically associated to the default -factory. +container factory. Similarly, if a `ErrorHandler` or `AfterRollbackProcessor` bean is +defined, it is automatically associated to the default factory. + +Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean +is associated to the default factory. If only a `RecordMessageConverter` bean is present +for a batch listener, it is wrapped in a `BatchMessageConverter`. TIP: A custom `ChainedKafkaTransactionManager` must be marked `@Primary` as it usually references the auto-configured `KafkaTransactionManager` bean.