Skip to content

Commit 177a218

Browse files
CassandraDriverConfigLoader from GCS (GoogleCloudPlatform#2077)
1 parent 0962cb0 commit 177a218

File tree

6 files changed

+1845
-0
lines changed

6 files changed

+1845
-0
lines changed

v2/spanner-common/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,18 @@
4444
<version>1.0-SNAPSHOT</version>
4545
<scope>compile</scope>
4646
</dependency>
47+
<dependency>
48+
<groupId>com.datastax.oss</groupId>
49+
<artifactId>java-driver-core</artifactId>
50+
<version>4.17.0</version>
51+
<scope>compile</scope>
52+
</dependency>
53+
<dependency>
54+
<groupId>org.mockito</groupId>
55+
<artifactId>mockito-inline</artifactId>
56+
<version>3.12.4</version>
57+
<scope>test</scope>
58+
</dependency>
4759
</dependencies>
4860

4961
<build>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright (C) 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.migrations.utils;
17+
18+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
19+
import com.datastax.oss.driver.api.core.config.OptionsMap;
20+
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
21+
import com.google.common.annotations.VisibleForTesting;
22+
import com.google.common.base.Objects;
23+
import com.google.common.collect.ImmutableMap;
24+
import com.typesafe.config.ConfigException;
25+
import java.io.FileNotFoundException;
26+
import java.net.URL;
27+
import java.util.Map.Entry;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/**
32+
* A common static utility class that allows the spanner migration pipelines to ingest Cassandra
33+
* Driver config file from GCS. Cassandra has a structured config file to accept all the driver
34+
* parameters, be it list of host ip addresses, credentials, retry policy and many more. Most of
35+
* these parameters are very specific to the Cassandra Database. Refer to the <a
36+
* href=>https://docs.datastax.com/en/developer/java-driver/4.3/manual/core/configuration/reference/index.html>reference
37+
* configuration</a> for the file format.
38+
*/
39+
public final class CassandraDriverConfigLoader {
40+
41+
private static final Logger LOG = LoggerFactory.getLogger(CassandraDriverConfigLoader.class);
42+
private static final ImmutableMap<String, TypedDriverOption> OPTIONS_SUPPORTED_BY_DRIVER =
43+
getOptionsSupportedByDriver();
44+
45+
/**
46+
* Load the Cassandra Config from a file as a {@link DriverConfigLoader}.
47+
*
48+
* @param path A complete gcs path to the config file of the form "gs://path/to/file".
49+
* @return DriverConfigLoader.
50+
* @throws FileNotFoundException - If file is not found at specified path.
51+
*/
52+
public static DriverConfigLoader loadFile(String path) throws FileNotFoundException {
53+
URL url = loadSingleFile(path);
54+
LOG.debug("Loaded Cassandra Driver config from path {}", path);
55+
try {
56+
DriverConfigLoader.fromUrl(url).getInitialConfig();
57+
return DriverConfigLoader.fromUrl(url);
58+
} catch (ConfigException.Parse parseException) {
59+
LOG.error(
60+
"Parsing error while parsing Cassandra Driver config from path {}", path, parseException);
61+
throw parseException;
62+
}
63+
}
64+
65+
/**
66+
* Load the Cassandra Config from a file as a {@link java.io.Serializable} {@link OptionsMap}.
67+
* This {@link OptionsMap} can be stored in any object that needs to implement {@link
68+
* java.io.Serializable}. At the time of opening a connection to Cassandra, it can be deserialized
69+
* by {@link CassandraDriverConfigLoader#fromOptionsMap(OptionsMap)}. Note: Implementation Detail,
70+
* Cassandra Driver does not provide a direct method to convert a link {@link DriverConfigLoader}
71+
* into an {@link OptionsMap}, or build an {@link OptionsMap} from a file.
72+
*
73+
* @param path A complete gcs path to the config file of the form "gs://path/to/file".
74+
* @return DriverConfigLoader.
75+
* @throws FileNotFoundException - If file is not found at specified path.
76+
*/
77+
public static OptionsMap getOptionsMapFromFile(String path) throws FileNotFoundException {
78+
OptionsMap optionsMap = new OptionsMap();
79+
DriverConfigLoader configLoader = loadFile(path);
80+
configLoader
81+
.getInitialConfig()
82+
.getProfiles()
83+
.forEach(
84+
(profileName, profile) ->
85+
profile.entrySet().forEach(e -> putInOptionsMap(optionsMap, profileName, e)));
86+
87+
return optionsMap;
88+
}
89+
90+
/**
91+
* Load the {@link DriverConfigLoader} from {@link java.io.Serializable} {@link OptionsMap} which
92+
* was obtained as a part of {@link CassandraDriverConfigLoader#getOptionsMapFromFile(String)}.
93+
*
94+
* @param optionsMap
95+
* @return DriverConfigLoader.
96+
*/
97+
public static DriverConfigLoader fromOptionsMap(OptionsMap optionsMap) {
98+
return DriverConfigLoader.fromMap(optionsMap);
99+
}
100+
101+
@VisibleForTesting
102+
protected static URL loadSingleFile(String path) throws FileNotFoundException {
103+
URL[] urls = JarFileReader.saveFilesLocally(path);
104+
if (urls.length == 0) {
105+
LOG.error("Could not load any Cassandra driver config file from specified path {}", path);
106+
throw (new FileNotFoundException("No file found in path " + path));
107+
}
108+
if (urls.length > 1) {
109+
LOG.error(
110+
"Need to provide a single Cassandra driver config file in the specified path {}. Found {} ",
111+
path,
112+
urls);
113+
throw (new IllegalArgumentException(
114+
String.format(
115+
"Need to provide a single Cassandra driver config file in the specified path %s. Found %d files",
116+
path, urls.length)));
117+
}
118+
return urls[0];
119+
}
120+
121+
@VisibleForTesting
122+
protected static void putInOptionsMap(
123+
OptionsMap optionsMap, String profileName, Entry<String, Object> e) {
124+
125+
TypedDriverOption option = OPTIONS_SUPPORTED_BY_DRIVER.get(e.getKey());
126+
if (Objects.equal(option, null)) {
127+
LOG.error(
128+
"Unknown Cassandra Option {}, Options supported by driver = {}",
129+
e.getKey(),
130+
OPTIONS_SUPPORTED_BY_DRIVER);
131+
throw new IllegalArgumentException(
132+
String.format(
133+
"Unknown Cassandra Driver Option %s. Supported Options = %s",
134+
e.getKey(), OPTIONS_SUPPORTED_BY_DRIVER));
135+
}
136+
optionsMap.put(profileName, option, e.getValue());
137+
}
138+
139+
private static ImmutableMap<String, TypedDriverOption> getOptionsSupportedByDriver() {
140+
ImmutableMap.Builder<String, TypedDriverOption> mapBuilder = ImmutableMap.builder();
141+
TypedDriverOption.builtInValues().forEach(e -> mapBuilder.put(e.getRawOption().getPath(), e));
142+
return mapBuilder.build();
143+
}
144+
145+
private CassandraDriverConfigLoader() {}
146+
}

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/utils/JarFileReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
3434

35+
/** TODO: rename this to FileReader. */
3536
public class JarFileReader {
3637

3738
private static final Logger LOG = LoggerFactory.getLogger(JarFileReader.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright (C) 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.migrations.utils;
17+
18+
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONTACT_POINTS;
19+
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.RETRY_POLICY_CLASS;
20+
import static com.google.common.truth.Truth.assertThat;
21+
import static org.junit.Assert.assertThrows;
22+
import static org.mockito.Mockito.mockStatic;
23+
24+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
25+
import com.datastax.oss.driver.api.core.config.OptionsMap;
26+
import com.google.common.collect.ImmutableMap;
27+
import com.google.common.io.Resources;
28+
import com.typesafe.config.ConfigException;
29+
import java.io.FileNotFoundException;
30+
import java.net.MalformedURLException;
31+
import java.net.URL;
32+
import java.util.AbstractMap.SimpleEntry;
33+
import java.util.List;
34+
import org.junit.After;
35+
import org.junit.Before;
36+
import org.junit.Test;
37+
import org.junit.runner.RunWith;
38+
import org.mockito.MockedStatic;
39+
import org.mockito.junit.MockitoJUnitRunner;
40+
41+
/** Test class for {@link CassandraDriverConfigLoader}. */
42+
@RunWith(MockitoJUnitRunner.class)
43+
public class CassandraDriverConfigLoaderTest {
44+
MockedStatic mockFileReader;
45+
46+
@Before
47+
public void initialize() {
48+
mockFileReader = mockStatic(JarFileReader.class);
49+
}
50+
51+
@Test
52+
public void testCassandraDriverConfigLoaderBasic()
53+
throws FileNotFoundException, MalformedURLException {
54+
String testGcsPath = "gs://smt-test-bucket/cassandraConfig.conf";
55+
URL testUrl = Resources.getResource("test-cassandra-config.conf");
56+
mockFileReader
57+
.when(() -> JarFileReader.saveFilesLocally(testGcsPath))
58+
.thenReturn(new URL[] {testUrl});
59+
DriverConfigLoader driverConfigLoader = CassandraDriverConfigLoader.loadFile(testGcsPath);
60+
assertThat(
61+
driverConfigLoader
62+
.getInitialConfig()
63+
.getProfiles()
64+
.get("default")
65+
.getStringList(CONTACT_POINTS))
66+
.isEqualTo(List.of("127.0.0.1:9042", "127.0.0.2:9042"));
67+
;
68+
assertThat(
69+
driverConfigLoader
70+
.getInitialConfig()
71+
.getProfiles()
72+
.get("default")
73+
.getString(RETRY_POLICY_CLASS))
74+
.isEqualTo("DefaultRetryPolicy");
75+
}
76+
77+
@Test
78+
public void testCassandraDriverConfigLoadError()
79+
throws FileNotFoundException, MalformedURLException {
80+
String testGcsPathNotFound = "gs://smt-test-bucket/cassandraConfigNotFound.conf";
81+
String testGcsPathList =
82+
"gs://smt-test-bucket/cassandraConfig1.conf,gs://smt-test-bucket/cassandraConfig2.conf";
83+
84+
URL testUrl = Resources.getResource("test-cassandra-config-parse-err.conf");
85+
mockFileReader
86+
.when(() -> JarFileReader.saveFilesLocally(testGcsPathNotFound))
87+
.thenReturn(new URL[] {});
88+
mockFileReader
89+
.when(() -> JarFileReader.saveFilesLocally(testGcsPathList))
90+
.thenReturn(
91+
new URL[] {
92+
Resources.getResource("test-cassandra-config.conf"),
93+
Resources.getResource("test-cassandra-config.conf")
94+
});
95+
assertThrows(
96+
FileNotFoundException.class,
97+
() -> CassandraDriverConfigLoader.loadFile(testGcsPathNotFound));
98+
assertThrows(
99+
IllegalArgumentException.class,
100+
() -> CassandraDriverConfigLoader.loadFile(testGcsPathList));
101+
}
102+
103+
@Test
104+
public void testCassandraDriverConfigParseError()
105+
throws FileNotFoundException, MalformedURLException {
106+
String testGcsPath = "gs://smt-test-bucket/cassandraConfig.conf";
107+
URL testUrl = Resources.getResource("test-cassandra-config-parse-err.conf");
108+
mockFileReader
109+
.when(() -> JarFileReader.saveFilesLocally(testGcsPath))
110+
.thenReturn(new URL[] {testUrl});
111+
assertThrows(
112+
ConfigException.Parse.class, () -> CassandraDriverConfigLoader.loadFile(testGcsPath));
113+
}
114+
115+
@Test
116+
public void testOptionsMapConversion() throws FileNotFoundException {
117+
118+
String testGcsPath = "gs://smt-test-bucket/cassandraConfig.conf";
119+
URL testUrl = Resources.getResource("test-cassandra-config.conf");
120+
mockFileReader
121+
.when(() -> JarFileReader.saveFilesLocally(testGcsPath))
122+
.thenReturn(new URL[] {testUrl});
123+
DriverConfigLoader driverConfigLoaderDirect = CassandraDriverConfigLoader.loadFile(testGcsPath);
124+
OptionsMap optionsMap = CassandraDriverConfigLoader.getOptionsMapFromFile(testGcsPath);
125+
DriverConfigLoader driverConfigLoaderFromOptionsMap =
126+
CassandraDriverConfigLoader.fromOptionsMap(optionsMap);
127+
ImmutableMap<String, ImmutableMap<String, String>> directLoadMap =
128+
driverConfigMap(driverConfigLoaderDirect);
129+
ImmutableMap<String, ImmutableMap<String, String>> fromOptionsMap =
130+
driverConfigMap(driverConfigLoaderFromOptionsMap);
131+
132+
assertThat(directLoadMap).isEqualTo(fromOptionsMap);
133+
134+
assertThrows(
135+
IllegalArgumentException.class,
136+
() -> {
137+
OptionsMap optionsMapToLoad = new OptionsMap();
138+
CassandraDriverConfigLoader.putInOptionsMap(
139+
optionsMapToLoad, "default", new SimpleEntry<>("Unsupported", "Unsupported"));
140+
});
141+
}
142+
143+
private static ImmutableMap<String, ImmutableMap<String, String>> driverConfigMap(
144+
DriverConfigLoader driverConfigLoaderDirect) {
145+
ImmutableMap.Builder<String, ImmutableMap<String, String>> driverConfigMap =
146+
ImmutableMap.builder();
147+
driverConfigLoaderDirect
148+
.getInitialConfig()
149+
.getProfiles()
150+
.forEach(
151+
(profile, options) -> {
152+
ImmutableMap.Builder<String, String> profileMapBuilder = ImmutableMap.builder();
153+
options
154+
.entrySet()
155+
.forEach(
156+
e -> profileMapBuilder.put(e.getKey().toString(), e.getValue().toString()));
157+
driverConfigMap.put(profile, profileMapBuilder.build());
158+
});
159+
return driverConfigMap.build();
160+
}
161+
162+
@After
163+
public void cleanup() {
164+
mockFileReader.close();
165+
mockFileReader = null;
166+
}
167+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Configuration for the DataStax Java driver for Apache Cassandra®.
2+
# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md.
3+
# This file has an intentional parsing error, to help test exception handling for cases where the config file does not get parsed.
4+
# DO NOT USE FOR PRODUCTION.
5+
6+
datastax-java-driver {
7+
basic.contact-points = [ "127.0.0.1:9042", ]
8+
}
9+
}

0 commit comments

Comments
 (0)