|
16 | 16 | package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper;
|
17 | 17 |
|
18 | 18 | import com.datastax.oss.driver.api.core.ConsistencyLevel;
|
19 |
| -import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy; |
| 19 | +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; |
| 20 | +import com.datastax.oss.driver.api.core.config.OptionsMap; |
| 21 | +import com.datastax.oss.driver.api.core.config.TypedDriverOption; |
20 | 22 | import com.google.auto.value.AutoValue;
|
21 |
| -import com.google.cloud.teleport.v2.source.reader.auth.dbauth.DbAuth; |
| 23 | +import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader; |
| 24 | +import com.google.common.annotations.VisibleForTesting; |
22 | 25 | import com.google.common.collect.ImmutableList;
|
| 26 | +import java.io.FileNotFoundException; |
23 | 27 | import java.io.Serializable;
|
24 | 28 | import java.net.InetSocketAddress;
|
25 |
| -import java.time.Duration; |
26 | 29 | import java.util.List;
|
27 | 30 | import javax.annotation.Nullable;
|
28 | 31 |
|
29 | 32 | /**
|
30 | 33 | * Encapsulates details of a Cassandra Cluster. Cassandra Cluster can connect to multiple KeySpaces,
|
31 |
| - * just like a Mysql instance can have multiple databases. TODO(vardhanvthigle): Take |
32 |
| - * DriverConfiguration as a GCS file for advanced overrides. |
| 34 | + * just like a Mysql instance can have multiple databases. |
33 | 35 | */
|
34 | 36 | @AutoValue
|
35 | 37 | public abstract class CassandraDataSource implements Serializable {
|
36 | 38 |
|
37 |
| - /** Name of the Cassandra Cluster. */ |
38 |
| - public abstract String clusterName(); |
39 |
| - |
40 |
| - /** Name of local Datacenter. Must be specified if contactPoints are not empty */ |
41 |
| - @Nullable |
42 |
| - public abstract String localDataCenter(); |
43 |
| - |
44 |
| - /** Contact points for connecting to a Cassandra Cluster. */ |
45 |
| - public abstract ImmutableList<InetSocketAddress> contactPoints(); |
| 39 | + /** Options Map. * */ |
| 40 | + abstract OptionsMap optionsMap(); |
46 | 41 |
|
47 |
| - /** Cassandra Auth details. */ |
48 | 42 | @Nullable
|
49 |
| - public abstract DbAuth dbAuth(); |
| 43 | + public abstract String clusterName(); |
50 | 44 |
|
51 |
| - /** Retry Policy for Cassandra Driver. Defaults to {@link DefaultRetryPolicy}. */ |
52 |
| - public abstract Class retryPolicy(); |
| 45 | + public DriverConfigLoader driverConfigLoader() { |
| 46 | + return CassandraDriverConfigLoader.fromOptionsMap(optionsMap()); |
| 47 | + } |
53 | 48 |
|
54 |
| - /** Consistency level for reading the source. Defaults to {@link ConsistencyLevel#QUORUM} */ |
55 |
| - public abstract ConsistencyLevel consistencyLevel(); |
| 49 | + /** returns List of ContactPoints. Added for easier compatibility with 3.0 cluster creation. */ |
| 50 | + public ImmutableList<InetSocketAddress> contactPoints() { |
| 51 | + return driverConfigLoader() |
| 52 | + .getInitialConfig() |
| 53 | + .getDefaultProfile() |
| 54 | + .getStringList(TypedDriverOption.CONTACT_POINTS.getRawOption()) |
| 55 | + .stream() |
| 56 | + .map( |
| 57 | + contactPoint -> { |
| 58 | + String[] ipPort = contactPoint.split(":"); |
| 59 | + return new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1])); |
| 60 | + }) |
| 61 | + .collect(ImmutableList.toImmutableList()); |
| 62 | + } |
56 | 63 |
|
57 |
| - /** Connection timeout for Cassandra driver. Set null for driver default. */ |
58 |
| - @Nullable |
59 |
| - public abstract Duration connectTimeout(); |
| 64 | + /** Returns local datacenter. Added for easier compatibility with 3.0 cluster creation. */ |
| 65 | + public String localDataCenter() { |
| 66 | + return driverConfigLoader() |
| 67 | + .getInitialConfig() |
| 68 | + .getDefaultProfile() |
| 69 | + .getString(TypedDriverOption.LOAD_BALANCING_LOCAL_DATACENTER.getRawOption()); |
| 70 | + } |
60 | 71 |
|
61 |
| - /** Read timeout for Cassandra driver. Set null for driver default. */ |
62 |
| - @Nullable |
63 |
| - public abstract Duration requestTimeout(); |
| 72 | + /** Returns the logged Keyspace. */ |
| 73 | + public String loggedKeySpace() { |
| 74 | + return driverConfigLoader() |
| 75 | + .getInitialConfig() |
| 76 | + .getDefaultProfile() |
| 77 | + .getString(TypedDriverOption.SESSION_KEYSPACE.getRawOption()); |
| 78 | + } |
64 | 79 |
|
65 | 80 | public static Builder builder() {
|
66 |
| - return new AutoValue_CassandraDataSource.Builder() |
67 |
| - .setRetryPolicy(DefaultRetryPolicy.class) |
68 |
| - .setConsistencyLevel(ConsistencyLevel.QUORUM); |
| 81 | + return new AutoValue_CassandraDataSource.Builder(); |
69 | 82 | }
|
70 | 83 |
|
71 | 84 | public abstract Builder toBuilder();
|
72 | 85 |
|
73 | 86 | @AutoValue.Builder
|
74 | 87 | public abstract static class Builder {
|
75 | 88 |
|
76 |
| - public abstract Builder setClusterName(String value); |
| 89 | + public abstract Builder setOptionsMap(OptionsMap value); |
77 | 90 |
|
78 |
| - public abstract Builder setLocalDataCenter(@Nullable String value); |
| 91 | + public abstract Builder setClusterName(@Nullable String value); |
79 | 92 |
|
80 |
| - public abstract Builder setContactPoints(ImmutableList<InetSocketAddress> value); |
| 93 | + abstract OptionsMap optionsMap(); |
81 | 94 |
|
82 |
| - public Builder setContactPoints(List<InetSocketAddress> value) { |
83 |
| - return setContactPoints(ImmutableList.copyOf(value)); |
| 95 | + public Builder setOptionsMapFromGcsFile(String gcsPath) throws FileNotFoundException { |
| 96 | + return this.setOptionsMap(CassandraDriverConfigLoader.getOptionsMapFromFile(gcsPath)); |
84 | 97 | }
|
85 | 98 |
|
86 |
| - public abstract Builder setDbAuth(@Nullable DbAuth value); |
87 |
| - |
88 |
| - public abstract Builder setRetryPolicy(Class value); |
| 99 | + public <ValueT> Builder overrideOptionInOptionsMap( |
| 100 | + TypedDriverOption<ValueT> option, ValueT value) { |
| 101 | + DriverConfigLoader.fromMap(optionsMap()) |
| 102 | + .getInitialConfig() |
| 103 | + .getProfiles() |
| 104 | + .keySet() |
| 105 | + .forEach(profile -> this.optionsMap().put(profile, option, value)); |
| 106 | + return this; |
| 107 | + } |
89 | 108 |
|
90 |
| - public abstract Builder setConsistencyLevel(ConsistencyLevel value); |
| 109 | + /** |
| 110 | + * Allowing UT to set the contact points. In UT environment, the port is dynamically determined. |
| 111 | + * We can't use a static GCS file to provide the contact points. |
| 112 | + */ |
| 113 | + @VisibleForTesting |
| 114 | + public Builder setContactPoints(List<InetSocketAddress> contactPoints) { |
| 115 | + overrideOptionInOptionsMap( |
| 116 | + TypedDriverOption.CONTACT_POINTS, |
| 117 | + contactPoints.stream() |
| 118 | + .map(p -> p.getAddress().getHostAddress() + ":" + p.getPort()) |
| 119 | + .collect(ImmutableList.toImmutableList())); |
| 120 | + return this; |
| 121 | + } |
91 | 122 |
|
92 |
| - public abstract Builder setConnectTimeout(@Nullable Duration value); |
| 123 | + /** Set the local Datacenter. */ |
| 124 | + @VisibleForTesting |
| 125 | + public Builder setLocalDataCenter(String localDataCenter) { |
| 126 | + overrideOptionInOptionsMap( |
| 127 | + TypedDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, localDataCenter); |
| 128 | + return this; |
| 129 | + } |
93 | 130 |
|
94 |
| - public abstract Builder setRequestTimeout(@Nullable Duration value); |
| 131 | + abstract CassandraDataSource autoBuild(); |
95 | 132 |
|
96 |
| - public abstract CassandraDataSource build(); |
| 133 | + public CassandraDataSource build() { |
| 134 | + /* Prefer to use quorum read until we encounter a strong use case to not do so. */ |
| 135 | + this.overrideOptionInOptionsMap( |
| 136 | + TypedDriverOption.REQUEST_CONSISTENCY, ConsistencyLevel.QUORUM.toString()); |
| 137 | + return autoBuild(); |
| 138 | + } |
97 | 139 | }
|
98 | 140 | }
|
0 commit comments