Merge pull request #23048 from s50600822

* gh-23048:
  Polish "Add a configuration property for KLC's idleBetweenPolls"
  Add a configuration property for KLC's idleBetweenPolls

Closes gh-23048
pull/23084/head
Andy Wilkinson 4 years ago
commit 1b8bfaa1fb

@ -184,6 +184,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime); map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime);
map.from(properties::getPollTimeout).as(Duration::toMillis).to(container::setPollTimeout); map.from(properties::getPollTimeout).as(Duration::toMillis).to(container::setPollTimeout);
map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold); map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold);
map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls);
map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval); map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval);
map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue)
.to(container::setMonitorInterval); .to(container::setMonitorInterval);

@ -885,6 +885,11 @@ public class KafkaProperties {
*/ */
private Duration ackTime; private Duration ackTime;
/**
* Sleep interval between Consumer.poll(Duration) calls.
*/
private Duration idleBetweenPolls = Duration.ZERO;
/** /**
* Time between publishing idle consumer events (no data received). * Time between publishing idle consumer events (no data received).
*/ */
@ -972,6 +977,14 @@ public class KafkaProperties {
this.ackTime = ackTime; this.ackTime = ackTime;
} }
public Duration getIdleBetweenPolls() {
return this.idleBetweenPolls;
}
public void setIdleBetweenPolls(Duration idleBetweenPolls) {
this.idleBetweenPolls = idleBetweenPolls;
}
public Duration getIdleEventInterval() { public Duration getIdleEventInterval() {
return this.idleEventInterval; return this.idleEventInterval;
} }

@ -368,8 +368,8 @@ class KafkaAutoConfigurationTests {
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456", "spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456",
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
"spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s",
"spring.kafka.listener.log-container-config=true", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true",
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo",
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true")
@ -392,6 +392,7 @@ class KafkaAutoConfigurationTests {
assertThat(containerProperties.getAckTime()).isEqualTo(456L); assertThat(containerProperties.getAckTime()).isEqualTo(456L);
assertThat(containerProperties.getPollTimeout()).isEqualTo(2000L); assertThat(containerProperties.getPollTimeout()).isEqualTo(2000L);
assertThat(containerProperties.getNoPollThreshold()).isEqualTo(2.5f); assertThat(containerProperties.getNoPollThreshold()).isEqualTo(2.5f);
assertThat(containerProperties.getIdleBetweenPolls()).isEqualTo(1000L);
assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L); assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L);
assertThat(containerProperties.getMonitorInterval()).isEqualTo(45); assertThat(containerProperties.getMonitorInterval()).isEqualTo(45);
assertThat(containerProperties.isLogContainerConfig()).isTrue(); assertThat(containerProperties.isLogContainerConfig()).isTrue();

Loading…
Cancel
Save