Skip to content

Commit 5d6e63c

Browse files
Correcting Partitioning Logic for VarBinary Primary Key Type. (GoogleCloudPlatform#2064)
* Correcting Collation Mapping Query for Varbinary. * Fixing NoSuchFileException for mssql-jdbc-12.2.0.jre11
1 parent d9243b5 commit 5d6e63c

File tree

13 files changed

+415
-118
lines changed

13 files changed

+415
-118
lines changed

v2/sourcedb-to-spanner/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@
146146
<groupId>org.apache.beam</groupId>
147147
<artifactId>beam-sdks-java-io-cassandra</artifactId>
148148
</dependency>
149+
<dependency>
150+
<groupId>com.microsoft.sqlserver</groupId>
151+
<artifactId>mssql-jdbc</artifactId>
152+
<version>${mssql-jdbc.version}</version>
153+
<scope>test</scope>
154+
</dependency>
149155
<dependency>
150156
<groupId>org.apache.commons</groupId>
151157
<artifactId>commons-collections4</artifactId>

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapter.java

Lines changed: 20 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import com.google.common.annotations.VisibleForTesting;
3636
import com.google.common.collect.ImmutableList;
3737
import com.google.common.collect.ImmutableMap;
38-
import com.google.common.collect.ImmutableSet;
3938
import com.google.re2j.Pattern;
4039
import java.sql.PreparedStatement;
4140
import java.sql.ResultSet;
@@ -59,9 +58,6 @@
5958
public final class MysqlDialectAdapter implements DialectAdapter {
6059

6160
public static final String PAD_SPACE = "PAD SPACE";
62-
public static final String NO_PAD = "NO PAD";
63-
public static final String BINARY_CHARACTER_SET = "binary";
64-
public static final String BINARY_COLLATION = "binary";
6561
private final MySqlVersion mySqlVersion;
6662

6763
private static final Logger logger = LoggerFactory.getLogger(MysqlDialectAdapter.class);
@@ -370,16 +366,14 @@ private ImmutableMap<String, SourceColumnType> getTableCols(
370366
// String types: Ref https://dev.mysql.com/doc/refman/8.4/en/string-type-syntax.html
371367
.put("CHAR", IndexType.STRING)
372368
.put("VARCHAR", IndexType.STRING)
373-
.put("BINARY", IndexType.STRING)
374-
.put("VARBINARY", IndexType.STRING)
375-
.put("BLOB", IndexType.STRING)
376-
.put("TEXT", IndexType.STRING)
377-
.put("ENUM", IndexType.STRING)
378-
.put("SET", IndexType.STRING)
369+
// Mapping BINARY, VARBINARY and TINYBLOB to Java bigInteger
370+
// Ref https://dev.mysql.com/doc/refman/8.4/en/charset-binary-collations.html
371+
.put("BINARY", IndexType.BINARY)
372+
.put("VARBINARY", IndexType.BINARY)
373+
.put("TINYBLOB", IndexType.BINARY)
374+
.put("TINYTEXT", IndexType.STRING)
379375
.build();
380376

381-
private ImmutableSet<String> binaryColumnTypes = ImmutableSet.of("BINARY", "VARBINARY", "BLOB");
382-
383377
/**
384378
* Get the PadSpace attribute from {@link ResultSet} for index discovery query {@link
385379
* #getIndexDiscoveryQuery(JdbcSchemaReference)}. This method takes care of the fact that older
@@ -440,28 +434,17 @@ private ImmutableList<SourceColumnIndexInfo> getTableIndexes(
440434
// Column.
441435
String columType = normalizeColumnType(rs.getString(InformationSchemaStatsCols.TYPE_COL));
442436
IndexType indexType = INDEX_TYPE_MAPPING.getOrDefault(columType, IndexType.OTHER);
443-
444437
CollationReference collationReference = null;
445-
// Binary (and similar columns like VarBinary, Blob etc) columns have a fixed character-set
446-
// and collation called "binary".
447-
// Ref https://dev.mysql.com/doc/refman/8.4/en/charset-binary-collations.html
448-
// In information_schema.columns query, these column types show null as character set.
449-
// Ref: https://www.db-fiddle.com/f/kRVPA5jDwZYNj2rsdtif4K/3
450-
// Also for both mySQL 5.7 and 8.0 binary columns have a NO-PAD comparison.
451-
// Ref: https://www.db-fiddle.com/f/kRVPA5jDwZYNj2rsdtif4K/0.
452-
if (binaryColumnTypes.contains(columType) && characterSet == null) {
453-
characterSet = BINARY_CHARACTER_SET;
454-
collation = BINARY_COLLATION;
455-
padSpace = NO_PAD;
456-
}
457-
if (characterSet != null) {
438+
if (indexType.equals(IndexType.STRING)) {
458439
collationReference =
459440
CollationReference.builder()
460-
.setDbCharacterSet(characterSet)
461-
.setDbCollation(collation)
441+
.setDbCharacterSet(escapeMySql(characterSet))
442+
.setDbCollation(escapeMySql(collation))
462443
.setPadSpace(
463444
(padSpace == null) ? false : padSpace.trim().toUpperCase().equals(PAD_SPACE))
464445
.build();
446+
} else {
447+
stringMaxLength = null;
465448
}
466449

467450
indexesBuilder.add(
@@ -487,6 +470,15 @@ private ImmutableList<SourceColumnIndexInfo> getTableIndexes(
487470
return indexesBuilder.build();
488471
}
489472

473+
@VisibleForTesting
474+
protected static String escapeMySql(String input) {
475+
if (input.startsWith("`")) {
476+
return input;
477+
} else {
478+
return "`" + input + "`";
479+
}
480+
}
481+
490482
private SourceColumnType resultSetToSourceColumnType(ResultSet rs) throws SQLException {
491483
String colType = normalizeColumnType(rs.getString(InformationSchemaCols.TYPE_COL));
492484
long charMaxLength = rs.getLong(InformationSchemaCols.CHAR_MAX_LENGTH_COL);

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ private static TableConfig getTableConfig(
306306
.forEach(tableConfigBuilder::withPartitionColum);
307307
} else {
308308
ImmutableSet<IndexType> supportedIndexTypes =
309-
ImmutableSet.of(IndexType.NUMERIC, IndexType.STRING, IndexType.BIG_INT_UNSIGNED);
309+
ImmutableSet.of(
310+
IndexType.NUMERIC, IndexType.STRING, IndexType.BIG_INT_UNSIGNED, IndexType.BINARY);
310311
// As of now only Primary key index with Numeric type is supported.
311312
// TODO:
312313
// 1. support non-primary unique indexes.

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundaryExtractorFactory.java

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.google.common.collect.ImmutableMap;
1919
import java.io.Serializable;
2020
import java.math.BigDecimal;
21-
import java.math.BigInteger;
2221
import java.sql.ResultSet;
2322
import java.sql.SQLException;
2423
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
@@ -27,21 +26,27 @@
2726
/** Factory to construct {@link BoundaryExtractor} for supported {@link class}. */
2827
public class BoundaryExtractorFactory {
2928

29+
public static final Class BYTE_ARRAY_CLASS = (new byte[] {}).getClass();
3030
private static final ImmutableMap<Class, BoundaryExtractor<?>> extractorMap =
3131
ImmutableMap.of(
3232
Integer.class,
33-
(BoundaryExtractor<Integer>)
34-
(partitionColumn, resultSet, boundaryTypeMapper) ->
35-
fromIntegers(partitionColumn, resultSet, boundaryTypeMapper),
33+
(BoundaryExtractor<Integer>)
34+
(partitionColumn, resultSet, boundaryTypeMapper) ->
35+
fromIntegers(partitionColumn, resultSet, boundaryTypeMapper),
3636
Long.class,
37-
(BoundaryExtractor<Long>)
38-
(partitionColumn, resultSet, boundaryTypeMapper) ->
39-
fromLongs(partitionColumn, resultSet, boundaryTypeMapper),
40-
String.class, (BoundaryExtractor<String>) BoundaryExtractorFactory::fromStrings,
41-
BigInteger.class,
42-
(BoundaryExtractor<BigInteger>)
43-
(partitionColumn, resultSet, boundaryTypeMapper) ->
44-
fromBigIntegers(partitionColumn, resultSet, boundaryTypeMapper));
37+
(BoundaryExtractor<Long>)
38+
(partitionColumn, resultSet, boundaryTypeMapper) ->
39+
fromLongs(partitionColumn, resultSet, boundaryTypeMapper),
40+
String.class,
41+
(BoundaryExtractor<String>) BoundaryExtractorFactory::fromStrings,
42+
BigDecimal.class,
43+
(BoundaryExtractor<BigDecimal>)
44+
(partitionColumn, resultSet, boundaryTypeMapper) ->
45+
fromBigDecimals(partitionColumn, resultSet, boundaryTypeMapper),
46+
BYTE_ARRAY_CLASS,
47+
(BoundaryExtractor<byte[]>)
48+
(partitionColumn, resultSet, boundaryTypeMapper) ->
49+
fromBinary(partitionColumn, resultSet, boundaryTypeMapper));
4550

4651
/**
4752
* Create a {@link BoundaryExtractor} for the required class.
@@ -90,20 +95,38 @@ private static Boundary<Long> fromLongs(
9095
.build();
9196
}
9297

93-
private static Boundary<java.math.BigInteger> fromBigIntegers(
98+
private static Boundary<BigDecimal> fromBigDecimals(
9499
PartitionColumn partitionColumn,
95100
ResultSet resultSet,
96101
@Nullable BoundaryTypeMapper boundaryTypeMapper)
97102
throws SQLException {
98-
Preconditions.checkArgument(partitionColumn.columnClass().equals(BigInteger.class));
103+
Preconditions.checkArgument(partitionColumn.columnClass().equals(BigDecimal.class));
99104
resultSet.next();
100105
BigDecimal start = resultSet.getBigDecimal(1);
101106
BigDecimal end = resultSet.getBigDecimal(2);
102-
return Boundary.<java.math.BigInteger>builder()
107+
return Boundary.<BigDecimal>builder()
103108
.setPartitionColumn(partitionColumn)
104-
.setStart(start == null ? null : start.toBigInteger())
105-
.setEnd(end == null ? null : end.toBigInteger())
106-
.setBoundarySplitter(BoundarySplitterFactory.create(BigInteger.class))
109+
.setStart(start)
110+
.setEnd(end)
111+
.setBoundarySplitter(BoundarySplitterFactory.create(BigDecimal.class))
112+
.setBoundaryTypeMapper(boundaryTypeMapper)
113+
.build();
114+
}
115+
116+
private static Boundary<byte[]> fromBinary(
117+
PartitionColumn partitionColumn,
118+
ResultSet resultSet,
119+
@Nullable BoundaryTypeMapper boundaryTypeMapper)
120+
throws SQLException {
121+
Preconditions.checkArgument(partitionColumn.columnClass().equals(BYTE_ARRAY_CLASS));
122+
resultSet.next();
123+
byte[] start = resultSet.getBytes(1);
124+
byte[] end = resultSet.getBytes(2);
125+
return Boundary.<byte[]>builder()
126+
.setPartitionColumn(partitionColumn)
127+
.setStart(start)
128+
.setEnd(end)
129+
.setBoundarySplitter(BoundarySplitterFactory.create(BYTE_ARRAY_CLASS))
107130
.setBoundaryTypeMapper(boundaryTypeMapper)
108131
.build();
109132
}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundarySplitterFactory.java

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
*/
1616
package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range;
1717

18+
import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundaryExtractorFactory.BYTE_ARRAY_CLASS;
19+
1820
import com.google.common.base.Preconditions;
1921
import com.google.common.collect.ImmutableMap;
2022
import java.io.Serializable;
23+
import java.math.BigDecimal;
2124
import java.math.BigInteger;
2225
import org.apache.beam.sdk.transforms.DoFn;
2326

@@ -26,18 +29,27 @@ public class BoundarySplitterFactory {
2629
private static final ImmutableMap<Class, BoundarySplitter<?>> splittermap =
2730
ImmutableMap.of(
2831
Integer.class,
29-
(BoundarySplitter<Integer>)
30-
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
31-
splitIntegers(start, end),
32+
(BoundarySplitter<Integer>)
33+
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
34+
splitIntegers(start, end),
3235
Long.class,
33-
(BoundarySplitter<Long>)
34-
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
35-
splitLongs(start, end),
36+
(BoundarySplitter<Long>)
37+
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
38+
splitLongs(start, end),
3639
BigInteger.class,
37-
(BoundarySplitter<BigInteger>)
38-
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
39-
splitBigIntegers(start, end),
40-
String.class, (BoundarySplitter<String>) BoundarySplitterFactory::splitStrings);
40+
(BoundarySplitter<BigInteger>)
41+
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
42+
splitBigIntegers(start, end),
43+
BigDecimal.class,
44+
(BoundarySplitter<BigDecimal>)
45+
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
46+
splitBigDecimal(start, end),
47+
String.class,
48+
(BoundarySplitter<String>) BoundarySplitterFactory::splitStrings,
49+
BYTE_ARRAY_CLASS,
50+
(BoundarySplitter<byte[]>)
51+
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
52+
splitBytes(start, end));
4153

4254
/**
4355
* Creates {@link BoundarySplitter BoundarySplitter&lt;T&gt;} for pass class {@code c} such that
@@ -132,6 +144,26 @@ private static Long splitLongs(Long start, Long end) {
132144
return (start & end) + ((start ^ end) >> 1);
133145
}
134146

147+
private static BigDecimal splitBigDecimal(BigDecimal start, BigDecimal end) {
148+
BigInteger startBigInt = (start == null) ? null : start.toBigInteger();
149+
BigInteger endBigInt = (end == null) ? null : end.toBigInteger();
150+
BigInteger split = splitBigIntegers(startBigInt, endBigInt);
151+
if (split == null) {
152+
return null;
153+
}
154+
return new BigDecimal(split);
155+
}
156+
157+
private static byte[] splitBytes(byte[] start, byte[] end) {
158+
BigInteger startBigInt = (start == null) ? null : new BigInteger(start);
159+
BigInteger endBigInt = (end == null) ? null : new BigInteger(end);
160+
BigInteger split = splitBigIntegers(startBigInt, endBigInt);
161+
if (split == null) {
162+
return null;
163+
}
164+
return split.toByteArray();
165+
}
166+
135167
private static String splitStrings(
136168
String start,
137169
String end,

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/SourceColumnIndexInfo.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616
package com.google.cloud.teleport.v2.source.reader.io.schema;
1717

1818
import com.google.auto.value.AutoValue;
19+
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundaryExtractorFactory;
1920
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
2021
import com.google.common.base.Preconditions;
2122
import com.google.common.collect.ImmutableMap;
22-
import java.math.BigInteger;
23+
import java.math.BigDecimal;
2324
import javax.annotation.Nullable;
2425

2526
@AutoValue
@@ -138,6 +139,7 @@ public SourceColumnIndexInfo build() {
138139
public enum IndexType {
139140
NUMERIC,
140141
BIG_INT_UNSIGNED,
142+
BINARY,
141143
STRING,
142144
DATE_TIME,
143145
OTHER
@@ -148,5 +150,6 @@ public enum IndexType {
148150
ImmutableMap.of(
149151
IndexType.NUMERIC, Long.class,
150152
IndexType.STRING, String.class,
151-
IndexType.BIG_INT_UNSIGNED, BigInteger.class);
153+
IndexType.BIG_INT_UNSIGNED, BigDecimal.class,
154+
IndexType.BINARY, BoundaryExtractorFactory.BYTE_ARRAY_CLASS);
152155
}

0 commit comments

Comments
 (0)