From 03a8937d5c442dd00108bef795b6b9b5cee209ec Mon Sep 17 00:00:00 2001 From: hoaphan Date: Sat, 22 Aug 2020 20:57:11 +1000 Subject: [PATCH 1/2] Add a configuration property for KLC's idleBetweenPolls See gh-23048 --- ...ntKafkaListenerContainerFactoryConfigurer.java | 1 + .../boot/autoconfigure/kafka/KafkaProperties.java | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index e6d0ac47d0..9a99ce6c08 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -187,6 +187,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval); map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) .to(container::setMonitorInterval); + map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal); map.from(this.transactionManager).to(container::setTransactionManager); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index ffbc2c50a3..9698255ea7 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -890,6 +890,12 @@ public class KafkaProperties { */ private Duration idleEventInterval; + /** + * The sleep interval in milliseconds between + * {@link org.apache.kafka.clients.consumer.Consumer#poll(Duration)} calls. + */ + private Duration idleBetweenPolls; + /** * Time between checks for non-responsive consumers. If a duration suffix is not * specified, seconds will be used. @@ -980,6 +986,14 @@ public class KafkaProperties { this.idleEventInterval = idleEventInterval; } + public Duration getIdleBetweenPolls() { + return idleBetweenPolls; + } + + public void setIdleBetweenPolls(Duration idleBetweenPolls) { + this.idleBetweenPolls = idleBetweenPolls; + } + public Duration getMonitorInterval() { return this.monitorInterval; } @@ -1003,7 +1017,6 @@ public class KafkaProperties { public void setMissingTopicsFatal(boolean missingTopicsFatal) { this.missingTopicsFatal = missingTopicsFatal; } - } public static class Ssl { From e9ab269b3b6f106a01bd0d3182a5e8a3cf233dbd Mon Sep 17 00:00:00 2001 From: Andy Wilkinson Date: Tue, 25 Aug 2020 15:43:26 +0100 Subject: [PATCH 2/2] Polish "Add a configuration property for KLC's idleBetweenPolls" See gh-23048 --- ...fkaListenerContainerFactoryConfigurer.java | 2 +- .../autoconfigure/kafka/KafkaProperties.java | 26 +++++++++---------- .../kafka/KafkaAutoConfigurationTests.java | 5 ++-- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 9a99ce6c08..7431fcf848 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -184,10 +184,10 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime); map.from(properties::getPollTimeout).as(Duration::toMillis).to(container::setPollTimeout); map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold); + map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls); map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval); map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) .to(container::setMonitorInterval); - map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal); map.from(this.transactionManager).to(container::setTransactionManager); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 9698255ea7..778c6f9bac 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -886,15 +886,14 @@ public class KafkaProperties { private Duration ackTime; /** - * Time between publishing idle consumer events (no data received). + * Sleep interval between Consumer.poll(Duration) calls. */ - private Duration idleEventInterval; + private Duration idleBetweenPolls = Duration.ZERO; /** - * The sleep interval in milliseconds between - * {@link org.apache.kafka.clients.consumer.Consumer#poll(Duration)} calls. + * Time between publishing idle consumer events (no data received). */ - private Duration idleBetweenPolls; + private Duration idleEventInterval; /** * Time between checks for non-responsive consumers. If a duration suffix is not @@ -978,20 +977,20 @@ public class KafkaProperties { this.ackTime = ackTime; } - public Duration getIdleEventInterval() { - return this.idleEventInterval; + public Duration getIdleBetweenPolls() { + return this.idleBetweenPolls; } - public void setIdleEventInterval(Duration idleEventInterval) { - this.idleEventInterval = idleEventInterval; + public void setIdleBetweenPolls(Duration idleBetweenPolls) { + this.idleBetweenPolls = idleBetweenPolls; } - public Duration getIdleBetweenPolls() { - return idleBetweenPolls; + public Duration getIdleEventInterval() { + return this.idleEventInterval; } - public void setIdleBetweenPolls(Duration idleBetweenPolls) { - this.idleBetweenPolls = idleBetweenPolls; + public void setIdleEventInterval(Duration idleEventInterval) { + this.idleEventInterval = idleEventInterval; } public Duration getMonitorInterval() { @@ -1017,6 +1016,7 @@ public class KafkaProperties { public void setMissingTopicsFatal(boolean missingTopicsFatal) { this.missingTopicsFatal = missingTopicsFatal; } + } public static class Ssl { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 176a9d7982..3e3e93e052 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -368,8 +368,8 @@ class KafkaAutoConfigurationTests { "spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", - "spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.monitor-interval=45", - "spring.kafka.listener.log-container-config=true", + "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s", + "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") @@ -392,6 +392,7 @@ class KafkaAutoConfigurationTests { assertThat(containerProperties.getAckTime()).isEqualTo(456L); assertThat(containerProperties.getPollTimeout()).isEqualTo(2000L); assertThat(containerProperties.getNoPollThreshold()).isEqualTo(2.5f); + assertThat(containerProperties.getIdleBetweenPolls()).isEqualTo(1000L); assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L); assertThat(containerProperties.getMonitorInterval()).isEqualTo(45); assertThat(containerProperties.isLogContainerConfig()).isTrue();