Skip to content

Commit 2caf29c

Browse files
authored
[Sourcedb-to-spanner] Bulk migration Mysql to spanner 1tb Load test (GoogleCloudPlatform#2063)
* [Sourcedb-to-spanner] Bulk migration Mysql to spanner 1tb Load test * Updating row counts, added static sql resources * Renaming the test and addressing comments
1 parent 587fc05 commit 2caf29c

File tree

7 files changed

+360
-19
lines changed

7 files changed

+360
-19
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.it.jdbc;
19+
20+
/** Parent class for Static JDBC Resources. */
21+
public abstract class StaticJDBCResource {
22+
private final String hostname;
23+
private final String username;
24+
private final String password;
25+
private final int port;
26+
27+
private final String database;
28+
29+
enum SourceType {
30+
ORACLE,
31+
MYSQL,
32+
POSTGRESQL,
33+
}
34+
35+
StaticJDBCResource(org.apache.beam.it.jdbc.StaticJDBCResource.Builder<?> builder) {
36+
this.hostname = builder.hostname;
37+
this.username = builder.username;
38+
this.password = builder.password;
39+
this.port = builder.port;
40+
this.database = builder.database;
41+
}
42+
43+
public abstract org.apache.beam.it.jdbc.StaticJDBCResource.SourceType type();
44+
45+
public String hostname() {
46+
return this.hostname;
47+
}
48+
49+
public String username() {
50+
return this.username;
51+
}
52+
53+
public String password() {
54+
return this.password;
55+
}
56+
57+
public int port() {
58+
return this.port;
59+
}
60+
61+
public String database() {
62+
return this.database;
63+
}
64+
65+
public abstract String getJDBCPrefix();
66+
67+
// TODO: exclude the StaticJDBCResource files from the codecov checks
68+
public String getconnectionURL() {
69+
return String.format("jdbc:%s://%s:%d/%s", getJDBCPrefix(), hostname, port, database);
70+
}
71+
72+
public abstract static class Builder<T extends org.apache.beam.it.jdbc.StaticJDBCResource> {
73+
private final String hostname;
74+
private final String username;
75+
private final String password;
76+
private final int port;
77+
78+
private final String database;
79+
80+
public Builder(String hostname, String username, String password, int port, String database) {
81+
this.hostname = hostname;
82+
this.username = username;
83+
this.password = password;
84+
this.port = port;
85+
this.database = database;
86+
}
87+
88+
public abstract T build();
89+
}
90+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.it.jdbc;
19+
20+
/**
21+
* Client for static MySQL resource.
22+
*
23+
* <p>Subclass of {@link StaticJDBCResource}.
24+
*/
25+
public class StaticMySQLResource extends StaticJDBCResource {
26+
27+
StaticMySQLResource(Builder builder) {
28+
super(builder);
29+
}
30+
31+
@Override
32+
public SourceType type() {
33+
return SourceType.MYSQL;
34+
}
35+
36+
@Override
37+
public String getJDBCPrefix() {
38+
return "mysql";
39+
}
40+
41+
public static Builder builder(
42+
String hostname, String username, String password, int port, String database) {
43+
return new Builder(hostname, username, password, port, database);
44+
}
45+
46+
/** Builder for {@link StaticMySQLResource}. */
47+
public static class Builder extends StaticJDBCResource.Builder<StaticMySQLResource> {
48+
public Builder(String hostname, String username, String password, int port, String database) {
49+
super(hostname, username, password, port, database);
50+
}
51+
52+
@Override
53+
public StaticMySQLResource build() {
54+
return new StaticMySQLResource(this);
55+
}
56+
}
57+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.it.jdbc;
19+
20+
/**
21+
* Client for Postgresql resource used by Datastream.
22+
*
23+
* <p>Subclass of {@link StaticJDBCResource}.
24+
*/
25+
public class StaticPostgresqlResource extends StaticJDBCResource {
26+
27+
StaticPostgresqlResource(Builder builder) {
28+
super(builder);
29+
}
30+
31+
public static Builder builder(
32+
String hostname, String username, String password, int port, String database) {
33+
return new Builder(hostname, username, password, port, database);
34+
}
35+
36+
@Override
37+
public SourceType type() {
38+
return SourceType.POSTGRESQL;
39+
}
40+
41+
@Override
42+
public String getJDBCPrefix() {
43+
return "postgresql";
44+
}
45+
46+
/** Builder for {@link StaticPostgresqlResource}. */
47+
public static class Builder extends StaticJDBCResource.Builder<StaticPostgresqlResource> {
48+
49+
public Builder(String hostname, String username, String password, int port, String database) {
50+
super(hostname, username, password, port, database);
51+
}
52+
53+
@Override
54+
public StaticPostgresqlResource build() {
55+
return new StaticPostgresqlResource(this);
56+
}
57+
}
58+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
/** Package for managing Static sql resources. */
20+
package org.apache.beam.it.jdbc;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (C) 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.templates.loadtesting;
17+
18+
import com.google.cloud.teleport.metadata.TemplateLoadTest;
19+
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
20+
import com.google.cloud.teleport.v2.templates.SourceDbToSpanner;
21+
import java.io.IOException;
22+
import java.text.ParseException;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
import org.junit.Test;
26+
import org.junit.experimental.categories.Category;
27+
import org.junit.runner.RunWith;
28+
import org.junit.runners.JUnit4;
29+
30+
@Category(TemplateLoadTest.class)
31+
@TemplateLoadTest(SourceDbToSpanner.class)
32+
@RunWith(JUnit4.class)
33+
public class MySQLSourceDbToSpannerLT extends SourceDbToSpannerLTBase {
34+
35+
@Test
36+
public void mySQLToSpannerBulk1TBTest() throws IOException, ParseException, InterruptedException {
37+
String username =
38+
accessSecret(
39+
"projects/269744978479/secrets/nokill-sourcedb-mysql-to-spanner-cloudsql-username/versions/1");
40+
String password =
41+
accessSecret(
42+
"projects/269744978479/secrets/nokill-sourcedb-mysql-to-spanner-cloudsql-password/versions/1");
43+
String database = "3tables10cols";
44+
String host =
45+
accessSecret(
46+
"projects/269744978479/secrets/nokill-sourcedb-mysql-to-spanner-cloudsql-ip-address/versions/1");
47+
int port = 3306;
48+
49+
setUp(SQLDialect.MYSQL, host, port, username, password, database);
50+
createSpannerDDL("SourceDbToSpannerLT/mysql-spanner-schema.sql");
51+
52+
Map<String, Integer> expectedCountPerTable =
53+
new HashMap<>() {
54+
{
55+
put("table1", 4255685);
56+
put("table2", 10489500);
57+
}
58+
};
59+
60+
runLoadTest(expectedCountPerTable);
61+
}
62+
}

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/SourceDbToSpannerLTBase.java

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@
3939
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
4040
import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck;
4141
import org.apache.beam.it.gcp.storage.GcsResourceManager;
42-
import org.apache.beam.it.jdbc.AbstractJDBCResourceManager;
43-
import org.apache.beam.it.jdbc.PostgresResourceManager;
42+
import org.apache.beam.it.jdbc.StaticJDBCResource;
43+
import org.apache.beam.it.jdbc.StaticMySQLResource;
44+
import org.apache.beam.it.jdbc.StaticPostgresqlResource;
4445
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Resources;
4546
import org.junit.After;
4647

@@ -52,14 +53,20 @@ public class SourceDbToSpannerLTBase extends TemplateLoadTestBase {
5253

5354
private static final int MAX_WORKERS = 100;
5455

55-
private static final int NUM_WORKERS = 10;
56-
private static final Duration JOB_TIMEOUT = Duration.ofHours(4);
56+
private static final int NUM_WORKERS = 20;
57+
58+
private static final String WORKER_MACHINE_TYPE = "n1-highmem-96";
59+
private static final String LAUNCHER_MACHINE_TYPE = "n1-highmem-64";
60+
61+
private static final String FETCH_SIZE = "8000";
62+
63+
private static final Duration JOB_TIMEOUT = Duration.ofHours(3);
5764
private static final Duration CHECK_INTERVAL = Duration.ofMinutes(5);
5865
private static final Duration DONE_TIMEOUT = Duration.ofMinutes(20);
5966

6067
private SQLDialect dialect;
6168
private GcsResourceManager gcsResourceManager;
62-
private AbstractJDBCResourceManager<?> sourceDatabaseResourceManager;
69+
private StaticJDBCResource sourceDatabaseResource;
6370
private SpannerResourceManager spannerResourceManager;
6471

6572
private final String artifactBucket;
@@ -92,14 +99,11 @@ public void setUp(
9299
GcsResourceManager.builder(artifactBucket, getClass().getSimpleName(), CREDENTIALS).build();
93100

94101
if (dialect == SQLDialect.POSTGRESQL) {
95-
sourceDatabaseResourceManager =
96-
PostgresResourceManager.builder(testName)
97-
.setUsername(username)
98-
.setPassword(password)
99-
.setDatabaseName(database)
100-
.setHost(host)
101-
.setPort(port)
102-
.build();
102+
sourceDatabaseResource =
103+
new StaticPostgresqlResource.Builder(host, username, password, port, database).build();
104+
} else if (dialect == SQLDialect.MYSQL) {
105+
sourceDatabaseResource =
106+
new StaticMySQLResource.Builder(host, username, password, port, database).build();
103107
} else {
104108
throw new IllegalArgumentException("Dialect " + dialect + " not supported");
105109
}
@@ -142,11 +146,13 @@ public void runLoadTest(
142146
put("instanceId", spannerResourceManager.getInstanceId());
143147
put("databaseId", spannerResourceManager.getDatabaseId());
144148
put("sourceDbDialect", dialect.name());
145-
put("sourceConfigURL", sourceDatabaseResourceManager.getUri());
146-
put("username", sourceDatabaseResourceManager.getUsername());
147-
put("password", sourceDatabaseResourceManager.getPassword());
149+
put("sourceConfigURL", sourceDatabaseResource.getconnectionURL());
150+
put("username", sourceDatabaseResource.username());
151+
put("password", sourceDatabaseResource.password());
148152
put("outputDirectory", "gs://" + artifactBucket + "/" + outputDirectory);
149153
put("jdbcDriverClassName", driverClassName());
154+
put("fetchSize", FETCH_SIZE);
155+
put("workerMachineType", WORKER_MACHINE_TYPE);
150156
}
151157
};
152158
params.putAll(templateParameters);
@@ -156,6 +162,7 @@ public void runLoadTest(
156162
LaunchConfig.builder(getClass().getSimpleName(), SPEC_PATH)
157163
.addEnvironment("maxWorkers", MAX_WORKERS)
158164
.addEnvironment("numWorkers", NUM_WORKERS)
165+
.addEnvironment("launcherMachineType", LAUNCHER_MACHINE_TYPE)
159166
.setParameters(params);
160167
environmentOptions.forEach(options::addEnvironment);
161168

@@ -182,8 +189,15 @@ public void runLoadTest(
182189
result = pipelineOperator.waitUntilDone(createConfig(jobInfo, DONE_TIMEOUT));
183190
assertThatResult(result).isLaunchFinished();
184191

192+
Map<String, Double> metrics = getMetrics(jobInfo);
193+
populateResourceManagerMetrics(metrics);
194+
185195
// Export results
186-
exportMetricsToBigQuery(jobInfo, getMetrics(jobInfo));
196+
exportMetricsToBigQuery(jobInfo, metrics);
197+
}
198+
199+
public void populateResourceManagerMetrics(Map<String, Double> metrics) {
200+
spannerResourceManager.collectMetrics(metrics);
187201
}
188202

189203
/**
@@ -193,8 +207,7 @@ public void runLoadTest(
193207
*/
194208
@After
195209
public void cleanUp() throws IOException {
196-
ResourceManagerUtils.cleanResources(
197-
spannerResourceManager, sourceDatabaseResourceManager, gcsResourceManager);
210+
ResourceManagerUtils.cleanResources(spannerResourceManager, gcsResourceManager);
198211
}
199212

200213
private String driverClassName() {

0 commit comments

Comments
 (0)