Polish Kafka transaction support property

Closes gh-11076
pull/11075/merge
Phillip Webb 7 years ago
parent 6d396b973f
commit 4c29c35cbb

@ -96,11 +96,12 @@ public class KafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() {
DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
KafkaProperties.Producer producer = this.properties.getProducer();
if (producer.getTransactionIdPrefix() != null) {
factory.setTransactionIdPrefix(producer.getTransactionIdPrefix());
String transactionIdPrefix = this.properties.getProducer()
.getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
@ -108,7 +109,8 @@ public class KafkaAutoConfiguration {
@Bean
@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
@ConditionalOnMissingBean
public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
public KafkaTransactionManager<?, ?> kafkaTransactionManager(
ProducerFactory<?, ?> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}

@ -521,7 +521,7 @@ public class KafkaProperties {
private Integer retries;
/**
* When non empty, enables transactional support for producer.
* When non empty, enables transaction support for producer.
*/
private String transactionIdPrefix;

@ -200,8 +200,7 @@ public class KafkaAutoConfigurationTests {
assertThat(
context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty();
assertThat(
context.getBeansOfType(KafkaTransactionManager.class))
assertThat(context.getBeansOfType(KafkaTransactionManager.class))
.isEmpty();
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
@ -303,8 +302,7 @@ public class KafkaAutoConfigurationTests {
assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo");
assertThat(dfa.getPropertyValue("controlFlag")).isEqualTo(
AppConfigurationEntry.LoginModuleControlFlag.REQUISITE);
assertThat(
context.getBeansOfType(KafkaTransactionManager.class))
assertThat(context.getBeansOfType(KafkaTransactionManager.class))
.hasSize(1);
assertThat(((Map<String, String>) dfa.getPropertyValue("options")))
.containsExactly(entry("useKeyTab", "true"));

@ -998,6 +998,7 @@ content into your application; rather pick only the properties that you need.
spring.kafka.producer.ssl.keystore-password= # Store password for the key store file.
spring.kafka.producer.ssl.truststore-location= # Location of the trust store file.
spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.

Loading…
Cancel
Save