Polish adapt to and from Mono conversion

See gh-10112
pull/10130/head
Phillip Webb 7 years ago
parent 7fc12bc8a3
commit 9cb0a81e50

@ -106,12 +106,13 @@ public class JerseyEndpointResourceFactory {
private static final List<Function<Object, Object>> bodyConverters; private static final List<Function<Object, Object>> bodyConverters;
static { static {
bodyConverters = new ArrayList<>(); List<Function<Object, Object>> converters = new ArrayList<>();
bodyConverters.add(new ResourceBodyConverter()); converters.add(new ResourceBodyConverter());
if (ClassUtils.isPresent("reactor.core.publisher.Mono", if (ClassUtils.isPresent("reactor.core.publisher.Mono",
EndpointInvokingInflector.class.getClassLoader())) { EndpointInvokingInflector.class.getClassLoader())) {
bodyConverters.add(new MonoBodyConverter()); converters.add(new MonoBodyConverter());
} }
bodyConverters = Collections.unmodifiableList(converters);
} }
private final OperationInvoker operationInvoker; private final OperationInvoker operationInvoker;
@ -193,11 +194,8 @@ public class JerseyEndpointResourceFactory {
} }
private Object convertIfNecessary(Object body) throws IOException { private Object convertIfNecessary(Object body) throws IOException {
if (body instanceof org.springframework.core.io.Resource) { for (Function<Object, Object> converter : bodyConverters) {
return ((org.springframework.core.io.Resource) body).getInputStream(); body = converter.apply(body);
}
if (body instanceof Mono) {
return ((Mono<?>) body).block();
} }
return body; return body;
} }

@ -24,6 +24,7 @@ import java.util.Map;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
@ -212,9 +213,8 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler
return Mono.from(result).map(this::toResponseEntity) return Mono.from(result).map(this::toResponseEntity)
.onErrorReturn(ParameterMappingException.class, .onErrorReturn(ParameterMappingException.class,
new ResponseEntity<>(HttpStatus.BAD_REQUEST)) new ResponseEntity<>(HttpStatus.BAD_REQUEST))
.defaultIfEmpty( .defaultIfEmpty(new ResponseEntity<>(httpMethod == HttpMethod.GET
new ResponseEntity<>(httpMethod == HttpMethod.GET ? HttpStatus.NOT_FOUND : HttpStatus.NO_CONTENT));
? HttpStatus.NOT_FOUND : HttpStatus.NO_CONTENT));
} }
private ResponseEntity<Object> toResponseEntity(Object response) { private ResponseEntity<Object> toResponseEntity(Object response) {
@ -277,18 +277,20 @@ public class WebEndpointReactiveHandlerMapping extends RequestMappingInfoHandler
@Override @Override
public Object invoke(Map<String, Object> arguments) { public Object invoke(Map<String, Object> arguments) {
return Mono.create((sink) -> { return Mono.create((sink) -> {
Schedulers.elastic().schedule(() -> { Schedulers.elastic().schedule(() -> invoke(arguments, sink));
try {
Object result = this.delegate.invoke(arguments);
sink.success(result);
}
catch (Exception ex) {
sink.error(ex);
}
});
}); });
} }
private void invoke(Map<String, Object> arguments, MonoSink<Object> sink) {
try {
Object result = this.delegate.invoke(arguments);
sink.success(result);
}
catch (Exception ex) {
sink.error(ex);
}
}
} }
} }

Loading…
Cancel
Save