Merge pull request #28290 from pascal-ayotte

* pr/28290:
  Polish "Add support for IdlePartitionEventInterval"
  Add support for IdlePartitionEventInterval

Closes gh-28290
pull/29200/head
Stephane Nicoll 3 years ago
commit a0d4651a04

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2021 the original author or authors. * Copyright 2012-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -199,6 +199,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold); map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold);
map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls); map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls);
map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval); map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval);
map.from(properties::getIdlePartitionEventInterval).as(Duration::toMillis)
.to(container::setIdlePartitionEventInterval);
map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue)
.to(container::setMonitorInterval); .to(container::setMonitorInterval);
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2021 the original author or authors. * Copyright 2012-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -902,6 +902,12 @@ public class KafkaProperties {
*/ */
private Duration idleEventInterval; private Duration idleEventInterval;
/**
* Time between publishing idle partition consumer events (no data received for
* partition).
*/
private Duration idlePartitionEventInterval;
/** /**
* Time between checks for non-responsive consumers. If a duration suffix is not * Time between checks for non-responsive consumers. If a duration suffix is not
* specified, seconds will be used. * specified, seconds will be used.
@ -1006,6 +1012,14 @@ public class KafkaProperties {
this.idleEventInterval = idleEventInterval; this.idleEventInterval = idleEventInterval;
} }
public Duration getIdlePartitionEventInterval() {
return this.idlePartitionEventInterval;
}
public void setIdlePartitionEventInterval(Duration idlePartitionEventInterval) {
this.idlePartitionEventInterval = idlePartitionEventInterval;
}
public Duration getMonitorInterval() { public Duration getMonitorInterval() {
return this.monitorInterval; return this.monitorInterval;
} }

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2021 the original author or authors. * Copyright 2012-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -389,6 +389,7 @@ class KafkaAutoConfigurationTests {
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
"spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s",
"spring.kafka.listener.idle-partition-event-interval=1s",
"spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true",
"spring.kafka.listener.only-log-record-metadata=true", "spring.kafka.listener.only-log-record-metadata=true",
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
@ -415,6 +416,7 @@ class KafkaAutoConfigurationTests {
assertThat(containerProperties.getNoPollThreshold()).isEqualTo(2.5f); assertThat(containerProperties.getNoPollThreshold()).isEqualTo(2.5f);
assertThat(containerProperties.getIdleBetweenPolls()).isEqualTo(1000L); assertThat(containerProperties.getIdleBetweenPolls()).isEqualTo(1000L);
assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L); assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L);
assertThat(containerProperties.getIdlePartitionEventInterval()).isEqualTo(1000L);
assertThat(containerProperties.getMonitorInterval()).isEqualTo(45); assertThat(containerProperties.getMonitorInterval()).isEqualTo(45);
assertThat(containerProperties.isLogContainerConfig()).isTrue(); assertThat(containerProperties.isLogContainerConfig()).isTrue();
assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue(); assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue();

Loading…
Cancel
Save