Polish 'Apply TTL invocation caching on reactor types'

Extract reactor specific code to an inner class to protect
against ClassNotFound exceptions if reactor is not in use.

Also add support for `Flux`.

See gh-18339
pull/18546/head
Phillip Webb 5 years ago
parent 33d8bfa99d
commit 38968d2fff

@ -20,11 +20,13 @@ import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.boot.actuate.endpoint.InvocationContext; import org.springframework.boot.actuate.endpoint.InvocationContext;
import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker; import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
/** /**
@ -32,10 +34,14 @@ import org.springframework.util.ObjectUtils;
* configurable time to live. * configurable time to live.
* *
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Christoph Dreis
* @author Phillip Webb
* @since 2.0.0 * @since 2.0.0
*/ */
public class CachingOperationInvoker implements OperationInvoker { public class CachingOperationInvoker implements OperationInvoker {
private static final boolean IS_REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Mono", null);
private final OperationInvoker invoker; private final OperationInvoker invoker;
private final long timeToLive; private final long timeToLive;
@ -70,20 +76,13 @@ public class CachingOperationInvoker implements OperationInvoker {
long accessTime = System.currentTimeMillis(); long accessTime = System.currentTimeMillis();
CachedResponse cached = this.cachedResponse; CachedResponse cached = this.cachedResponse;
if (cached == null || cached.isStale(accessTime, this.timeToLive)) { if (cached == null || cached.isStale(accessTime, this.timeToLive)) {
Object response = handleMonoResponse(this.invoker.invoke(context)); Object response = this.invoker.invoke(context);
this.cachedResponse = new CachedResponse(response, accessTime); cached = createCachedResponse(response, accessTime);
return response; this.cachedResponse = cached;
} }
return cached.getResponse(); return cached.getResponse();
} }
private Object handleMonoResponse(Object response) {
if (response instanceof Mono) {
return ((Mono) response).cache(Duration.ofMillis(this.timeToLive));
}
return response;
}
private boolean hasInput(InvocationContext context) { private boolean hasInput(InvocationContext context) {
if (context.getSecurityContext().getPrincipal() != null) { if (context.getSecurityContext().getPrincipal() != null) {
return true; return true;
@ -95,6 +94,13 @@ public class CachingOperationInvoker implements OperationInvoker {
return false; return false;
} }
private CachedResponse createCachedResponse(Object response, long accessTime) {
if (IS_REACTOR_PRESENT) {
return new ReactiveCachedResponse(response, accessTime, this.timeToLive);
}
return new CachedResponse(response, accessTime);
}
/** /**
* Apply caching configuration when appropriate to the given invoker. * Apply caching configuration when appropriate to the given invoker.
* @param invoker the invoker to wrap * @param invoker the invoker to wrap
@ -134,4 +140,25 @@ public class CachingOperationInvoker implements OperationInvoker {
} }
/**
* {@link CachedResponse} variant used when Reactor is present.
*/
static class ReactiveCachedResponse extends CachedResponse {
ReactiveCachedResponse(Object response, long creationTime, long timeToLive) {
super(applyCaching(response, timeToLive), creationTime);
}
private static Object applyCaching(Object response, long timeToLive) {
if (response instanceof Mono) {
return ((Mono<?>) response).cache(Duration.ofMillis(timeToLive));
}
if (response instanceof Flux) {
return ((Flux<?>) response).cache(Duration.ofMillis(timeToLive));
}
return response;
}
}
} }

@ -17,20 +17,19 @@
package org.springframework.boot.actuate.endpoint.invoker.cache; package org.springframework.boot.actuate.endpoint.invoker.cache;
import java.security.Principal; import java.security.Principal;
import java.time.Duration; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import org.springframework.boot.actuate.endpoint.InvocationContext; import org.springframework.boot.actuate.endpoint.InvocationContext;
import org.springframework.boot.actuate.endpoint.SecurityContext; import org.springframework.boot.actuate.endpoint.SecurityContext;
import org.springframework.boot.actuate.endpoint.invoke.MissingParametersException; import org.springframework.boot.actuate.endpoint.invoke.MissingParametersException;
import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker; import org.springframework.boot.actuate.endpoint.invoke.OperationInvoker;
import org.springframework.boot.test.rule.OutputCapture;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
@ -44,12 +43,11 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
* Tests for {@link CachingOperationInvoker}. * Tests for {@link CachingOperationInvoker}.
* *
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Christoph Dreis
* @author Phillip Webb
*/ */
public class CachingOperationInvokerTests { public class CachingOperationInvokerTests {
@Rule
public OutputCapture outputCapture = new OutputCapture();
@Test @Test
public void createInstanceWithTtlSetToZero() { public void createInstanceWithTtlSetToZero() {
assertThatIllegalArgumentException() assertThatIllegalArgumentException()
@ -72,17 +70,26 @@ public class CachingOperationInvokerTests {
@Test @Test
public void cacheInTtlWithMonoResponse() { public void cacheInTtlWithMonoResponse() {
MonoOperationInvoker.invocations = 0;
MonoOperationInvoker target = new MonoOperationInvoker(); MonoOperationInvoker target = new MonoOperationInvoker();
InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap()); InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap());
CachingOperationInvoker invoker = new CachingOperationInvoker(target, 500L); CachingOperationInvoker invoker = new CachingOperationInvoker(target, 500L);
Object monoResponse = invoker.invoke(context); Object response = ((Mono<?>) invoker.invoke(context)).block();
assertThat(monoResponse).isInstanceOf(Mono.class); Object cachedResponse = ((Mono<?>) invoker.invoke(context)).block();
Object response = ((Mono) monoResponse).block(Duration.ofSeconds(30)); assertThat(MonoOperationInvoker.invocations).isEqualTo(1);
Object cachedMonoResponse = invoker.invoke(context); assertThat(response).isSameAs(cachedResponse);
assertThat(cachedMonoResponse).isInstanceOf(Mono.class); }
Object cachedResponse = ((Mono) cachedMonoResponse).block(Duration.ofSeconds(30));
@Test
public void cacheInTtlWithFluxResponse() {
FluxOperationInvoker.invocations = 0;
FluxOperationInvoker target = new FluxOperationInvoker();
InvocationContext context = new InvocationContext(mock(SecurityContext.class), Collections.emptyMap());
CachingOperationInvoker invoker = new CachingOperationInvoker(target, 500L);
Object response = ((Flux<?>) invoker.invoke(context)).blockLast();
Object cachedResponse = ((Flux<?>) invoker.invoke(context)).blockLast();
assertThat(FluxOperationInvoker.invocations).isEqualTo(1);
assertThat(response).isSameAs(cachedResponse); assertThat(response).isSameAs(cachedResponse);
assertThat(this.outputCapture.toString()).containsOnlyOnce("invoked");
} }
private void assertCacheIsUsed(Map<String, Object> parameters) { private void assertCacheIsUsed(Map<String, Object> parameters) {
@ -144,14 +151,28 @@ public class CachingOperationInvokerTests {
private static class MonoOperationInvoker implements OperationInvoker { private static class MonoOperationInvoker implements OperationInvoker {
static int invocations;
@Override @Override
public Object invoke(InvocationContext context) throws MissingParametersException { public Object invoke(InvocationContext context) throws MissingParametersException {
return Mono.fromCallable(this::printInvocation); return Mono.fromCallable(() -> {
invocations++;
return Mono.just("test");
});
} }
private Mono<String> printInvocation() { }
System.out.println("MonoOperationInvoker invoked");
return Mono.just("test"); private static class FluxOperationInvoker implements OperationInvoker {
static int invocations;
@Override
public Object invoke(InvocationContext context) throws MissingParametersException {
return Flux.fromIterable(() -> {
invocations++;
return Arrays.asList("spring", "boot").iterator();
});
} }
} }

Loading…
Cancel
Save