diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinConfigurations.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinConfigurations.java index b4a1802b7b..f4ecc95031 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinConfigurations.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinConfigurations.java @@ -119,7 +119,8 @@ class ZipkinConfigurations { .getIfAvailable(() -> new PropertiesZipkinConnectionDetails(properties)); WebClient.Builder builder = WebClient.builder(); customizers.orderedStream().forEach((customizer) -> customizer.customize(builder)); - return new ZipkinWebClientSender(connectionDetails.getSpanEndpoint(), builder.build()); + return new ZipkinWebClientSender(connectionDetails.getSpanEndpoint(), builder.build(), + properties.getConnectTimeout().plus(properties.getReadTimeout())); } } diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinWebClientSender.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinWebClientSender.java index b9992bd575..2ef8cb74c0 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinWebClientSender.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinWebClientSender.java @@ -16,6 +16,8 @@ package org.springframework.boot.actuate.autoconfigure.tracing.zipkin; +import java.time.Duration; + import reactor.core.publisher.Mono; import zipkin2.Call; import zipkin2.Callback; @@ -28,6 +30,7 @@ import org.springframework.web.reactive.function.client.WebClient; * An {@link HttpSender} which uses {@link WebClient} for HTTP communication. * * @author Stefan Bratanov + * @author Moritz Halbritter */ class ZipkinWebClientSender extends HttpSender { @@ -35,14 +38,17 @@ class ZipkinWebClientSender extends HttpSender { private final WebClient webClient; - ZipkinWebClientSender(String endpoint, WebClient webClient) { + private final Duration timeout; + + ZipkinWebClientSender(String endpoint, WebClient webClient, Duration timeout) { this.endpoint = endpoint; this.webClient = webClient; + this.timeout = timeout; } @Override public HttpPostCall sendSpans(byte[] batchedEncodedSpans) { - return new WebClientHttpPostCall(this.endpoint, batchedEncodedSpans, this.webClient); + return new WebClientHttpPostCall(this.endpoint, batchedEncodedSpans, this.webClient, this.timeout); } private static class WebClientHttpPostCall extends HttpPostCall { @@ -51,15 +57,18 @@ class ZipkinWebClientSender extends HttpSender { private final WebClient webClient; - WebClientHttpPostCall(String endpoint, byte[] body, WebClient webClient) { + private final Duration timeout; + + WebClientHttpPostCall(String endpoint, byte[] body, WebClient webClient, Duration timeout) { super(body); this.endpoint = endpoint; this.webClient = webClient; + this.timeout = timeout; } @Override public Call clone() { - return new WebClientHttpPostCall(this.endpoint, getUncompressedBody(), this.webClient); + return new WebClientHttpPostCall(this.endpoint, getUncompressedBody(), this.webClient, this.timeout); } @Override @@ -79,7 +88,8 @@ class ZipkinWebClientSender extends HttpSender { .headers(this::addDefaultHeaders) .bodyValue(getBody()) .retrieve() - .toBodilessEntity(); + .toBodilessEntity() + .timeout(this.timeout); } private void addDefaultHeaders(HttpHeaders headers) { diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinHttpSenderTests.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinHttpSenderTests.java index 970a8c3009..80654a3e55 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinHttpSenderTests.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinHttpSenderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2022 the original author or authors. + * Copyright 2012-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import zipkin2.Callback; @@ -42,19 +43,25 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; */ abstract class ZipkinHttpSenderTests { - protected Sender sut; + protected Sender sender; - abstract Sender createSut(); + abstract Sender createSender(); @BeforeEach - void setUp() { - this.sut = createSut(); + void beforeEach() throws Exception { + this.sender = createSender(); + } + + @AfterEach + void afterEach() throws IOException { + this.sender.close(); } @Test void sendSpansShouldThrowIfCloseWasCalled() throws IOException { - this.sut.close(); - assertThatThrownBy(() -> this.sut.sendSpans(Collections.emptyList())).isInstanceOf(ClosedSenderException.class); + this.sender.close(); + assertThatThrownBy(() -> this.sender.sendSpans(Collections.emptyList())) + .isInstanceOf(ClosedSenderException.class); } protected void makeRequest(List encodedSpans, boolean async) throws IOException { @@ -68,8 +75,12 @@ abstract class ZipkinHttpSenderTests { } protected CallbackResult makeAsyncRequest(List encodedSpans) { + return makeAsyncRequest(this.sender, encodedSpans); + } + + protected CallbackResult makeAsyncRequest(Sender sender, List encodedSpans) { AtomicReference callbackResult = new AtomicReference<>(); - this.sut.sendSpans(encodedSpans).enqueue(new Callback<>() { + sender.sendSpans(encodedSpans).enqueue(new Callback<>() { @Override public void onSuccess(Void value) { callbackResult.set(new CallbackResult(true, null)); @@ -84,7 +95,11 @@ abstract class ZipkinHttpSenderTests { } protected void makeSyncRequest(List encodedSpans) throws IOException { - this.sut.sendSpans(encodedSpans).execute(); + makeSyncRequest(this.sender, encodedSpans); + } + + protected void makeSyncRequest(Sender sender, List encodedSpans) throws IOException { + sender.sendSpans(encodedSpans).execute(); } protected byte[] toByteArray(String input) { diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinRestTemplateSenderTests.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinRestTemplateSenderTests.java index ec945ac3da..c5809f0415 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinRestTemplateSenderTests.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinRestTemplateSenderTests.java @@ -54,14 +54,16 @@ class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests { private MockRestServiceServer mockServer; @Override - Sender createSut() { + Sender createSender() { RestTemplate restTemplate = new RestTemplate(); this.mockServer = MockRestServiceServer.createServer(restTemplate); return new ZipkinRestTemplateSender(ZIPKIN_URL, restTemplate); } @AfterEach - void tearDown() { + @Override + void afterEach() throws IOException { + super.afterEach(); this.mockServer.verify(); } @@ -71,7 +73,7 @@ class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests { .andExpect(method(HttpMethod.POST)) .andExpect(content().string("[]")) .andRespond(withStatus(HttpStatus.ACCEPTED)); - assertThat(this.sut.check()).isEqualTo(CheckResult.OK); + assertThat(this.sender.check()).isEqualTo(CheckResult.OK); } @Test @@ -79,7 +81,7 @@ class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests { this.mockServer.expect(requestTo(ZIPKIN_URL)) .andExpect(method(HttpMethod.POST)) .andRespond(withStatus(HttpStatus.INTERNAL_SERVER_ERROR)); - CheckResult result = this.sut.check(); + CheckResult result = this.sender.check(); assertThat(result.ok()).isFalse(); assertThat(result.error()).hasMessageContaining("500 Internal Server Error"); } diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinWebClientSenderTests.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinWebClientSenderTests.java index dc19d987ca..b5b1b42b7e 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinWebClientSenderTests.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinWebClientSenderTests.java @@ -17,16 +17,21 @@ package org.springframework.boot.actuate.autoconfigure.tracing.zipkin; import java.io.IOException; +import java.time.Duration; import java.util.Base64; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.QueueDispatcher; import okhttp3.mockwebserver.RecordedRequest; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -45,33 +50,48 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; */ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests { + private static ClearableDispatcher dispatcher; + private static MockWebServer mockBackEnd; private static String ZIPKIN_URL; @BeforeAll static void beforeAll() throws IOException { + dispatcher = new ClearableDispatcher(); mockBackEnd = new MockWebServer(); + mockBackEnd.setDispatcher(dispatcher); mockBackEnd.start(); - ZIPKIN_URL = "http://localhost:%s/api/v2/spans".formatted(mockBackEnd.getPort()); + ZIPKIN_URL = mockBackEnd.url("/api/v2/spans").toString(); } @AfterAll - static void tearDown() throws IOException { + static void afterAll() throws IOException { mockBackEnd.shutdown(); } @Override - Sender createSut() { + @BeforeEach + void beforeEach() throws Exception { + super.beforeEach(); + clearResponses(); + clearRequests(); + } + + @Override + Sender createSender() { + return createSender(Duration.ofSeconds(10)); + } + + Sender createSender(Duration timeout) { WebClient webClient = WebClient.builder().build(); - return new ZipkinWebClientSender(ZIPKIN_URL, webClient); + return new ZipkinWebClientSender(ZIPKIN_URL, webClient, timeout); } @Test void checkShouldSendEmptySpanList() throws InterruptedException { mockBackEnd.enqueue(new MockResponse()); - assertThat(this.sut.check()).isEqualTo(CheckResult.OK); - + assertThat(this.sender.check()).isEqualTo(CheckResult.OK); requestAssertions((request) -> { assertThat(request.getMethod()).isEqualTo("POST"); assertThat(request.getBody().readUtf8()).isEqualTo("[]"); @@ -81,10 +101,9 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests { @Test void checkShouldNotRaiseException() throws InterruptedException { mockBackEnd.enqueue(new MockResponse().setResponseCode(500)); - CheckResult result = this.sut.check(); + CheckResult result = this.sender.check(); assertThat(result.ok()).isFalse(); assertThat(result.error()).hasMessageContaining("500 Internal Server Error"); - requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST")); } @@ -94,7 +113,6 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests { mockBackEnd.enqueue(new MockResponse()); List encodedSpans = List.of(toByteArray("span1"), toByteArray("span2")); makeRequest(encodedSpans, async); - requestAssertions((request) -> { assertThat(request.getMethod()).isEqualTo("POST"); assertThat(request.getHeader("Content-Type")).isEqualTo("application/json"); @@ -115,7 +133,6 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests { assertThatThrownBy(() -> makeSyncRequest(Collections.emptyList())) .hasMessageContaining("500 Internal Server Error"); } - requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST")); } @@ -126,18 +143,31 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests { // This is gzip compressed 10000 times 'a' byte[] compressed = Base64.getDecoder() .decode("H4sIAAAAAAAA/+3BMQ0AAAwDIKFLj/k3UR8NcA8AAAAAAAAAAAADUsAZfeASJwAA"); - mockBackEnd.enqueue(new MockResponse()); - makeRequest(List.of(toByteArray(uncompressed)), async); - requestAssertions((request) -> { assertThat(request.getMethod()).isEqualTo("POST"); assertThat(request.getHeader("Content-Type")).isEqualTo("application/json"); assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip"); assertThat(request.getBody().readByteArray()).isEqualTo(compressed); }); + } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void shouldTimeout(boolean async) { + Sender sender = createSender(Duration.ofMillis(1)); + MockResponse response = new MockResponse().setResponseCode(200).setHeadersDelay(100, TimeUnit.MILLISECONDS); + mockBackEnd.enqueue(response); + if (async) { + CallbackResult callbackResult = makeAsyncRequest(sender, Collections.emptyList()); + assertThat(callbackResult.success()).isFalse(); + assertThat(callbackResult.error()).isNotNull().isInstanceOf(TimeoutException.class); + } + else { + assertThatThrownBy(() -> makeSyncRequest(sender, Collections.emptyList())) + .hasCauseInstanceOf(TimeoutException.class); + } } private void requestAssertions(Consumer assertions) throws InterruptedException { @@ -145,4 +175,24 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests { assertThat(request).satisfies(assertions); } + private static void clearRequests() throws InterruptedException { + RecordedRequest request; + do { + request = mockBackEnd.takeRequest(0, TimeUnit.SECONDS); + } + while (request != null); + } + + private static void clearResponses() { + dispatcher.clear(); + } + + private static class ClearableDispatcher extends QueueDispatcher { + + void clear() { + getResponseQueue().clear(); + } + + } + }