Merge pull request #31238 from ittays

* pr/31238:
  Polish 'Allow spring.data.cassandra.config file to override default values'
  Allow spring.data.cassandra.config file to override default values

Closes gh-31238
pull/32521/head
Phillip Webb 2 years ago
commit 9e98f694c7

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2021 the original author or authors. * Copyright 2012-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.cassandra;
import java.io.IOException; import java.io.IOException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.time.Duration; import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -63,6 +64,7 @@ import org.springframework.core.io.Resource;
* @author Eddú Meléndez * @author Eddú Meléndez
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Steffen F. Qvistgaard * @author Steffen F. Qvistgaard
* @author Ittay Stern
* @since 1.3.0 * @since 1.3.0
*/ */
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
@ -70,6 +72,15 @@ import org.springframework.core.io.Resource;
@EnableConfigurationProperties(CassandraProperties.class) @EnableConfigurationProperties(CassandraProperties.class)
public class CassandraAutoConfiguration { public class CassandraAutoConfiguration {
private static final Config SPRING_BOOT_DEFAULTS;
static {
CassandraDriverOptions options = new CassandraDriverOptions();
options.add(DefaultDriverOption.CONTACT_POINTS, Collections.singletonList("127.0.0.1:9042"));
options.add(DefaultDriverOption.PROTOCOL_COMPRESSION, "none");
options.add(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, (int) Duration.ofSeconds(5).toMillis());
SPRING_BOOT_DEFAULTS = options.build();
}
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
@Lazy @Lazy
@ -118,36 +129,35 @@ public class CassandraAutoConfiguration {
} }
private Config cassandraConfiguration(CassandraProperties properties) { private Config cassandraConfiguration(CassandraProperties properties) {
Config config = mapConfig(properties);
Resource configFile = properties.getConfig();
return (configFile != null) ? applyDefaultFallback(config.withFallback(loadConfig(configFile)))
: applyDefaultFallback(config);
}
private Config applyDefaultFallback(Config config) {
ConfigFactory.invalidateCaches(); ConfigFactory.invalidateCaches();
return ConfigFactory.defaultOverrides().withFallback(config).withFallback(ConfigFactory.defaultReference()) Config config = ConfigFactory.defaultOverrides();
.resolve(); config = config.withFallback(mapConfig(properties));
if (properties.getConfig() != null) {
config = config.withFallback(loadConfig(properties.getConfig()));
}
config = config.withFallback(SPRING_BOOT_DEFAULTS);
config = config.withFallback(ConfigFactory.defaultReference());
return config.resolve();
} }
private Config loadConfig(Resource config) { private Config loadConfig(Resource resource) {
try { try {
return ConfigFactory.parseURL(config.getURL()); return ConfigFactory.parseURL(resource.getURL());
} }
catch (IOException ex) { catch (IOException ex) {
throw new IllegalStateException("Failed to load cassandra configuration from " + config, ex); throw new IllegalStateException("Failed to load cassandra configuration from " + resource, ex);
} }
} }
private Config mapConfig(CassandraProperties properties) { private Config mapConfig(CassandraProperties properties) {
CassandraDriverOptions options = new CassandraDriverOptions(); CassandraDriverOptions options = new CassandraDriverOptions();
PropertyMapper map = PropertyMapper.get(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties.getSessionName()).whenHasText() map.from(properties.getSessionName()).whenHasText()
.to((sessionName) -> options.add(DefaultDriverOption.SESSION_NAME, sessionName)); .to((sessionName) -> options.add(DefaultDriverOption.SESSION_NAME, sessionName));
map.from(properties::getUsername).whenNonNull() map.from(properties::getUsername)
.to((username) -> options.add(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, username) .to((username) -> options.add(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, username)
.add(DefaultDriverOption.AUTH_PROVIDER_PASSWORD, properties.getPassword())); .add(DefaultDriverOption.AUTH_PROVIDER_PASSWORD, properties.getPassword()));
map.from(properties::getCompression).whenNonNull() map.from(properties::getCompression)
.to((compression) -> options.add(DefaultDriverOption.PROTOCOL_COMPRESSION, compression)); .to((compression) -> options.add(DefaultDriverOption.PROTOCOL_COMPRESSION, compression));
mapConnectionOptions(properties, options); mapConnectionOptions(properties, options);
mapPoolingOptions(properties, options); mapPoolingOptions(properties, options);
@ -155,7 +165,7 @@ public class CassandraAutoConfiguration {
mapControlConnectionOptions(properties, options); mapControlConnectionOptions(properties, options);
map.from(mapContactPoints(properties)) map.from(mapContactPoints(properties))
.to((contactPoints) -> options.add(DefaultDriverOption.CONTACT_POINTS, contactPoints)); .to((contactPoints) -> options.add(DefaultDriverOption.CONTACT_POINTS, contactPoints));
map.from(properties.getLocalDatacenter()).to( map.from(properties.getLocalDatacenter()).whenHasText().to(
(localDatacenter) -> options.add(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, localDatacenter)); (localDatacenter) -> options.add(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, localDatacenter));
return options.build(); return options.build();
} }
@ -210,8 +220,12 @@ public class CassandraAutoConfiguration {
} }
private List<String> mapContactPoints(CassandraProperties properties) { private List<String> mapContactPoints(CassandraProperties properties) {
if (properties.getContactPoints() != null) {
return properties.getContactPoints().stream() return properties.getContactPoints().stream()
.map((candidate) -> formatContactPoint(candidate, properties.getPort())).collect(Collectors.toList()); .map((candidate) -> formatContactPoint(candidate, properties.getPort()))
.collect(Collectors.toList());
}
return null;
} }
private String formatContactPoint(String candidate, int port) { private String formatContactPoint(String candidate, int port) {

@ -17,8 +17,6 @@
package org.springframework.boot.autoconfigure.cassandra; package org.springframework.boot.autoconfigure.cassandra;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
@ -57,7 +55,7 @@ public class CassandraProperties {
* Cluster node addresses in the form 'host:port', or a simple 'host' to use the * Cluster node addresses in the form 'host:port', or a simple 'host' to use the
* configured port. * configured port.
*/ */
private final List<String> contactPoints = new ArrayList<>(Collections.singleton("127.0.0.1:9042")); private List<String> contactPoints;
/** /**
* Port to use if a contact point does not specify one. * Port to use if a contact point does not specify one.
@ -83,7 +81,7 @@ public class CassandraProperties {
/** /**
* Compression supported by the Cassandra binary protocol. * Compression supported by the Cassandra binary protocol.
*/ */
private Compression compression = Compression.NONE; private Compression compression;
/** /**
* Schema action to take at startup. * Schema action to take at startup.
@ -143,6 +141,10 @@ public class CassandraProperties {
return this.contactPoints; return this.contactPoints;
} }
public void setContactPoints(List<String> contactPoints) {
this.contactPoints = contactPoints;
}
public int getPort() { public int getPort() {
return this.port; return this.port;
} }
@ -266,7 +268,7 @@ public class CassandraProperties {
/** /**
* How many rows will be retrieved simultaneously in a single network round-trip. * How many rows will be retrieved simultaneously in a single network round-trip.
*/ */
private int pageSize; private Integer pageSize;
private final Throttler throttler = new Throttler(); private final Throttler throttler = new Throttler();
@ -294,7 +296,7 @@ public class CassandraProperties {
this.serialConsistency = serialConsistency; this.serialConsistency = serialConsistency;
} }
public int getPageSize() { public Integer getPageSize() {
return this.pageSize; return this.pageSize;
} }
@ -347,7 +349,7 @@ public class CassandraProperties {
/** /**
* Timeout to use for control queries. * Timeout to use for control queries.
*/ */
private Duration timeout = Duration.ofSeconds(5); private Duration timeout;
public Duration getTimeout() { public Duration getTimeout() {
return this.timeout; return this.timeout;
@ -370,17 +372,17 @@ public class CassandraProperties {
* Maximum number of requests that can be enqueued when the throttling threshold * Maximum number of requests that can be enqueued when the throttling threshold
* is exceeded. * is exceeded.
*/ */
private int maxQueueSize; private Integer maxQueueSize;
/** /**
* Maximum number of requests that are allowed to execute in parallel. * Maximum number of requests that are allowed to execute in parallel.
*/ */
private int maxConcurrentRequests; private Integer maxConcurrentRequests;
/** /**
* Maximum allowed request rate. * Maximum allowed request rate.
*/ */
private int maxRequestsPerSecond; private Integer maxRequestsPerSecond;
/** /**
* How often the throttler attempts to dequeue requests. Set this high enough that * How often the throttler attempts to dequeue requests. Set this high enough that
@ -397,7 +399,7 @@ public class CassandraProperties {
this.type = type; this.type = type;
} }
public int getMaxQueueSize() { public Integer getMaxQueueSize() {
return this.maxQueueSize; return this.maxQueueSize;
} }
@ -405,7 +407,7 @@ public class CassandraProperties {
this.maxQueueSize = maxQueueSize; this.maxQueueSize = maxQueueSize;
} }
public int getMaxConcurrentRequests() { public Integer getMaxConcurrentRequests() {
return this.maxConcurrentRequests; return this.maxConcurrentRequests;
} }
@ -413,7 +415,7 @@ public class CassandraProperties {
this.maxConcurrentRequests = maxConcurrentRequests; this.maxConcurrentRequests = maxConcurrentRequests;
} }
public int getMaxRequestsPerSecond() { public Integer getMaxRequestsPerSecond() {
return this.maxRequestsPerSecond; return this.maxRequestsPerSecond;
} }

@ -492,6 +492,10 @@
"name": "spring.data.cassandra.connection.init-query-timeout", "name": "spring.data.cassandra.connection.init-query-timeout",
"defaultValue": "5s" "defaultValue": "5s"
}, },
{
"name": "spring.data.cassandra.controlconnection.timeout",
"defaultValue": "5s"
},
{ {
"name": "spring.data.cassandra.contact-points", "name": "spring.data.cassandra.contact-points",
"defaultValue": [ "defaultValue": [

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2021 the original author or authors. * Copyright 2012-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.cassandra; package org.springframework.boot.autoconfigure.cassandra;
import java.time.Duration; import java.time.Duration;
import java.util.Collections;
import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSession;
@ -43,6 +44,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
* *
* @author Eddú Meléndez * @author Eddú Meléndez
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Ittay Stern
*/ */
class CassandraAutoConfigurationTests { class CassandraAutoConfigurationTests {
@ -244,6 +246,28 @@ class CassandraAutoConfigurationTests {
}); });
} }
@Test // gh-31238
void driverConfigLoaderWithConfigOverridesDefaults() {
String configLocation = "org/springframework/boot/autoconfigure/cassandra/override-defaults.conf";
this.contextRunner.withPropertyValues("spring.data.cassandra.config=" + configLocation).run((context) -> {
DriverExecutionProfile actual = context.getBean(DriverConfigLoader.class).getInitialConfig()
.getDefaultProfile();
assertThat(actual.getString(DefaultDriverOption.SESSION_NAME)).isEqualTo("advanced session");
assertThat(actual.getDuration(DefaultDriverOption.REQUEST_TIMEOUT)).isEqualTo(Duration.ofSeconds(2));
assertThat(actual.getStringList(DefaultDriverOption.CONTACT_POINTS))
.isEqualTo(Collections.singletonList("1.2.3.4:5678"));
assertThat(actual.getBoolean(DefaultDriverOption.RESOLVE_CONTACT_POINTS)).isFalse();
assertThat(actual.getInt(DefaultDriverOption.REQUEST_PAGE_SIZE)).isEqualTo(11);
assertThat(actual.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)).isEqualTo("datacenter1");
assertThat(actual.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_CONCURRENT_REQUESTS)).isEqualTo(22);
assertThat(actual.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_REQUESTS_PER_SECOND)).isEqualTo(33);
assertThat(actual.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE)).isEqualTo(44);
assertThat(actual.getDuration(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT))
.isEqualTo(Duration.ofMillis(5555));
assertThat(actual.getString(DefaultDriverOption.PROTOCOL_COMPRESSION)).isEqualTo("SNAPPY");
});
}
@Test @Test
void driverConfigLoaderWithConfigCreateProfiles() { void driverConfigLoaderWithConfigCreateProfiles() {
String configLocation = "org/springframework/boot/autoconfigure/cassandra/profiles.conf"; String configLocation = "org/springframework/boot/autoconfigure/cassandra/profiles.conf";

@ -0,0 +1,20 @@
datastax-java-driver {
basic {
session-name = advanced session
load-balancing-policy {
local-datacenter = datacenter1
}
request.page-size = 11
contact-points = [ "1.2.3.4:5678" ]
}
advanced {
throttler {
max-concurrent-requests = 22
max-requests-per-second = 33
max-queue-size = 44
}
control-connection.timeout = 5555
protocol.compression = SNAPPY
resolve-contact-points = false
}
}
Loading…
Cancel
Save