|
|
|
@ -24,6 +24,7 @@ import io.micrometer.core.instrument.Tag;
|
|
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
import reactor.core.publisher.SignalType;
|
|
|
|
|
import reactor.util.context.Context;
|
|
|
|
|
import reactor.util.context.ContextView;
|
|
|
|
|
|
|
|
|
|
import org.springframework.boot.actuate.metrics.AutoTimer;
|
|
|
|
|
import org.springframework.web.reactive.function.client.ClientRequest;
|
|
|
|
@ -74,12 +75,12 @@ public class MetricsWebClientFilterFunction implements ExchangeFilterFunction {
|
|
|
|
|
return next.exchange(request);
|
|
|
|
|
}
|
|
|
|
|
return next.exchange(request).as((responseMono) -> instrumentResponse(request, responseMono))
|
|
|
|
|
.subscriberContext(this::putStartTime);
|
|
|
|
|
.contextWrite(this::putStartTime);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Mono<ClientResponse> instrumentResponse(ClientRequest request, Mono<ClientResponse> responseMono) {
|
|
|
|
|
final AtomicBoolean responseReceived = new AtomicBoolean();
|
|
|
|
|
return Mono.deferWithContext((ctx) -> responseMono.doOnEach((signal) -> {
|
|
|
|
|
return Mono.deferContextual((ctx) -> responseMono.doOnEach((signal) -> {
|
|
|
|
|
if (signal.isOnNext() || signal.isOnError()) {
|
|
|
|
|
responseReceived.set(true);
|
|
|
|
|
Iterable<Tag> tags = this.tagProvider.tags(request, signal.get(), signal.getThrowable());
|
|
|
|
@ -98,7 +99,7 @@ public class MetricsWebClientFilterFunction implements ExchangeFilterFunction {
|
|
|
|
|
.register(this.meterRegistry).record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Long getStartTime(Context context) {
|
|
|
|
|
private Long getStartTime(ContextView context) {
|
|
|
|
|
return context.get(METRICS_WEBCLIENT_START_TIME);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|