Honor timeout in ZipkinWebClientSender

Unfortunately there's no good way to configure connect and read timeout
separately, which works for all supported reactive clients. This
implementation applies a timeout through Reactor's timeout method. The
timeout from the properties is summed together and this is the applied
timeout. While not perfect, this is better than no timeout at all.

Closes gh-31496
pull/37630/head
Moritz Halbritter 1 year ago
parent 619a0f3b35
commit 72a4e1ebae

@ -119,7 +119,8 @@ class ZipkinConfigurations {
.getIfAvailable(() -> new PropertiesZipkinConnectionDetails(properties)); .getIfAvailable(() -> new PropertiesZipkinConnectionDetails(properties));
WebClient.Builder builder = WebClient.builder(); WebClient.Builder builder = WebClient.builder();
customizers.orderedStream().forEach((customizer) -> customizer.customize(builder)); customizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
return new ZipkinWebClientSender(connectionDetails.getSpanEndpoint(), builder.build()); return new ZipkinWebClientSender(connectionDetails.getSpanEndpoint(), builder.build(),
properties.getConnectTimeout().plus(properties.getReadTimeout()));
} }
} }

@ -16,6 +16,8 @@
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin; package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.time.Duration;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import zipkin2.Call; import zipkin2.Call;
import zipkin2.Callback; import zipkin2.Callback;
@ -28,6 +30,7 @@ import org.springframework.web.reactive.function.client.WebClient;
* An {@link HttpSender} which uses {@link WebClient} for HTTP communication. * An {@link HttpSender} which uses {@link WebClient} for HTTP communication.
* *
* @author Stefan Bratanov * @author Stefan Bratanov
* @author Moritz Halbritter
*/ */
class ZipkinWebClientSender extends HttpSender { class ZipkinWebClientSender extends HttpSender {
@ -35,14 +38,17 @@ class ZipkinWebClientSender extends HttpSender {
private final WebClient webClient; private final WebClient webClient;
ZipkinWebClientSender(String endpoint, WebClient webClient) { private final Duration timeout;
ZipkinWebClientSender(String endpoint, WebClient webClient, Duration timeout) {
this.endpoint = endpoint; this.endpoint = endpoint;
this.webClient = webClient; this.webClient = webClient;
this.timeout = timeout;
} }
@Override @Override
public HttpPostCall sendSpans(byte[] batchedEncodedSpans) { public HttpPostCall sendSpans(byte[] batchedEncodedSpans) {
return new WebClientHttpPostCall(this.endpoint, batchedEncodedSpans, this.webClient); return new WebClientHttpPostCall(this.endpoint, batchedEncodedSpans, this.webClient, this.timeout);
} }
private static class WebClientHttpPostCall extends HttpPostCall { private static class WebClientHttpPostCall extends HttpPostCall {
@ -51,15 +57,18 @@ class ZipkinWebClientSender extends HttpSender {
private final WebClient webClient; private final WebClient webClient;
WebClientHttpPostCall(String endpoint, byte[] body, WebClient webClient) { private final Duration timeout;
WebClientHttpPostCall(String endpoint, byte[] body, WebClient webClient, Duration timeout) {
super(body); super(body);
this.endpoint = endpoint; this.endpoint = endpoint;
this.webClient = webClient; this.webClient = webClient;
this.timeout = timeout;
} }
@Override @Override
public Call<Void> clone() { public Call<Void> clone() {
return new WebClientHttpPostCall(this.endpoint, getUncompressedBody(), this.webClient); return new WebClientHttpPostCall(this.endpoint, getUncompressedBody(), this.webClient, this.timeout);
} }
@Override @Override
@ -79,7 +88,8 @@ class ZipkinWebClientSender extends HttpSender {
.headers(this::addDefaultHeaders) .headers(this::addDefaultHeaders)
.bodyValue(getBody()) .bodyValue(getBody())
.retrieve() .retrieve()
.toBodilessEntity(); .toBodilessEntity()
.timeout(this.timeout);
} }
private void addDefaultHeaders(HttpHeaders headers) { private void addDefaultHeaders(HttpHeaders headers) {

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2022 the original author or authors. * Copyright 2012-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -25,6 +25,7 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import zipkin2.Callback; import zipkin2.Callback;
@ -42,19 +43,25 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
*/ */
abstract class ZipkinHttpSenderTests { abstract class ZipkinHttpSenderTests {
protected Sender sut; protected Sender sender;
abstract Sender createSut(); abstract Sender createSender();
@BeforeEach @BeforeEach
void setUp() { void beforeEach() throws Exception {
this.sut = createSut(); this.sender = createSender();
}
@AfterEach
void afterEach() throws IOException {
this.sender.close();
} }
@Test @Test
void sendSpansShouldThrowIfCloseWasCalled() throws IOException { void sendSpansShouldThrowIfCloseWasCalled() throws IOException {
this.sut.close(); this.sender.close();
assertThatThrownBy(() -> this.sut.sendSpans(Collections.emptyList())).isInstanceOf(ClosedSenderException.class); assertThatThrownBy(() -> this.sender.sendSpans(Collections.emptyList()))
.isInstanceOf(ClosedSenderException.class);
} }
protected void makeRequest(List<byte[]> encodedSpans, boolean async) throws IOException { protected void makeRequest(List<byte[]> encodedSpans, boolean async) throws IOException {
@ -68,8 +75,12 @@ abstract class ZipkinHttpSenderTests {
} }
protected CallbackResult makeAsyncRequest(List<byte[]> encodedSpans) { protected CallbackResult makeAsyncRequest(List<byte[]> encodedSpans) {
return makeAsyncRequest(this.sender, encodedSpans);
}
protected CallbackResult makeAsyncRequest(Sender sender, List<byte[]> encodedSpans) {
AtomicReference<CallbackResult> callbackResult = new AtomicReference<>(); AtomicReference<CallbackResult> callbackResult = new AtomicReference<>();
this.sut.sendSpans(encodedSpans).enqueue(new Callback<>() { sender.sendSpans(encodedSpans).enqueue(new Callback<>() {
@Override @Override
public void onSuccess(Void value) { public void onSuccess(Void value) {
callbackResult.set(new CallbackResult(true, null)); callbackResult.set(new CallbackResult(true, null));
@ -84,7 +95,11 @@ abstract class ZipkinHttpSenderTests {
} }
protected void makeSyncRequest(List<byte[]> encodedSpans) throws IOException { protected void makeSyncRequest(List<byte[]> encodedSpans) throws IOException {
this.sut.sendSpans(encodedSpans).execute(); makeSyncRequest(this.sender, encodedSpans);
}
protected void makeSyncRequest(Sender sender, List<byte[]> encodedSpans) throws IOException {
sender.sendSpans(encodedSpans).execute();
} }
protected byte[] toByteArray(String input) { protected byte[] toByteArray(String input) {

@ -54,14 +54,16 @@ class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests {
private MockRestServiceServer mockServer; private MockRestServiceServer mockServer;
@Override @Override
Sender createSut() { Sender createSender() {
RestTemplate restTemplate = new RestTemplate(); RestTemplate restTemplate = new RestTemplate();
this.mockServer = MockRestServiceServer.createServer(restTemplate); this.mockServer = MockRestServiceServer.createServer(restTemplate);
return new ZipkinRestTemplateSender(ZIPKIN_URL, restTemplate); return new ZipkinRestTemplateSender(ZIPKIN_URL, restTemplate);
} }
@AfterEach @AfterEach
void tearDown() { @Override
void afterEach() throws IOException {
super.afterEach();
this.mockServer.verify(); this.mockServer.verify();
} }
@ -71,7 +73,7 @@ class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests {
.andExpect(method(HttpMethod.POST)) .andExpect(method(HttpMethod.POST))
.andExpect(content().string("[]")) .andExpect(content().string("[]"))
.andRespond(withStatus(HttpStatus.ACCEPTED)); .andRespond(withStatus(HttpStatus.ACCEPTED));
assertThat(this.sut.check()).isEqualTo(CheckResult.OK); assertThat(this.sender.check()).isEqualTo(CheckResult.OK);
} }
@Test @Test
@ -79,7 +81,7 @@ class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests {
this.mockServer.expect(requestTo(ZIPKIN_URL)) this.mockServer.expect(requestTo(ZIPKIN_URL))
.andExpect(method(HttpMethod.POST)) .andExpect(method(HttpMethod.POST))
.andRespond(withStatus(HttpStatus.INTERNAL_SERVER_ERROR)); .andRespond(withStatus(HttpStatus.INTERNAL_SERVER_ERROR));
CheckResult result = this.sut.check(); CheckResult result = this.sender.check();
assertThat(result.ok()).isFalse(); assertThat(result.ok()).isFalse();
assertThat(result.error()).hasMessageContaining("500 Internal Server Error"); assertThat(result.error()).hasMessageContaining("500 Internal Server Error");
} }

@ -17,16 +17,21 @@
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin; package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.util.Base64; import java.util.Base64;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer; import java.util.function.Consumer;
import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.QueueDispatcher;
import okhttp3.mockwebserver.RecordedRequest; import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
@ -45,33 +50,48 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
*/ */
class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests { class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
private static ClearableDispatcher dispatcher;
private static MockWebServer mockBackEnd; private static MockWebServer mockBackEnd;
private static String ZIPKIN_URL; private static String ZIPKIN_URL;
@BeforeAll @BeforeAll
static void beforeAll() throws IOException { static void beforeAll() throws IOException {
dispatcher = new ClearableDispatcher();
mockBackEnd = new MockWebServer(); mockBackEnd = new MockWebServer();
mockBackEnd.setDispatcher(dispatcher);
mockBackEnd.start(); mockBackEnd.start();
ZIPKIN_URL = "http://localhost:%s/api/v2/spans".formatted(mockBackEnd.getPort()); ZIPKIN_URL = mockBackEnd.url("/api/v2/spans").toString();
} }
@AfterAll @AfterAll
static void tearDown() throws IOException { static void afterAll() throws IOException {
mockBackEnd.shutdown(); mockBackEnd.shutdown();
} }
@Override @Override
Sender createSut() { @BeforeEach
void beforeEach() throws Exception {
super.beforeEach();
clearResponses();
clearRequests();
}
@Override
Sender createSender() {
return createSender(Duration.ofSeconds(10));
}
Sender createSender(Duration timeout) {
WebClient webClient = WebClient.builder().build(); WebClient webClient = WebClient.builder().build();
return new ZipkinWebClientSender(ZIPKIN_URL, webClient); return new ZipkinWebClientSender(ZIPKIN_URL, webClient, timeout);
} }
@Test @Test
void checkShouldSendEmptySpanList() throws InterruptedException { void checkShouldSendEmptySpanList() throws InterruptedException {
mockBackEnd.enqueue(new MockResponse()); mockBackEnd.enqueue(new MockResponse());
assertThat(this.sut.check()).isEqualTo(CheckResult.OK); assertThat(this.sender.check()).isEqualTo(CheckResult.OK);
requestAssertions((request) -> { requestAssertions((request) -> {
assertThat(request.getMethod()).isEqualTo("POST"); assertThat(request.getMethod()).isEqualTo("POST");
assertThat(request.getBody().readUtf8()).isEqualTo("[]"); assertThat(request.getBody().readUtf8()).isEqualTo("[]");
@ -81,10 +101,9 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
@Test @Test
void checkShouldNotRaiseException() throws InterruptedException { void checkShouldNotRaiseException() throws InterruptedException {
mockBackEnd.enqueue(new MockResponse().setResponseCode(500)); mockBackEnd.enqueue(new MockResponse().setResponseCode(500));
CheckResult result = this.sut.check(); CheckResult result = this.sender.check();
assertThat(result.ok()).isFalse(); assertThat(result.ok()).isFalse();
assertThat(result.error()).hasMessageContaining("500 Internal Server Error"); assertThat(result.error()).hasMessageContaining("500 Internal Server Error");
requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST")); requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST"));
} }
@ -94,7 +113,6 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
mockBackEnd.enqueue(new MockResponse()); mockBackEnd.enqueue(new MockResponse());
List<byte[]> encodedSpans = List.of(toByteArray("span1"), toByteArray("span2")); List<byte[]> encodedSpans = List.of(toByteArray("span1"), toByteArray("span2"));
makeRequest(encodedSpans, async); makeRequest(encodedSpans, async);
requestAssertions((request) -> { requestAssertions((request) -> {
assertThat(request.getMethod()).isEqualTo("POST"); assertThat(request.getMethod()).isEqualTo("POST");
assertThat(request.getHeader("Content-Type")).isEqualTo("application/json"); assertThat(request.getHeader("Content-Type")).isEqualTo("application/json");
@ -115,7 +133,6 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
assertThatThrownBy(() -> makeSyncRequest(Collections.emptyList())) assertThatThrownBy(() -> makeSyncRequest(Collections.emptyList()))
.hasMessageContaining("500 Internal Server Error"); .hasMessageContaining("500 Internal Server Error");
} }
requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST")); requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST"));
} }
@ -126,18 +143,31 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
// This is gzip compressed 10000 times 'a' // This is gzip compressed 10000 times 'a'
byte[] compressed = Base64.getDecoder() byte[] compressed = Base64.getDecoder()
.decode("H4sIAAAAAAAA/+3BMQ0AAAwDIKFLj/k3UR8NcA8AAAAAAAAAAAADUsAZfeASJwAA"); .decode("H4sIAAAAAAAA/+3BMQ0AAAwDIKFLj/k3UR8NcA8AAAAAAAAAAAADUsAZfeASJwAA");
mockBackEnd.enqueue(new MockResponse()); mockBackEnd.enqueue(new MockResponse());
makeRequest(List.of(toByteArray(uncompressed)), async); makeRequest(List.of(toByteArray(uncompressed)), async);
requestAssertions((request) -> { requestAssertions((request) -> {
assertThat(request.getMethod()).isEqualTo("POST"); assertThat(request.getMethod()).isEqualTo("POST");
assertThat(request.getHeader("Content-Type")).isEqualTo("application/json"); assertThat(request.getHeader("Content-Type")).isEqualTo("application/json");
assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip"); assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip");
assertThat(request.getBody().readByteArray()).isEqualTo(compressed); assertThat(request.getBody().readByteArray()).isEqualTo(compressed);
}); });
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
void shouldTimeout(boolean async) {
Sender sender = createSender(Duration.ofMillis(1));
MockResponse response = new MockResponse().setResponseCode(200).setHeadersDelay(100, TimeUnit.MILLISECONDS);
mockBackEnd.enqueue(response);
if (async) {
CallbackResult callbackResult = makeAsyncRequest(sender, Collections.emptyList());
assertThat(callbackResult.success()).isFalse();
assertThat(callbackResult.error()).isNotNull().isInstanceOf(TimeoutException.class);
}
else {
assertThatThrownBy(() -> makeSyncRequest(sender, Collections.emptyList()))
.hasCauseInstanceOf(TimeoutException.class);
}
} }
private void requestAssertions(Consumer<RecordedRequest> assertions) throws InterruptedException { private void requestAssertions(Consumer<RecordedRequest> assertions) throws InterruptedException {
@ -145,4 +175,24 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
assertThat(request).satisfies(assertions); assertThat(request).satisfies(assertions);
} }
private static void clearRequests() throws InterruptedException {
RecordedRequest request;
do {
request = mockBackEnd.takeRequest(0, TimeUnit.SECONDS);
}
while (request != null);
}
private static void clearResponses() {
dispatcher.clear();
}
private static class ClearableDispatcher extends QueueDispatcher {
void clear() {
getResponseQueue().clear();
}
}
} }

Loading…
Cancel
Save