Merge pull request #20205 from wonwoo

* wonwoo/master:
  Polish
  Configure codec buffer size in ES Reactive Rest client

Closes gh-20205
pull/20830/head
Brian Clozel 5 years ago
commit 1199b7c014

@ -35,7 +35,8 @@ public class CodecProperties {
/**
* Limit on the number of bytes that can be buffered whenever the input stream needs
* to be aggregated. By default this is not set, in which case individual codec
* to be aggregated. This applies only to the auto-configured WebFlux server and
* WebClient instances. By default this is not set, in which case individual codec
* defaults apply. Most codecs are limited to 256K by default.
*/
private DataSize maxInMemorySize;

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,6 +29,8 @@ import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.http.HttpHeaders;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
/**
@ -52,6 +54,7 @@ public class ReactiveRestClientAutoConfiguration {
builder.usingSsl();
}
configureTimeouts(builder, properties);
configureExchangeStrategies(builder, properties);
return builder.build();
}
@ -67,6 +70,19 @@ public class ReactiveRestClientAutoConfiguration {
});
}
private void configureExchangeStrategies(ClientConfiguration.TerminalClientConfigurationBuilder builder,
ReactiveRestClientProperties properties) {
PropertyMapper map = PropertyMapper.get();
builder.withWebClientConfigurer((webClient) -> {
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs((configurer) -> map.from(properties.getMaxInMemorySize()).whenNonNull()
.asInt(DataSize::toBytes)
.to((maxInMemorySize) -> configurer.defaultCodecs().maxInMemorySize(maxInMemorySize)))
.build();
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
});
}
@Bean
@ConditionalOnMissingBean
public ReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) {

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.util.unit.DataSize;
/**
* Configuration properties for Elasticsearch Reactive REST clients.
@ -62,6 +63,12 @@ public class ReactiveRestClientProperties {
*/
private Duration socketTimeout;
/**
* Limit on the number of bytes that can be buffered whenever the input stream needs
* to be aggregated.
*/
private DataSize maxInMemorySize;
public List<String> getEndpoints() {
return this.endpoints;
}
@ -110,4 +117,12 @@ public class ReactiveRestClientProperties {
this.socketTimeout = socketTimeout;
}
public DataSize getMaxInMemorySize() {
return this.maxInMemorySize;
}
public void setMaxInMemorySize(DataSize maxInMemorySize) {
this.maxInMemorySize = maxInMemorySize;
}
}

Loading…
Cancel
Save