From 54e0a61b425d00ceb220b82dc8abbad121245c10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez?= Date: Sun, 14 Jun 2020 01:32:12 -0500 Subject: [PATCH] Reinstate metrics for Kafka Streams See gh-21921 --- .../build.gradle | 1 + .../KafkaMetricsAutoConfiguration.java | 16 +++++++++ .../KafkaMetricsAutoConfigurationTests.java | 34 ++++++++++++++++++ ...aStreamsAnnotationDrivenConfiguration.java | 6 +++- .../StreamsBuilderFactoryBeanCustomizer.java | 36 +++++++++++++++++++ 5 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/StreamsBuilderFactoryBeanCustomizer.java diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/build.gradle b/spring-boot-project/spring-boot-actuator-autoconfigure/build.gradle index eb50381e06..f49bc9fe02 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/build.gradle +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/build.gradle @@ -67,6 +67,7 @@ dependencies { optional("org.apache.activemq:activemq-broker") optional("org.apache.commons:commons-dbcp2") optional("org.apache.kafka:kafka-clients") + optional("org.apache.kafka:kafka-streams") optional("org.apache.tomcat.embed:tomcat-embed-core") optional("org.apache.tomcat.embed:tomcat-embed-el") optional("org.apache.tomcat:tomcat-jdbc") diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java index fbde6c33ac..d200498fda 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfiguration.java @@ -18,6 +18,7 @@ package org.springframework.boot.actuate.autoconfigure.metrics; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; +import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; @@ -26,19 +27,23 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer; import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.autoconfigure.kafka.StreamsBuilderFactoryBeanCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.MicrometerConsumerListener; import org.springframework.kafka.core.MicrometerProducerListener; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.streams.KafkaStreamsMicrometerListener; /** * Auto-configuration for Kafka metrics. * * @author Andy Wilkinson * @author Stephane Nicoll + * @author Eddú Meléndez * @since 2.1.0 */ @Configuration(proxyBeanMethods = false) @@ -66,4 +71,15 @@ public class KafkaMetricsAutoConfiguration { factory.addListener(new MicrometerProducerListener<>(meterRegistry)); } + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass({ KafkaStreamsMetrics.class, StreamsBuilderFactoryBean.class }) + static class KafkaStreamsMetricsAutoConfiguration { + + @Bean + StreamsBuilderFactoryBeanCustomizer kafkaStreamsProducerMetrics(MeterRegistry meterRegistry) { + return (factoryBean) -> factoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry)); + } + + } + } diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java index d15918ea91..11c14b6056 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/metrics/KafkaMetricsAutoConfigurationTests.java @@ -22,10 +22,14 @@ import org.springframework.boot.actuate.autoconfigure.metrics.test.MetricsRun; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.MicrometerConsumerListener; import org.springframework.kafka.core.MicrometerProducerListener; +import org.springframework.kafka.streams.KafkaStreamsMicrometerListener; import static org.assertj.core.api.Assertions.assertThat; @@ -34,6 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat; * * @author Andy Wilkinson * @author Stephane Nicoll + * @author Eddú Meléndez */ class KafkaMetricsAutoConfigurationTests { @@ -61,4 +66,33 @@ class KafkaMetricsAutoConfigurationTests { }); } + @Test + void whenKafkaStreamsIsEnabledAndThereIsAMeterRegistryThenMetricsListenersAreAdded() { + this.contextRunner.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class)) + .withUserConfiguration(EnableKafkaStreamsConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app").with(MetricsRun.simple()).run((context) -> { + StreamsBuilderFactoryBean streamsBuilderFactoryBean = context + .getBean(StreamsBuilderFactoryBean.class); + assertThat(streamsBuilderFactoryBean.getListeners()).hasSize(1) + .hasOnlyElementsOfTypes(KafkaStreamsMicrometerListener.class); + }); + } + + @Test + void whenKafkaStreamsIsEnabledAndThereIsNoMeterRegistryThenListenerCustomizationBacksOff() { + this.contextRunner.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class)) + .withUserConfiguration(EnableKafkaStreamsConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app").run((context) -> { + StreamsBuilderFactoryBean streamsBuilderFactoryBean = context + .getBean(StreamsBuilderFactoryBean.class); + assertThat(streamsBuilderFactoryBean.getListeners()).isEmpty(); + }); + } + + @Configuration(proxyBeanMethods = false) + @EnableKafkaStreams + static class EnableKafkaStreamsConfiguration { + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index 06f22c7e0b..d052adddc0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -39,6 +40,7 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean; * * @author Gary Russell * @author Stephane Nicoll + * @author Eddú Meléndez */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(StreamsBuilder.class) @@ -68,7 +70,9 @@ class KafkaStreamsAnnotationDrivenConfiguration { @Bean KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer( - @Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilderFactoryBean factoryBean) { + @Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilderFactoryBean factoryBean, + ObjectProvider customizers) { + customizers.orderedStream().forEach((customizer) -> customizer.customize(factoryBean)); return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/StreamsBuilderFactoryBeanCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/StreamsBuilderFactoryBeanCustomizer.java new file mode 100644 index 0000000000..789eb846d7 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/StreamsBuilderFactoryBeanCustomizer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2012-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import org.springframework.kafka.config.StreamsBuilderFactoryBean; + +/** + * Callback interface for customizing {@code StreamsBuilderFactoryBean} beans. + * + * @author Eddú Meléndez + * @since 2.3.2 + */ +@FunctionalInterface +public interface StreamsBuilderFactoryBeanCustomizer { + + /** + * Customize the {@link StreamsBuilderFactoryBean}. + * @param factoryBean the factory bean to customize + */ + void customize(StreamsBuilderFactoryBean factoryBean); + +}