Skip to content

Commit d5f8b77

Browse files
committed
Update ChangeStreamDao to query different TVF for postgresSQL based on
the change stream partition mode For MUTABLE_KEY_RANGE change stream, use read_proto_bytes_, else use read_json_
1 parent 01e1cf6 commit d5f8b77

File tree

5 files changed

+242
-8
lines changed

5 files changed

+242
-8
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import com.google.cloud.spanner.Options;
4747
import com.google.cloud.spanner.Options.RpcPriority;
4848
import com.google.cloud.spanner.PartitionOptions;
49+
import com.google.cloud.spanner.ReadOnlyTransaction;
50+
import com.google.cloud.spanner.ResultSet;
4951
import com.google.cloud.spanner.Spanner;
5052
import com.google.cloud.spanner.SpannerException;
5153
import com.google.cloud.spanner.SpannerOptions;
@@ -1998,6 +2000,11 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
19982000
final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect);
19992001
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
20002002
final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
2003+
final SpannerAccessor spannerAccessor =
2004+
SpannerAccessor.getOrCreate(changeStreamSpannerConfig);
2005+
final boolean isMutableChangeStream =
2006+
isMutableChangeStream(
2007+
spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName);
20012008
final DaoFactory daoFactory =
20022009
new DaoFactory(
20032010
changeStreamSpannerConfig,
@@ -2007,7 +2014,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
20072014
rpcPriority,
20082015
input.getPipeline().getOptions().getJobName(),
20092016
changeStreamDatabaseDialect,
2010-
metadataDatabaseDialect);
2017+
metadataDatabaseDialect,
2018+
isMutableChangeStream);
20112019
final ActionFactory actionFactory = new ActionFactory();
20122020

20132021
final Duration watermarkRefreshRate =
@@ -2688,4 +2696,58 @@ static String resolveSpannerProjectId(SpannerConfig config) {
26882696
? SpannerOptions.getDefaultProjectId()
26892697
: config.getProjectId().get();
26902698
}
2699+
2700+
@VisibleForTesting
2701+
static boolean isMutableChangeStream(
2702+
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
2703+
String fetchedPartitionMode = fetchPartitionMode(databaseClient, dialect, changeStreamName);
2704+
if (fetchedPartitionMode.isEmpty()
2705+
|| fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
2706+
return false;
2707+
}
2708+
return true;
2709+
}
2710+
2711+
private static String fetchPartitionMode(
2712+
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
2713+
try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
2714+
Statement statement;
2715+
if (dialect == Dialect.POSTGRESQL) {
2716+
statement =
2717+
Statement.newBuilder(
2718+
"select option_value\n"
2719+
+ "from information_schema.change_stream_options\n"
2720+
+ "where change_stream_name = $1 and option_name = 'partition_mode'")
2721+
.bind("p1")
2722+
.to(changeStreamName)
2723+
.build();
2724+
} else {
2725+
statement =
2726+
Statement.newBuilder(
2727+
"select option_value\n"
2728+
+ "from information_schema.change_stream_options\n"
2729+
+ "where change_stream_name = @changeStreamName and option_name = 'partition_mode'")
2730+
.bind("changeStreamName")
2731+
.to(changeStreamName)
2732+
.build();
2733+
}
2734+
ResultSet resultSet = tx.executeQuery(statement);
2735+
while (resultSet.next()) {
2736+
String value = resultSet.getString(0);
2737+
if (value != null) {
2738+
return value;
2739+
}
2740+
}
2741+
return "";
2742+
} catch (RuntimeException e) {
2743+
// Log the failure (with stack trace) but rethrow so the caller still observes
2744+
// the error.
2745+
LOG.warn(
2746+
"Failed to fetch partition_mode for change stream '{}', dialect={} - will propagate exception",
2747+
changeStreamName,
2748+
dialect,
2749+
e);
2750+
throw e;
2751+
}
2752+
}
26912753
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@
3131
* as a {@link ResultSet}, which can be consumed until the stream is finished.
3232
*/
3333
public class ChangeStreamDao {
34-
3534
private final String changeStreamName;
3635
private final DatabaseClient databaseClient;
3736
private final RpcPriority rpcPriority;
3837
private final String jobName;
3938
private final Dialect dialect;
39+
private final boolean isMutableChangeStream;
4040

4141
/**
4242
* Constructs a change stream dao. All the queries performed by this class will be for the given
@@ -53,12 +53,14 @@ public class ChangeStreamDao {
5353
DatabaseClient databaseClient,
5454
RpcPriority rpcPriority,
5555
String jobName,
56-
Dialect dialect) {
56+
Dialect dialect,
57+
boolean isMutableChangeStream) {
5758
this.changeStreamName = changeStreamName;
5859
this.databaseClient = databaseClient;
5960
this.rpcPriority = rpcPriority;
6061
this.jobName = jobName;
6162
this.dialect = dialect;
63+
this.isMutableChangeStream = isMutableChangeStream;
6264
}
6365

6466
/**
@@ -91,8 +93,18 @@ public ChangeStreamResultSet changeStreamQuery(
9193
String query = "";
9294
Statement statement;
9395
if (this.isPostgres()) {
94-
query =
95-
"SELECT * FROM \"spanner\".\"read_json_" + changeStreamName + "\"($1, $2, $3, $4, null)";
96+
// Ensure we have determined whether change stream uses mutable key range
97+
if (this.isMutableChangeStream) {
98+
query =
99+
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
100+
+ changeStreamName
101+
+ "\"($1, $2, $3, $4, null)";
102+
} else {
103+
query =
104+
"SELECT * FROM \"spanner\".\"read_json_"
105+
+ changeStreamName
106+
+ "\"($1, $2, $3, $4, null)";
107+
}
96108
statement =
97109
Statement.newBuilder(query)
98110
.bind("p1")

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class DaoFactory implements Serializable {
4949
private final String jobName;
5050
private final Dialect spannerChangeStreamDatabaseDialect;
5151
private final Dialect metadataDatabaseDialect;
52+
private final boolean isMutableChangeStream;
5253

5354
/**
5455
* Constructs a {@link DaoFactory} with the configuration to be used for the underlying instances.
@@ -68,7 +69,8 @@ public DaoFactory(
6869
RpcPriority rpcPriority,
6970
String jobName,
7071
Dialect spannerChangeStreamDatabaseDialect,
71-
Dialect metadataDatabaseDialect) {
72+
Dialect metadataDatabaseDialect,
73+
boolean isMutableChangeStream) {
7274
if (metadataSpannerConfig.getInstanceId() == null) {
7375
throw new IllegalArgumentException("Metadata instance can not be null");
7476
}
@@ -83,6 +85,7 @@ public DaoFactory(
8385
this.jobName = jobName;
8486
this.spannerChangeStreamDatabaseDialect = spannerChangeStreamDatabaseDialect;
8587
this.metadataDatabaseDialect = metadataDatabaseDialect;
88+
this.isMutableChangeStream = isMutableChangeStream;
8689
}
8790

8891
/**
@@ -143,7 +146,8 @@ public synchronized ChangeStreamDao getChangeStreamDao() {
143146
spannerAccessor.getDatabaseClient(),
144147
rpcPriority,
145148
jobName,
146-
this.spannerChangeStreamDatabaseDialect);
149+
this.spannerChangeStreamDatabaseDialect,
150+
this.isMutableChangeStream);
147151
}
148152
return changeStreamDaoInstance;
149153
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,18 @@
1919

2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertNull;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.any;
24+
import static org.mockito.Mockito.when;
2225

2326
import com.google.auth.Credentials;
2427
import com.google.cloud.Timestamp;
28+
import com.google.cloud.spanner.DatabaseClient;
29+
import com.google.cloud.spanner.Dialect;
2530
import com.google.cloud.spanner.Options.RpcPriority;
31+
import com.google.cloud.spanner.ReadOnlyTransaction;
32+
import com.google.cloud.spanner.ResultSet;
33+
import com.google.cloud.spanner.Statement;
2634
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
2735
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
2836
import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
@@ -32,6 +40,7 @@
3240
import org.junit.Test;
3341
import org.junit.runner.RunWith;
3442
import org.junit.runners.JUnit4;
43+
import org.mockito.Mockito;
3544

3645
@RunWith(JUnit4.class)
3746
public class SpannerIOReadChangeStreamTest {
@@ -120,7 +129,8 @@ public void testSetSpannerConfigCredential() {
120129

121130
@Test
122131
public void testWithDefaultCredential() {
123-
// Get the default credential, without setting any credentials in the pipeline options or
132+
// Get the default credential, without setting any credentials in the pipeline
133+
// options or
124134
// SpannerConfig.
125135
Credentials defaultCredential =
126136
testPipeline.getOptions().as(GcpOptions.class).getGcpCredential();
@@ -140,4 +150,53 @@ public void testWithDefaultCredential() {
140150
assertEquals(defaultCredential, changeStreamSpannerConfigWithCredential.getCredentials().get());
141151
assertEquals(defaultCredential, metadataSpannerConfigWithCredential.getCredentials().get());
142152
}
153+
154+
@Test
155+
public void testFetchPartitionModeGoogleSql() {
156+
DatabaseClient databaseClient = Mockito.mock(DatabaseClient.class);
157+
ReadOnlyTransaction transaction = Mockito.mock(ReadOnlyTransaction.class);
158+
ResultSet resultSet = Mockito.mock(ResultSet.class);
159+
160+
when(databaseClient.readOnlyTransaction()).thenReturn(transaction);
161+
when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet);
162+
when(resultSet.next()).thenReturn(true).thenReturn(false);
163+
when(resultSet.getString(0)).thenReturn("MUTABLE_KEY_RANGE");
164+
165+
assertEquals(
166+
true,
167+
SpannerIO.isMutableChangeStream(
168+
databaseClient, Dialect.GOOGLE_STANDARD_SQL, TEST_CHANGE_STREAM));
169+
}
170+
171+
@Test
172+
public void testFetchPartitionModePostgres() {
173+
DatabaseClient databaseClient = Mockito.mock(DatabaseClient.class);
174+
ReadOnlyTransaction transaction = Mockito.mock(ReadOnlyTransaction.class);
175+
ResultSet resultSet = Mockito.mock(ResultSet.class);
176+
177+
when(databaseClient.readOnlyTransaction()).thenReturn(transaction);
178+
when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet);
179+
when(resultSet.next()).thenReturn(true).thenReturn(false);
180+
when(resultSet.getString(0)).thenReturn("IMMUTABLE_KEY_RANGE");
181+
182+
assertEquals(
183+
false,
184+
SpannerIO.isMutableChangeStream(databaseClient, Dialect.POSTGRESQL, TEST_CHANGE_STREAM));
185+
}
186+
187+
@Test
188+
public void testFetchPartitionModeNotFound() {
189+
DatabaseClient databaseClient = Mockito.mock(DatabaseClient.class);
190+
ReadOnlyTransaction transaction = Mockito.mock(ReadOnlyTransaction.class);
191+
ResultSet resultSet = Mockito.mock(ResultSet.class);
192+
193+
when(databaseClient.readOnlyTransaction()).thenReturn(transaction);
194+
when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet);
195+
when(resultSet.next()).thenReturn(false);
196+
197+
assertEquals(
198+
false,
199+
SpannerIO.isMutableChangeStream(
200+
databaseClient, Dialect.GOOGLE_STANDARD_SQL, TEST_CHANGE_STREAM));
201+
}
143202
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
19+
20+
import static org.junit.Assert.assertTrue;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.verify;
24+
import static org.mockito.Mockito.when;
25+
26+
import com.google.cloud.spanner.DatabaseClient;
27+
import com.google.cloud.spanner.Dialect;
28+
import com.google.cloud.spanner.Options.RpcPriority;
29+
import com.google.cloud.spanner.ReadOnlyTransaction;
30+
import com.google.cloud.spanner.ResultSet;
31+
import com.google.cloud.spanner.Statement;
32+
import org.junit.Before;
33+
import org.junit.Test;
34+
import org.mockito.ArgumentCaptor;
35+
36+
public class ChangeStreamDaoTest {
37+
private DatabaseClient databaseClient;
38+
private RpcPriority rpcPriority;
39+
private static final String CHANGE_STREAM_NAME = "testCS";
40+
41+
@Before
42+
public void setUp() {
43+
databaseClient = mock(DatabaseClient.class);
44+
rpcPriority = mock(RpcPriority.class);
45+
}
46+
47+
// New tests for PostgreSQL branch to verify the chosen TVF in the generated
48+
// SQL.
49+
@Test
50+
public void testChangeStreamQueryPostgresMutable() {
51+
// Arrange: single-use transaction for the actual change stream query
52+
ReadOnlyTransaction singleUseTx = mock(ReadOnlyTransaction.class);
53+
when(databaseClient.singleUse()).thenReturn(singleUseTx);
54+
55+
ChangeStreamDao changeStreamDao =
56+
new ChangeStreamDao(
57+
CHANGE_STREAM_NAME, databaseClient, rpcPriority, "testjob", Dialect.POSTGRESQL, true);
58+
59+
// Act: call the method that constructs and executes the statement
60+
changeStreamDao.changeStreamQuery(null, null, null, 0L);
61+
62+
// Assert: capture the Statement passed to singleUse().executeQuery and verify
63+
// SQL
64+
ArgumentCaptor<Statement> captor = ArgumentCaptor.forClass(Statement.class);
65+
verify(singleUseTx).executeQuery(captor.capture(), any(), any());
66+
Statement captured = captor.getValue();
67+
String sql = captured.getSql(); // adjust if different accessor is used
68+
assertTrue(
69+
"Expected SQL to contain read_proto_bytes_",
70+
sql.contains("read_proto_bytes_" + CHANGE_STREAM_NAME));
71+
}
72+
73+
@Test
74+
public void testChangeStreamQueryPostgresImmutable() {
75+
// Arrange: single-use transaction for the actual change stream query
76+
ReadOnlyTransaction singleUseTx = mock(ReadOnlyTransaction.class);
77+
when(databaseClient.singleUse()).thenReturn(singleUseTx);
78+
79+
ResultSet queryResult = mock(ResultSet.class);
80+
when(singleUseTx.executeQuery(any(), any(), any())).thenReturn(queryResult);
81+
82+
ChangeStreamDao changeStreamDao =
83+
new ChangeStreamDao(
84+
CHANGE_STREAM_NAME, databaseClient, rpcPriority, "testjob", Dialect.POSTGRESQL, false);
85+
86+
// Act
87+
changeStreamDao.changeStreamQuery(null, null, null, 0L);
88+
89+
// Assert
90+
ArgumentCaptor<Statement> captor = ArgumentCaptor.forClass(Statement.class);
91+
verify(singleUseTx).executeQuery(captor.capture(), any(), any());
92+
Statement captured = captor.getValue();
93+
String sql = captured.getSql();
94+
assertTrue(
95+
"Expected SQL to contain read_json_", sql.contains("read_json_" + CHANGE_STREAM_NAME));
96+
}
97+
}

0 commit comments

Comments
 (0)