|
|
|
@ -1,5 +1,5 @@
|
|
|
|
|
/*
|
|
|
|
|
* Copyright 2012-2022 the original author or authors.
|
|
|
|
|
* Copyright 2012-2023 the original author or authors.
|
|
|
|
|
*
|
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
@ -24,6 +24,7 @@ import org.springframework.context.annotation.Bean;
|
|
|
|
|
import org.springframework.context.annotation.Configuration;
|
|
|
|
|
import org.springframework.kafka.annotation.EnableKafka;
|
|
|
|
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
|
|
|
|
import org.springframework.kafka.config.ContainerCustomizer;
|
|
|
|
|
import org.springframework.kafka.config.KafkaListenerConfigUtils;
|
|
|
|
|
import org.springframework.kafka.core.ConsumerFactory;
|
|
|
|
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
|
|
|
@ -31,6 +32,7 @@ import org.springframework.kafka.core.KafkaTemplate;
|
|
|
|
|
import org.springframework.kafka.listener.AfterRollbackProcessor;
|
|
|
|
|
import org.springframework.kafka.listener.BatchInterceptor;
|
|
|
|
|
import org.springframework.kafka.listener.CommonErrorHandler;
|
|
|
|
|
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
|
|
|
|
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
|
|
|
|
import org.springframework.kafka.listener.RecordInterceptor;
|
|
|
|
|
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
|
|
|
|
@ -121,10 +123,12 @@ class KafkaAnnotationDrivenConfiguration {
|
|
|
|
|
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
|
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
|
|
|
|
|
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
|
|
|
|
|
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
|
|
|
|
|
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
|
|
|
|
|
ObjectProvider<ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>> kafkaContainerCustomizer) {
|
|
|
|
|
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
|
|
|
|
configurer.configure(factory, kafkaConsumerFactory
|
|
|
|
|
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
|
|
|
|
|
kafkaContainerCustomizer.ifAvailable(factory::setContainerCustomizer);
|
|
|
|
|
return factory;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|