Align embedded Jetty's graceful shutdown behaviour with standalone

Standalone Jetty will, by default, include a Connection: close header
in all responses once graceful shutdown has begun. Previously, the
way in which we were shutting Jetty down did not cause this to happen.

This commit updates JettyGracefulShutdown to shut down each connector.
This causes Jetty to send the Connection: close header, aligning its
behaviour more closely with what it does when used standalone. The
tests have also been updated to verify this behaviour and to ensure
that the correct port is used for requests even once the connector is
no longer bound and the web server no longer knows its ephemeral port.

See gh-4657
pull/20472/head
Andy Wilkinson 5 years ago
parent 5bb66e1b85
commit b8cb61bbe0

@ -17,13 +17,13 @@
package org.springframework.boot.web.embedded.jetty;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.springframework.boot.web.server.GracefulShutdown;
@ -55,7 +55,15 @@ class JettyGracefulShutdown implements GracefulShutdown {
logger.info("Commencing graceful shutdown, allowing up to " + this.period.getSeconds()
+ "s for active requests to complete");
for (Connector connector : this.server.getConnectors()) {
((ServerConnector) connector).setAccepting(false);
try {
connector.shutdown().get();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
catch (ExecutionException ex) {
// Continue
}
}
this.shuttingDown = true;
long end = System.currentTimeMillis() + this.period.toMillis();

@ -16,14 +16,12 @@
package org.springframework.boot.web.embedded.jetty;
import java.net.ConnectException;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
@ -39,6 +37,7 @@ import org.springframework.http.client.reactive.JettyResourceFactory;
import org.springframework.http.server.reactive.HttpHandler;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
@ -128,32 +127,18 @@ class JettyReactiveWebServerFactoryTests extends AbstractReactiveWebServerFactor
BlockingHandler blockingHandler = new BlockingHandler();
this.webServer = factory.getWebServer(blockingHandler);
this.webServer.start();
int port = this.webServer.getPort();
CountDownLatch responseLatch = new CountDownLatch(1);
getWebClient().build().get().retrieve().toBodilessEntity().subscribe((response) -> responseLatch.countDown());
getWebClient(port).build().get().retrieve().toBodilessEntity()
.subscribe((response) -> responseLatch.countDown());
blockingHandler.awaitQueue();
Future<Boolean> shutdownResult = initiateGracefulShutdown();
// We need to make two requests as Jetty accepts one additional request after a
// connector has been told to stop accepting requests
Mono<ResponseEntity<Void>> unconnectableRequest1 = getWebClient().build().get().retrieve().toBodilessEntity();
Mono<ResponseEntity<Void>> unconnectableRequest2 = getWebClient().build().get().retrieve().toBodilessEntity();
Mono<ResponseEntity<Void>> unconnectableRequest = getWebClient(port).build().get().retrieve()
.toBodilessEntity();
assertThat(shutdownResult.get()).isEqualTo(false);
blockingHandler.completeOne();
responseLatch.await(5, TimeUnit.SECONDS);
this.webServer.stop();
List<Object> results = new ArrayList<>();
try {
results.add(unconnectableRequest1.block());
}
catch (Exception ex) {
results.add(ex);
}
try {
results.add(unconnectableRequest2.block());
}
catch (Exception ex) {
results.add(ex);
}
assertThat(results).anySatisfy((result) -> assertThat(result).isInstanceOf(Exception.class));
assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> unconnectableRequest.block())
.withCauseInstanceOf(ConnectException.class);
}
@Override

@ -21,7 +21,7 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -30,8 +30,11 @@ import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.ServletRegistration.Dynamic;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.HttpClients;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
@ -189,19 +192,80 @@ class JettyServletWebServerFactoryTests extends AbstractJettyServletWebServerFac
registration.setAsyncSupported(true);
});
this.webServer.start();
Future<Object> request = initiateGetRequest("/blocking");
int port = this.webServer.getPort();
Future<Object> request = initiateGetRequest(port, "/blocking");
blockingServlet.awaitQueue();
Future<Boolean> shutdownResult = initiateGracefulShutdown();
// Jetty accepts one additional request after a connector has been told to stop
// accepting requests
Future<Object> unconnectableRequest1 = initiateGetRequest("/");
Future<Object> unconnectableRequest2 = initiateGetRequest("/");
Future<Object> unconnectableRequest = initiateGetRequest(port, "/");
assertThat(shutdownResult.get()).isEqualTo(false);
blockingServlet.admitOne();
assertThat(request.get()).isInstanceOf(HttpResponse.class);
Object response = request.get();
assertThat(response).isInstanceOf(HttpResponse.class);
assertThat(unconnectableRequest.get()).isInstanceOf(HttpHostConnectException.class);
this.webServer.stop();
List<Object> results = Arrays.asList(unconnectableRequest1.get(), unconnectableRequest2.get());
assertThat(results).anySatisfy((result) -> assertThat(result).isInstanceOf(HttpHostConnectException.class));
}
@Test
void whenServerIsShuttingDownGracefullyThenResponseToRequestOnIdleConnectionWillHaveAConnectionCloseHeader()
throws Exception {
AbstractServletWebServerFactory factory = getFactory();
Shutdown shutdown = new Shutdown();
shutdown.setGracePeriod(Duration.ofSeconds(5));
factory.setShutdown(shutdown);
BlockingServlet blockingServlet = new BlockingServlet();
this.webServer = factory.getWebServer((context) -> {
Dynamic registration = context.addServlet("blockingServlet", blockingServlet);
registration.addMapping("/blocking");
registration.setAsyncSupported(true);
});
this.webServer.start();
int port = this.webServer.getPort();
HttpClient client = HttpClients.createMinimal();
Future<Object> request = initiateGetRequest(client, port, "/blocking");
blockingServlet.awaitQueue();
blockingServlet.admitOne();
Object response = request.get();
assertThat(response).isInstanceOf(HttpResponse.class);
assertThat(((HttpResponse) response).getStatusLine().getStatusCode()).isEqualTo(200);
assertThat(((HttpResponse) response).getFirstHeader("Connection")).isNull();
this.webServer.shutDownGracefully();
request = initiateGetRequest(client, port, "/blocking");
blockingServlet.awaitQueue();
blockingServlet.admitOne();
response = request.get();
assertThat(response).isInstanceOf(HttpResponse.class);
assertThat(((HttpResponse) response).getStatusLine().getStatusCode()).isEqualTo(200);
assertThat(((HttpResponse) response).getFirstHeader("Connection")).isNotNull().extracting(Header::getValue)
.isEqualTo("close");
this.webServer.stop();
}
@Test
void whenARequestCompletesAfterGracefulShutdownHasBegunThenItHasAConnectionCloseHeader()
throws InterruptedException, ExecutionException {
AbstractServletWebServerFactory factory = getFactory();
Shutdown shutdown = new Shutdown();
shutdown.setGracePeriod(Duration.ofSeconds(30));
factory.setShutdown(shutdown);
BlockingServlet blockingServlet = new BlockingServlet();
this.webServer = factory.getWebServer((context) -> {
Dynamic registration = context.addServlet("blockingServlet", blockingServlet);
registration.addMapping("/blocking");
registration.setAsyncSupported(true);
});
this.webServer.start();
int port = this.webServer.getPort();
Future<Object> request = initiateGetRequest(port, "/blocking");
blockingServlet.awaitQueue();
long start = System.currentTimeMillis();
Future<Boolean> shutdownResult = initiateGracefulShutdown();
blockingServlet.admitOne();
assertThat(shutdownResult.get()).isTrue();
long end = System.currentTimeMillis();
assertThat(end - start).isLessThanOrEqualTo(30000);
Object requestResult = request.get();
assertThat(requestResult).isInstanceOf(HttpResponse.class);
assertThat(((HttpResponse) requestResult).getFirstHeader("Connection").getValue()).isEqualTo("close");
}
private Ssl getSslSettings(String... enabledProtocols) {

@ -112,7 +112,7 @@ class NettyReactiveWebServerFactoryTests extends AbstractReactiveWebServerFactor
BlockingHandler blockingHandler = new BlockingHandler();
this.webServer = factory.getWebServer(blockingHandler);
this.webServer.start();
WebClient webClient = getWebClient().build();
WebClient webClient = getWebClient(this.webServer.getPort()).build();
webClient.get().retrieve().toBodilessEntity().subscribe();
blockingHandler.awaitQueue();
Future<Boolean> shutdownResult = initiateGracefulShutdown();

@ -271,7 +271,7 @@ class TomcatReactiveWebServerFactoryTests extends AbstractReactiveWebServerFacto
BlockingHandler blockingHandler = new BlockingHandler();
this.webServer = factory.getWebServer(blockingHandler);
this.webServer.start();
WebClient webClient = getWebClient().build();
WebClient webClient = getWebClient(this.webServer.getPort()).build();
webClient.get().retrieve().toBodilessEntity().subscribe();
blockingHandler.awaitQueue();
Future<Boolean> shutdownResult = initiateGracefulShutdown();

@ -574,10 +574,11 @@ class TomcatServletWebServerFactoryTests extends AbstractServletWebServerFactory
registration.setAsyncSupported(true);
});
this.webServer.start();
Future<Object> request = initiateGetRequest("/blocking");
int port = this.webServer.getPort();
Future<Object> request = initiateGetRequest(port, "/blocking");
blockingServlet.awaitQueue();
Future<Boolean> shutdownResult = initiateGracefulShutdown();
Future<Object> unconnectableRequest = initiateGetRequest("/");
Future<Object> unconnectableRequest = initiateGetRequest(port, "/");
assertThat(shutdownResult.get()).isEqualTo(false);
blockingServlet.admitOne();
assertThat(request.get()).isInstanceOf(HttpResponse.class);

@ -118,7 +118,7 @@ class UndertowReactiveWebServerFactoryTests extends AbstractReactiveWebServerFac
BlockingHandler blockingHandler = new BlockingHandler();
this.webServer = factory.getWebServer(blockingHandler);
this.webServer.start();
WebClient webClient = getWebClient().build();
WebClient webClient = getWebClient(this.webServer.getPort()).build();
webClient.get().retrieve().toBodilessEntity().subscribe();
blockingHandler.awaitQueue();
Future<Boolean> shutdownResult = initiateGracefulShutdown();
@ -146,7 +146,7 @@ class UndertowReactiveWebServerFactoryTests extends AbstractReactiveWebServerFac
assertThat(accessLogDirectory.listFiles()).isEmpty();
this.webServer = factory.getWebServer(new EchoHandler());
this.webServer.start();
WebClient client = getWebClient().build();
WebClient client = getWebClient(this.webServer.getPort()).build();
Mono<String> result = client.post().uri("/test").contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromValue("Hello World")).exchange()
.flatMap((response) -> response.bodyToMono(String.class));

@ -190,10 +190,11 @@ class UndertowServletWebServerFactoryTests extends AbstractServletWebServerFacto
registration.setAsyncSupported(true);
});
this.webServer.start();
Future<Object> request = initiateGetRequest("/blocking");
int port = this.webServer.getPort();
Future<Object> request = initiateGetRequest(port, "/blocking");
blockingServlet.awaitQueue();
Future<Boolean> shutdownResult = initiateGracefulShutdown();
Future<Object> rejectedRequest = initiateGetRequest("/");
Future<Object> rejectedRequest = initiateGetRequest(port, "/");
assertThat(shutdownResult.get()).isEqualTo(false);
blockingServlet.admitOne();
assertThat(request.get()).isInstanceOf(HttpResponse.class);

@ -109,8 +109,8 @@ public abstract class AbstractReactiveWebServerFactoryTests {
this.webServer.start();
return port;
});
Mono<String> result = getWebClient().build().post().uri("/test").contentType(MediaType.TEXT_PLAIN)
.body(BodyInserters.fromValue("Hello World")).exchange()
Mono<String> result = getWebClient(this.webServer.getPort()).build().post().uri("/test")
.contentType(MediaType.TEXT_PLAIN).body(BodyInserters.fromValue("Hello World")).exchange()
.flatMap((response) -> response.bodyToMono(String.class));
assertThat(result.block(Duration.ofSeconds(30))).isEqualTo("Hello World");
assertThat(this.webServer.getPort()).isEqualTo(specificPort);
@ -269,12 +269,12 @@ public abstract class AbstractReactiveWebServerFactoryTests {
StepVerifier.create(result).expectError(SSLException.class).verify(Duration.ofSeconds(10));
}
protected WebClient.Builder getWebClient() {
return getWebClient(HttpClient.create().wiretap(true));
protected WebClient.Builder getWebClient(int port) {
return getWebClient(HttpClient.create().wiretap(true), port);
}
protected WebClient.Builder getWebClient(HttpClient client) {
InetSocketAddress address = new InetSocketAddress(this.webServer.getPort());
protected WebClient.Builder getWebClient(HttpClient client, int port) {
InetSocketAddress address = new InetSocketAddress(port);
String baseUrl = "http://" + address.getHostString() + ":" + address.getPort();
return WebClient.builder().clientConnector(new ReactorClientHttpConnector(client)).baseUrl(baseUrl);
}
@ -368,7 +368,8 @@ public abstract class AbstractReactiveWebServerFactoryTests {
BlockingHandler blockingHandler = new BlockingHandler();
this.webServer = factory.getWebServer(blockingHandler);
this.webServer.start();
Mono<ResponseEntity<Void>> request = getWebClient().build().get().retrieve().toBodilessEntity();
Mono<ResponseEntity<Void>> request = getWebClient(this.webServer.getPort()).build().get().retrieve()
.toBodilessEntity();
AtomicReference<ResponseEntity<Void>> responseReference = new AtomicReference<>();
CountDownLatch responseLatch = new CountDownLatch(1);
request.subscribe((response) -> {
@ -399,7 +400,8 @@ public abstract class AbstractReactiveWebServerFactoryTests {
BlockingHandler blockingHandler = new BlockingHandler();
this.webServer = factory.getWebServer(blockingHandler);
this.webServer.start();
Mono<ResponseEntity<Void>> request = getWebClient().build().get().retrieve().toBodilessEntity();
Mono<ResponseEntity<Void>> request = getWebClient(this.webServer.getPort()).build().get().retrieve()
.toBodilessEntity();
AtomicReference<ResponseEntity<Void>> responseReference = new AtomicReference<>();
CountDownLatch responseLatch = new CountDownLatch(1);
request.subscribe((response) -> {
@ -437,7 +439,7 @@ public abstract class AbstractReactiveWebServerFactoryTests {
.tcpConfiguration((tcpClient) -> tcpClient.doOnConnected(
(connection) -> connection.channel().pipeline().addBefore(NettyPipeline.HttpDecompressor,
"CompressionTest", new CompressionDetectionHandler())));
return getWebClient(client).build();
return getWebClient(client, this.webServer.getPort()).build();
}
protected void assertResponseIsCompressed(ResponseEntity<Void> response) {
@ -451,8 +453,8 @@ public abstract class AbstractReactiveWebServerFactoryTests {
protected void assertForwardHeaderIsUsed(AbstractReactiveWebServerFactory factory) {
this.webServer = factory.getWebServer(new XForwardedHandler());
this.webServer.start();
String body = getWebClient().build().get().header("X-Forwarded-Proto", "https").retrieve()
.bodyToMono(String.class).block(Duration.ofSeconds(30));
String body = getWebClient(this.webServer.getPort()).build().get().header("X-Forwarded-Proto", "https")
.retrieve().bodyToMono(String.class).block(Duration.ofSeconds(30));
assertThat(body).isEqualTo("https");
}

@ -1041,7 +1041,8 @@ public abstract class AbstractServletWebServerFactoryTests {
registration.addMapping("/blocking");
});
this.webServer.start();
Future<Object> request = initiateGetRequest("/blocking");
int port = this.webServer.getPort();
Future<Object> request = initiateGetRequest(port, "/blocking");
blockingServlet.awaitQueue();
long start = System.currentTimeMillis();
assertThat(this.webServer.shutDownGracefully()).isFalse();
@ -1066,7 +1067,8 @@ public abstract class AbstractServletWebServerFactoryTests {
registration.setAsyncSupported(true);
});
this.webServer.start();
Future<Object> request = initiateGetRequest("/blocking");
int port = this.webServer.getPort();
Future<Object> request = initiateGetRequest(port, "/blocking");
blockingServlet.awaitQueue();
long start = System.currentTimeMillis();
Future<Boolean> shutdownResult = initiateGracefulShutdown();
@ -1090,7 +1092,8 @@ public abstract class AbstractServletWebServerFactoryTests {
registration.setAsyncSupported(true);
});
this.webServer.start();
Future<Object> request = initiateGetRequest("/blockingAsync");
int port = this.webServer.getPort();
Future<Object> request = initiateGetRequest(port, "/blockingAsync");
blockingAsyncServlet.awaitQueue();
long start = System.currentTimeMillis();
assertThat(this.webServer.shutDownGracefully()).isFalse();
@ -1115,7 +1118,8 @@ public abstract class AbstractServletWebServerFactoryTests {
registration.setAsyncSupported(true);
});
this.webServer.start();
Future<Object> request = initiateGetRequest("/blockingAsync");
int port = this.webServer.getPort();
Future<Object> request = initiateGetRequest(port, "/blockingAsync");
blockingAsyncServlet.awaitQueue();
long start = System.currentTimeMillis();
Future<Boolean> shutdownResult = initiateGracefulShutdown();
@ -1133,15 +1137,14 @@ public abstract class AbstractServletWebServerFactoryTests {
return future;
}
protected Future<Object> initiateGetRequest(String path) {
return initiateGetRequest(HttpClients.createDefault(), path);
protected Future<Object> initiateGetRequest(int port, String path) {
return initiateGetRequest(HttpClients.createMinimal(), port, path);
}
protected Future<Object> initiateGetRequest(HttpClient httpClient, String path) {
protected Future<Object> initiateGetRequest(HttpClient httpClient, int port, String path) {
RunnableFuture<Object> getRequest = new FutureTask<>(() -> {
try {
HttpResponse response = httpClient
.execute(new HttpGet("http://localhost:" + this.webServer.getPort() + path));
HttpResponse response = httpClient.execute(new HttpGet("http://localhost:" + port + path));
response.getEntity().getContent().close();
return response;
}

Loading…
Cancel
Save