Skip to content

Commit 3b1477a

Browse files
Fixing TODO for Ingesting GCS Config (GoogleCloudPlatform#2104)
1 parent 934a393 commit 3b1477a

File tree

7 files changed

+165
-170
lines changed

7 files changed

+165
-170
lines changed

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraConnector.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,10 @@
1717

1818
import com.datastax.oss.driver.api.core.CqlSession;
1919
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
20-
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2120
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
22-
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
2321
import com.datastax.oss.driver.api.core.session.Session;
2422
import com.google.cloud.teleport.v2.source.reader.io.cassandra.schema.CassandraSchemaReference;
2523
import com.google.common.annotations.VisibleForTesting;
26-
import java.util.List;
2724
import org.jline.utils.Log;
2825
import org.slf4j.Logger;
2926
import org.slf4j.LoggerFactory;
@@ -46,11 +43,6 @@ public CassandraConnector(
4643
schemaReference);
4744
CqlSessionBuilder builder =
4845
CqlSession.builder().withConfigLoader(getDriverConfigLoader(dataSource));
49-
builder = setCredentials(builder, dataSource);
50-
if (dataSource.localDataCenter() != null) {
51-
builder = builder.addContactPoints(List.copyOf(dataSource.contactPoints()));
52-
builder = builder.withLocalDatacenter(dataSource.localDataCenter());
53-
}
5446
if (schemaReference.keyspaceName() != null) {
5547
builder.withKeyspace(schemaReference.keyspaceName());
5648
}
@@ -62,36 +54,9 @@ public CassandraConnector(
6254
schemaReference);
6355
}
6456

65-
@VisibleForTesting
66-
protected static CqlSessionBuilder setCredentials(
67-
CqlSessionBuilder builder, CassandraDataSource cassandraDataSource) {
68-
if (cassandraDataSource.dbAuth() != null) {
69-
return builder.withAuthCredentials(
70-
cassandraDataSource.dbAuth().getUserName().get(),
71-
cassandraDataSource.dbAuth().getPassword().get());
72-
} else {
73-
return builder;
74-
}
75-
}
76-
7757
@VisibleForTesting
7858
protected static DriverConfigLoader getDriverConfigLoader(CassandraDataSource dataSource) {
79-
ProgrammaticDriverConfigLoaderBuilder driverConfigLoaderBuilder =
80-
DriverConfigLoader.programmaticBuilder()
81-
.withString(
82-
DefaultDriverOption.REQUEST_CONSISTENCY, dataSource.consistencyLevel().name())
83-
.withClass(DefaultDriverOption.RETRY_POLICY_CLASS, dataSource.retryPolicy());
84-
if (dataSource.connectTimeout() != null) {
85-
driverConfigLoaderBuilder =
86-
driverConfigLoaderBuilder.withDuration(
87-
DefaultDriverOption.CONNECTION_CONNECT_TIMEOUT, dataSource.connectTimeout());
88-
}
89-
if (dataSource.requestTimeout() != null) {
90-
driverConfigLoaderBuilder =
91-
driverConfigLoaderBuilder.withDuration(
92-
DefaultDriverOption.REQUEST_TIMEOUT, dataSource.requestTimeout());
93-
}
94-
return driverConfigLoaderBuilder.build();
59+
return dataSource.driverConfigLoader();
9560
}
9661

9762
public Session getSession() {

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraDataSource.java

Lines changed: 83 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,83 +16,125 @@
1616
package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper;
1717

1818
import com.datastax.oss.driver.api.core.ConsistencyLevel;
19-
import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy;
19+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
20+
import com.datastax.oss.driver.api.core.config.OptionsMap;
21+
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
2022
import com.google.auto.value.AutoValue;
21-
import com.google.cloud.teleport.v2.source.reader.auth.dbauth.DbAuth;
23+
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
24+
import com.google.common.annotations.VisibleForTesting;
2225
import com.google.common.collect.ImmutableList;
26+
import java.io.FileNotFoundException;
2327
import java.io.Serializable;
2428
import java.net.InetSocketAddress;
25-
import java.time.Duration;
2629
import java.util.List;
2730
import javax.annotation.Nullable;
2831

2932
/**
3033
* Encapsulates details of a Cassandra Cluster. Cassandra Cluster can connect to multiple KeySpaces,
31-
* just like a Mysql instance can have multiple databases. TODO(vardhanvthigle): Take
32-
* DriverConfiguration as a GCS file for advanced overrides.
34+
* just like a Mysql instance can have multiple databases.
3335
*/
3436
@AutoValue
3537
public abstract class CassandraDataSource implements Serializable {
3638

37-
/** Name of the Cassandra Cluster. */
38-
public abstract String clusterName();
39-
40-
/** Name of local Datacenter. Must be specified if contactPoints are not empty */
41-
@Nullable
42-
public abstract String localDataCenter();
43-
44-
/** Contact points for connecting to a Cassandra Cluster. */
45-
public abstract ImmutableList<InetSocketAddress> contactPoints();
39+
/** Options Map. * */
40+
abstract OptionsMap optionsMap();
4641

47-
/** Cassandra Auth details. */
4842
@Nullable
49-
public abstract DbAuth dbAuth();
43+
public abstract String clusterName();
5044

51-
/** Retry Policy for Cassandra Driver. Defaults to {@link DefaultRetryPolicy}. */
52-
public abstract Class retryPolicy();
45+
public DriverConfigLoader driverConfigLoader() {
46+
return CassandraDriverConfigLoader.fromOptionsMap(optionsMap());
47+
}
5348

54-
/** Consistency level for reading the source. Defaults to {@link ConsistencyLevel#QUORUM} */
55-
public abstract ConsistencyLevel consistencyLevel();
49+
/** returns List of ContactPoints. Added for easier compatibility with 3.0 cluster creation. */
50+
public ImmutableList<InetSocketAddress> contactPoints() {
51+
return driverConfigLoader()
52+
.getInitialConfig()
53+
.getDefaultProfile()
54+
.getStringList(TypedDriverOption.CONTACT_POINTS.getRawOption())
55+
.stream()
56+
.map(
57+
contactPoint -> {
58+
String[] ipPort = contactPoint.split(":");
59+
return new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
60+
})
61+
.collect(ImmutableList.toImmutableList());
62+
}
5663

57-
/** Connection timeout for Cassandra driver. Set null for driver default. */
58-
@Nullable
59-
public abstract Duration connectTimeout();
64+
/** Returns local datacenter. Added for easier compatibility with 3.0 cluster creation. */
65+
public String localDataCenter() {
66+
return driverConfigLoader()
67+
.getInitialConfig()
68+
.getDefaultProfile()
69+
.getString(TypedDriverOption.LOAD_BALANCING_LOCAL_DATACENTER.getRawOption());
70+
}
6071

61-
/** Read timeout for Cassandra driver. Set null for driver default. */
62-
@Nullable
63-
public abstract Duration requestTimeout();
72+
/** Returns the logged Keyspace. */
73+
public String loggedKeySpace() {
74+
return driverConfigLoader()
75+
.getInitialConfig()
76+
.getDefaultProfile()
77+
.getString(TypedDriverOption.SESSION_KEYSPACE.getRawOption());
78+
}
6479

6580
public static Builder builder() {
66-
return new AutoValue_CassandraDataSource.Builder()
67-
.setRetryPolicy(DefaultRetryPolicy.class)
68-
.setConsistencyLevel(ConsistencyLevel.QUORUM);
81+
return new AutoValue_CassandraDataSource.Builder();
6982
}
7083

7184
public abstract Builder toBuilder();
7285

7386
@AutoValue.Builder
7487
public abstract static class Builder {
7588

76-
public abstract Builder setClusterName(String value);
89+
public abstract Builder setOptionsMap(OptionsMap value);
7790

78-
public abstract Builder setLocalDataCenter(@Nullable String value);
91+
public abstract Builder setClusterName(@Nullable String value);
7992

80-
public abstract Builder setContactPoints(ImmutableList<InetSocketAddress> value);
93+
abstract OptionsMap optionsMap();
8194

82-
public Builder setContactPoints(List<InetSocketAddress> value) {
83-
return setContactPoints(ImmutableList.copyOf(value));
95+
public Builder setOptionsMapFromGcsFile(String gcsPath) throws FileNotFoundException {
96+
return this.setOptionsMap(CassandraDriverConfigLoader.getOptionsMapFromFile(gcsPath));
8497
}
8598

86-
public abstract Builder setDbAuth(@Nullable DbAuth value);
87-
88-
public abstract Builder setRetryPolicy(Class value);
99+
public <ValueT> Builder overrideOptionInOptionsMap(
100+
TypedDriverOption<ValueT> option, ValueT value) {
101+
DriverConfigLoader.fromMap(optionsMap())
102+
.getInitialConfig()
103+
.getProfiles()
104+
.keySet()
105+
.forEach(profile -> this.optionsMap().put(profile, option, value));
106+
return this;
107+
}
89108

90-
public abstract Builder setConsistencyLevel(ConsistencyLevel value);
109+
/**
110+
* Allowing UT to set the contact points. In UT environment, the port is dynamically determined.
111+
* We can't use a static GCS file to provide the contact points.
112+
*/
113+
@VisibleForTesting
114+
public Builder setContactPoints(List<InetSocketAddress> contactPoints) {
115+
overrideOptionInOptionsMap(
116+
TypedDriverOption.CONTACT_POINTS,
117+
contactPoints.stream()
118+
.map(p -> p.getAddress().getHostAddress() + ":" + p.getPort())
119+
.collect(ImmutableList.toImmutableList()));
120+
return this;
121+
}
91122

92-
public abstract Builder setConnectTimeout(@Nullable Duration value);
123+
/** Set the local Datacenter. */
124+
@VisibleForTesting
125+
public Builder setLocalDataCenter(String localDataCenter) {
126+
overrideOptionInOptionsMap(
127+
TypedDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, localDataCenter);
128+
return this;
129+
}
93130

94-
public abstract Builder setRequestTimeout(@Nullable Duration value);
131+
abstract CassandraDataSource autoBuild();
95132

96-
public abstract CassandraDataSource build();
133+
public CassandraDataSource build() {
134+
/* Prefer to use quorum read until we encounter a strong use case to not do so. */
135+
this.overrideOptionInOptionsMap(
136+
TypedDriverOption.REQUEST_CONSISTENCY, ConsistencyLevel.QUORUM.toString());
137+
return autoBuild();
138+
}
97139
}
98140
}

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/iowrapper/CassandraConnectorTest.java

Lines changed: 2 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,11 @@
1919
import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CQLSH;
2020
import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_KEYSPACE;
2121
import static com.google.common.truth.Truth.assertThat;
22-
import static org.mockito.ArgumentMatchers.anyString;
23-
import static org.mockito.Mockito.mock;
24-
import static org.mockito.Mockito.never;
25-
import static org.mockito.Mockito.times;
26-
import static org.mockito.Mockito.verify;
2722

28-
import com.datastax.oss.driver.api.core.ConsistencyLevel;
29-
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
30-
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
31-
import com.google.cloud.teleport.v2.source.reader.auth.dbauth.LocalCredentialsProvider;
23+
import com.datastax.oss.driver.api.core.config.OptionsMap;
3224
import com.google.cloud.teleport.v2.source.reader.io.cassandra.schema.CassandraSchemaReference;
3325
import com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.SharedEmbeddedCassandra;
3426
import java.io.IOException;
35-
import java.time.Duration;
3627
import org.junit.AfterClass;
3728
import org.junit.BeforeClass;
3829
import org.junit.Test;
@@ -71,6 +62,7 @@ public void testBasic() throws IOException {
7162
CassandraDataSource cassandraDataSource =
7263
CassandraDataSource.builder()
7364
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
65+
.setOptionsMap(OptionsMap.driverDefaults())
7466
.setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
7567
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
7668
.build();
@@ -95,77 +87,4 @@ public void testBasic() throws IOException {
9587
assertThat(cassandraConnectorWithNullKeySpace.getSession().getKeyspace()).isEmpty();
9688
}
9789
}
98-
99-
@Test
100-
public void testCredentialsSetter() {
101-
102-
final String testUserName = "testUseramNe";
103-
final String testPassword = "test";
104-
105-
CassandraDataSource cassandraDataSource =
106-
CassandraDataSource.builder()
107-
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
108-
.setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
109-
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
110-
.build();
111-
CqlSessionBuilder mockSessionBuilder = mock(CqlSessionBuilder.class);
112-
// No Auth Set
113-
CassandraConnector.setCredentials(mockSessionBuilder, cassandraDataSource);
114-
verify(mockSessionBuilder, never()).withAuthCredentials(anyString(), anyString());
115-
// Auth set
116-
CassandraConnector.setCredentials(
117-
mockSessionBuilder,
118-
cassandraDataSource.toBuilder()
119-
.setDbAuth(
120-
LocalCredentialsProvider.builder()
121-
.setUserName(testUserName)
122-
.setPassword(testPassword)
123-
.build())
124-
.build());
125-
verify(mockSessionBuilder, times(1)).withAuthCredentials(testUserName, testPassword);
126-
}
127-
128-
@Test
129-
public void testConfigLoader() {
130-
131-
CassandraDataSource cassandraDataSource =
132-
CassandraDataSource.builder()
133-
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
134-
.setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
135-
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
136-
.build();
137-
assertThat(
138-
CassandraConnector.getDriverConfigLoader(cassandraDataSource)
139-
.getInitialConfig()
140-
.getDefaultProfile()
141-
.getString(DefaultDriverOption.REQUEST_CONSISTENCY))
142-
.isEqualTo(ConsistencyLevel.QUORUM.name());
143-
assertThat(
144-
CassandraConnector.getDriverConfigLoader(
145-
cassandraDataSource.toBuilder()
146-
.setConsistencyLevel(ConsistencyLevel.ONE)
147-
.build())
148-
.getInitialConfig()
149-
.getDefaultProfile()
150-
.getString(DefaultDriverOption.REQUEST_CONSISTENCY))
151-
.isEqualTo(ConsistencyLevel.ONE.name());
152-
assertThat(
153-
CassandraConnector.getDriverConfigLoader(
154-
cassandraDataSource.toBuilder()
155-
.setConnectTimeout(Duration.ofSeconds(42L))
156-
.build())
157-
.getInitialConfig()
158-
.getDefaultProfile()
159-
.getDuration(DefaultDriverOption.CONNECTION_CONNECT_TIMEOUT))
160-
.isEqualTo(Duration.ofSeconds(42L));
161-
assertThat(
162-
CassandraConnector.getDriverConfigLoader(
163-
cassandraDataSource.toBuilder()
164-
.setRequestTimeout(Duration.ofSeconds(42L))
165-
.build())
166-
.getInitialConfig()
167-
.getDefaultProfile()
168-
.getDuration(DefaultDriverOption.REQUEST_TIMEOUT))
169-
.isEqualTo(Duration.ofSeconds(42L));
170-
}
17190
}

0 commit comments

Comments
 (0)