Skip to content

Commit 9209310

Browse files
committed
Update ChangeStreamDao to query different TVF for postgresSQL based on
the change stream partition mode For MUTABLE_KEY_RANGE change stream, use read_proto_bytes_, else use read_json_
1 parent 01e1cf6 commit 9209310

File tree

6 files changed

+275
-8
lines changed

6 files changed

+275
-8
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import com.google.cloud.spanner.Options;
4747
import com.google.cloud.spanner.Options.RpcPriority;
4848
import com.google.cloud.spanner.PartitionOptions;
49+
import com.google.cloud.spanner.ReadOnlyTransaction;
50+
import com.google.cloud.spanner.ResultSet;
4951
import com.google.cloud.spanner.Spanner;
5052
import com.google.cloud.spanner.SpannerException;
5153
import com.google.cloud.spanner.SpannerOptions;
@@ -1998,6 +2000,11 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
19982000
final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect);
19992001
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
20002002
final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
2003+
final SpannerAccessor spannerAccessor =
2004+
SpannerAccessor.getOrCreate(changeStreamSpannerConfig);
2005+
final boolean isMutableChangeStream =
2006+
isMutableChangeStream(
2007+
spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName);
20012008
final DaoFactory daoFactory =
20022009
new DaoFactory(
20032010
changeStreamSpannerConfig,
@@ -2007,7 +2014,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
20072014
rpcPriority,
20082015
input.getPipeline().getOptions().getJobName(),
20092016
changeStreamDatabaseDialect,
2010-
metadataDatabaseDialect);
2017+
metadataDatabaseDialect,
2018+
isMutableChangeStream);
20112019
final ActionFactory actionFactory = new ActionFactory();
20122020

20132021
final Duration watermarkRefreshRate =
@@ -2688,4 +2696,58 @@ static String resolveSpannerProjectId(SpannerConfig config) {
26882696
? SpannerOptions.getDefaultProjectId()
26892697
: config.getProjectId().get();
26902698
}
2699+
2700+
@VisibleForTesting
2701+
static boolean isMutableChangeStream(
2702+
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
2703+
String fetchedPartitionMode = fetchPartitionMode(databaseClient, dialect, changeStreamName);
2704+
if (fetchedPartitionMode.isEmpty()
2705+
|| fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
2706+
return false;
2707+
}
2708+
return true;
2709+
}
2710+
2711+
private static String fetchPartitionMode(
2712+
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
2713+
try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
2714+
Statement statement;
2715+
if (dialect == Dialect.POSTGRESQL) {
2716+
statement =
2717+
Statement.newBuilder(
2718+
"select option_value\n"
2719+
+ "from information_schema.change_stream_options\n"
2720+
+ "where change_stream_name = $1 and option_name = 'partition_mode'")
2721+
.bind("p1")
2722+
.to(changeStreamName)
2723+
.build();
2724+
} else {
2725+
statement =
2726+
Statement.newBuilder(
2727+
"select option_value\n"
2728+
+ "from information_schema.change_stream_options\n"
2729+
+ "where change_stream_name = @changeStreamName and option_name = 'partition_mode'")
2730+
.bind("changeStreamName")
2731+
.to(changeStreamName)
2732+
.build();
2733+
}
2734+
ResultSet resultSet = tx.executeQuery(statement);
2735+
while (resultSet.next()) {
2736+
String value = resultSet.getString(0);
2737+
if (value != null) {
2738+
return value;
2739+
}
2740+
}
2741+
return "";
2742+
} catch (RuntimeException e) {
2743+
// Log the failure (with stack trace) but rethrow so the caller still observes
2744+
// the error.
2745+
LOG.warn(
2746+
"Failed to fetch partition_mode for change stream '{}', dialect={} - will propagate exception",
2747+
changeStreamName,
2748+
dialect,
2749+
e);
2750+
throw e;
2751+
}
2752+
}
26912753
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@
3131
* as a {@link ResultSet}, which can be consumed until the stream is finished.
3232
*/
3333
public class ChangeStreamDao {
34-
3534
private final String changeStreamName;
3635
private final DatabaseClient databaseClient;
3736
private final RpcPriority rpcPriority;
3837
private final String jobName;
3938
private final Dialect dialect;
39+
private final boolean isMutableChangeStream;
4040

4141
/**
4242
* Constructs a change stream dao. All the queries performed by this class will be for the given
@@ -53,12 +53,14 @@ public class ChangeStreamDao {
5353
DatabaseClient databaseClient,
5454
RpcPriority rpcPriority,
5555
String jobName,
56-
Dialect dialect) {
56+
Dialect dialect,
57+
boolean isMutableChangeStream) {
5758
this.changeStreamName = changeStreamName;
5859
this.databaseClient = databaseClient;
5960
this.rpcPriority = rpcPriority;
6061
this.jobName = jobName;
6162
this.dialect = dialect;
63+
this.isMutableChangeStream = isMutableChangeStream;
6264
}
6365

6466
/**
@@ -91,8 +93,18 @@ public ChangeStreamResultSet changeStreamQuery(
9193
String query = "";
9294
Statement statement;
9395
if (this.isPostgres()) {
94-
query =
95-
"SELECT * FROM \"spanner\".\"read_json_" + changeStreamName + "\"($1, $2, $3, $4, null)";
96+
// Ensure we have determined whether change stream uses mutable key range
97+
if (this.isMutableChangeStream) {
98+
query =
99+
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
100+
+ changeStreamName
101+
+ "\"($1, $2, $3, $4, null)";
102+
} else {
103+
query =
104+
"SELECT * FROM \"spanner\".\"read_json_"
105+
+ changeStreamName
106+
+ "\"($1, $2, $3, $4, null)";
107+
}
96108
statement =
97109
Statement.newBuilder(query)
98110
.bind("p1")

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class DaoFactory implements Serializable {
4949
private final String jobName;
5050
private final Dialect spannerChangeStreamDatabaseDialect;
5151
private final Dialect metadataDatabaseDialect;
52+
private final boolean isMutableChangeStream;
5253

5354
/**
5455
* Constructs a {@link DaoFactory} with the configuration to be used for the underlying instances.
@@ -68,7 +69,8 @@ public DaoFactory(
6869
RpcPriority rpcPriority,
6970
String jobName,
7071
Dialect spannerChangeStreamDatabaseDialect,
71-
Dialect metadataDatabaseDialect) {
72+
Dialect metadataDatabaseDialect,
73+
boolean isMutableChangeStream) {
7274
if (metadataSpannerConfig.getInstanceId() == null) {
7375
throw new IllegalArgumentException("Metadata instance can not be null");
7476
}
@@ -83,6 +85,7 @@ public DaoFactory(
8385
this.jobName = jobName;
8486
this.spannerChangeStreamDatabaseDialect = spannerChangeStreamDatabaseDialect;
8587
this.metadataDatabaseDialect = metadataDatabaseDialect;
88+
this.isMutableChangeStream = isMutableChangeStream;
8689
}
8790

8891
/**
@@ -143,7 +146,8 @@ public synchronized ChangeStreamDao getChangeStreamDao() {
143146
spannerAccessor.getDatabaseClient(),
144147
rpcPriority,
145148
jobName,
146-
this.spannerChangeStreamDatabaseDialect);
149+
this.spannerChangeStreamDatabaseDialect,
150+
this.isMutableChangeStream);
147151
}
148152
return changeStreamDaoInstance;
149153
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,18 @@
1919

2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertNull;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.any;
24+
import static org.mockito.Mockito.when;
2225

2326
import com.google.auth.Credentials;
2427
import com.google.cloud.Timestamp;
28+
import com.google.cloud.spanner.DatabaseClient;
29+
import com.google.cloud.spanner.Dialect;
2530
import com.google.cloud.spanner.Options.RpcPriority;
31+
import com.google.cloud.spanner.ReadOnlyTransaction;
32+
import com.google.cloud.spanner.ResultSet;
33+
import com.google.cloud.spanner.Statement;
2634
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
2735
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
2836
import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
@@ -32,6 +40,7 @@
3240
import org.junit.Test;
3341
import org.junit.runner.RunWith;
3442
import org.junit.runners.JUnit4;
43+
import org.mockito.Mockito;
3544

3645
@RunWith(JUnit4.class)
3746
public class SpannerIOReadChangeStreamTest {
@@ -120,7 +129,8 @@ public void testSetSpannerConfigCredential() {
120129

121130
@Test
122131
public void testWithDefaultCredential() {
123-
// Get the default credential, without setting any credentials in the pipeline options or
132+
// Get the default credential, without setting any credentials in the pipeline
133+
// options or
124134
// SpannerConfig.
125135
Credentials defaultCredential =
126136
testPipeline.getOptions().as(GcpOptions.class).getGcpCredential();
@@ -140,4 +150,53 @@ public void testWithDefaultCredential() {
140150
assertEquals(defaultCredential, changeStreamSpannerConfigWithCredential.getCredentials().get());
141151
assertEquals(defaultCredential, metadataSpannerConfigWithCredential.getCredentials().get());
142152
}
153+
154+
@Test
155+
public void testFetchPartitionModeGoogleSql() {
156+
DatabaseClient databaseClient = Mockito.mock(DatabaseClient.class);
157+
ReadOnlyTransaction transaction = Mockito.mock(ReadOnlyTransaction.class);
158+
ResultSet resultSet = Mockito.mock(ResultSet.class);
159+
160+
when(databaseClient.readOnlyTransaction()).thenReturn(transaction);
161+
when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet);
162+
when(resultSet.next()).thenReturn(true).thenReturn(false);
163+
when(resultSet.getString(0)).thenReturn("MUTABLE_KEY_RANGE");
164+
165+
assertEquals(
166+
true,
167+
SpannerIO.isMutableChangeStream(
168+
databaseClient, Dialect.GOOGLE_STANDARD_SQL, TEST_CHANGE_STREAM));
169+
}
170+
171+
@Test
172+
public void testFetchPartitionModePostgres() {
173+
DatabaseClient databaseClient = Mockito.mock(DatabaseClient.class);
174+
ReadOnlyTransaction transaction = Mockito.mock(ReadOnlyTransaction.class);
175+
ResultSet resultSet = Mockito.mock(ResultSet.class);
176+
177+
when(databaseClient.readOnlyTransaction()).thenReturn(transaction);
178+
when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet);
179+
when(resultSet.next()).thenReturn(true).thenReturn(false);
180+
when(resultSet.getString(0)).thenReturn("IMMUTABLE_KEY_RANGE");
181+
182+
assertEquals(
183+
false,
184+
SpannerIO.isMutableChangeStream(databaseClient, Dialect.POSTGRESQL, TEST_CHANGE_STREAM));
185+
}
186+
187+
@Test
188+
public void testFetchPartitionModeNotFound() {
189+
DatabaseClient databaseClient = Mockito.mock(DatabaseClient.class);
190+
ReadOnlyTransaction transaction = Mockito.mock(ReadOnlyTransaction.class);
191+
ResultSet resultSet = Mockito.mock(ResultSet.class);
192+
193+
when(databaseClient.readOnlyTransaction()).thenReturn(transaction);
194+
when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet);
195+
when(resultSet.next()).thenReturn(false);
196+
197+
assertEquals(
198+
false,
199+
SpannerIO.isMutableChangeStream(
200+
databaseClient, Dialect.GOOGLE_STANDARD_SQL, TEST_CHANGE_STREAM));
201+
}
143202
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ public void testInvalidRecordReceivedWithDefaultSettings() {
326326
Timestamp.ofTimeSecondsAndNanos(startTimestamp.getSeconds(), startTimestamp.getNanos() + 1);
327327

328328
mockGetDialect();
329+
mockChangeStreamOptions();
329330
mockTableExists();
330331
mockGetWatermark(startTimestamp);
331332
ResultSet getPartitionResultSet = mockGetParentPartition(startTimestamp, endTimestamp);
@@ -377,6 +378,38 @@ public void testInvalidRecordReceivedWithDefaultSettings() {
377378
}
378379
}
379380

381+
private void mockChangeStreamOptions() {
382+
Statement changeStreamOptionsStatement =
383+
Statement.newBuilder(
384+
"select option_value\n"
385+
+ "from information_schema.change_stream_options\n"
386+
+ "where change_stream_name = @changeStreamName and option_name = 'partition_mode'")
387+
.bind("changeStreamName")
388+
.to(TEST_CHANGE_STREAM)
389+
.build();
390+
ResultSetMetadata changeStreamOptionsResultSetMetadata =
391+
ResultSetMetadata.newBuilder()
392+
.setRowType(
393+
StructType.newBuilder()
394+
.addFields(
395+
Field.newBuilder()
396+
.setName("option_value")
397+
.setType(Type.newBuilder().setCode(TypeCode.STRING).build())
398+
.build())
399+
.build())
400+
.build();
401+
ResultSet changeStreamOptionsResultSet =
402+
ResultSet.newBuilder()
403+
.addRows(
404+
ListValue.newBuilder()
405+
.addValues(Value.newBuilder().setStringValue("NEW_VALUES").build())
406+
.build())
407+
.setMetadata(changeStreamOptionsResultSetMetadata)
408+
.build();
409+
mockSpannerService.putPartialStatementResult(
410+
StatementResult.query(changeStreamOptionsStatement, changeStreamOptionsResultSet));
411+
}
412+
380413
private void mockInvalidChangeStreamRecordReceived(Timestamp now, Timestamp after3Seconds) {
381414
Statement changeStreamQueryStatement =
382415
Statement.newBuilder(

0 commit comments

Comments
 (0)