Format with Eclipse Oxygen SR2

pull/11971/head
Phillip Webb 7 years ago
parent 55e98ac977
commit 387804e600

@ -48,8 +48,8 @@ import org.springframework.kafka.core.KafkaAdmin;
@AutoConfigureBefore(HealthIndicatorAutoConfiguration.class)
@AutoConfigureAfter(KafkaAutoConfiguration.class)
@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
public class KafkaHealthIndicatorAutoConfiguration extends
CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, KafkaAdmin> {
public class KafkaHealthIndicatorAutoConfiguration
extends CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, KafkaAdmin> {
private final Map<String, KafkaAdmin> admins;

@ -41,9 +41,9 @@ public class KafkaHealthIndicatorAutoConfigurationTests {
@Test
public void runShouldCreateIndicator() {
this.contextRunner.run((context) -> assertThat(context)
.hasSingleBean(KafkaHealthIndicator.class)
.doesNotHaveBean(ApplicationHealthIndicator.class));
this.contextRunner.run(
(context) -> assertThat(context).hasSingleBean(KafkaHealthIndicator.class)
.doesNotHaveBean(ApplicationHealthIndicator.class));
}
@Test

@ -63,21 +63,19 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator {
@Override
protected void doHealthCheck(Builder builder) throws Exception {
try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
DescribeClusterResult result = adminClient.describeCluster(
this.describeOptions);
DescribeClusterResult result = adminClient
.describeCluster(this.describeOptions);
String brokerId = result.controller().get().idString();
int replicationFactor = getReplicationFactor(brokerId, adminClient);
int nodes = result.nodes().get().size();
Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN;
builder.status(status)
.withDetail("clusterId", result.clusterId().get())
.withDetail("brokerId", brokerId)
.withDetail("nodes", nodes);
builder.status(status).withDetail("clusterId", result.clusterId().get())
.withDetail("brokerId", brokerId).withDetail("nodes", nodes);
}
}
private int getReplicationFactor(String brokerId,
AdminClient adminClient) throws ExecutionException, InterruptedException {
private int getReplicationFactor(String brokerId, AdminClient adminClient)
throws ExecutionException, InterruptedException {
ConfigResource configResource = new ConfigResource(Type.BROKER, brokerId);
Map<ConfigResource, Config> kafkaConfig = adminClient
.describeConfigs(Collections.singletonList(configResource)).all().get();
@ -86,4 +84,3 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator {
}
}

@ -49,11 +49,12 @@ public class KafkaHealthIndicatorTests {
this.kafkaEmbedded.destroy();
}
}
@Test
public void kafkaIsUp() throws Exception {
startKafka(1);
KafkaHealthIndicator healthIndicator =
new KafkaHealthIndicator(this.kafkaAdmin, 1000L);
KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin,
1000L);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
assertDetails(health.getDetails());
@ -64,8 +65,8 @@ public class KafkaHealthIndicatorTests {
int freePort = SocketUtils.findAvailableTcpPort();
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + freePort));
KafkaHealthIndicator healthIndicator =
new KafkaHealthIndicator(this.kafkaAdmin, 1L);
KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin,
1L);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
assertThat((String) health.getDetails().get("error")).isNotEmpty();
@ -74,8 +75,8 @@ public class KafkaHealthIndicatorTests {
@Test
public void notEnoughNodesForReplicationFactor() throws Exception {
startKafka(2);
KafkaHealthIndicator healthIndicator =
new KafkaHealthIndicator(this.kafkaAdmin, 1000L);
KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin,
1000L);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
assertDetails(health.getDetails());
@ -89,13 +90,13 @@ public class KafkaHealthIndicatorTests {
private void startKafka(int replicationFactor) throws Exception {
this.kafkaEmbedded = new KafkaEmbedded(1, true);
this.kafkaEmbedded.brokerProperties(Collections.singletonMap(
KafkaHealthIndicator.REPLICATION_PROPERTY,
String.valueOf(replicationFactor)));
this.kafkaEmbedded.brokerProperties(
Collections.singletonMap(KafkaHealthIndicator.REPLICATION_PROPERTY,
String.valueOf(replicationFactor)));
this.kafkaEmbedded.before();
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.kafkaEmbedded.getBrokersAsString()));
this.kafkaAdmin = new KafkaAdmin(
Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.kafkaEmbedded.getBrokersAsString()));
}
}

Loading…
Cancel
Save