Skip to content

Commit de5ef5f

Browse files
committed
Update ChangeStreamDao to query differnet TVF for postgresSQL based on
the change stream partition mode For MUTABLE_KEY_RANGE change stream, use read_proto_bytes_, else use read_json_
1 parent 8481373 commit de5ef5f

File tree

2 files changed

+199
-2
lines changed

2 files changed

+199
-2
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.spanner.Dialect;
2323
import com.google.cloud.spanner.Options;
2424
import com.google.cloud.spanner.Options.RpcPriority;
25+
import com.google.cloud.spanner.ReadOnlyTransaction;
2526
import com.google.cloud.spanner.ResultSet;
2627
import com.google.cloud.spanner.Statement;
2728
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
@@ -32,12 +33,23 @@
3233
*/
3334
public class ChangeStreamDao {
3435

36+
// new enum for partition mode
37+
protected enum PartitionMode {
38+
UNKNOWN,
39+
MUTABLE_KEY_RANGE,
40+
IMMUTABLE_KEY_RANGE
41+
}
42+
3543
private final String changeStreamName;
3644
private final DatabaseClient databaseClient;
3745
private final RpcPriority rpcPriority;
3846
private final String jobName;
3947
private final Dialect dialect;
4048

49+
// Always non-null to satisfy nullness checker.
50+
// Start UNKNOWN until we fetch and cache the real mode.
51+
private volatile PartitionMode partitionMode = PartitionMode.UNKNOWN;
52+
4153
/**
4254
* Constructs a change stream dao. All the queries performed by this class will be for the given
4355
* change stream name with the specified rpc priority. The job name will be used to tag all the
@@ -91,8 +103,20 @@ public ChangeStreamResultSet changeStreamQuery(
91103
String query = "";
92104
Statement statement;
93105
if (this.isPostgres()) {
94-
query =
95-
"SELECT * FROM \"spanner\".\"read_json_" + changeStreamName + "\"($1, $2, $3, $4, null)";
106+
// Ensure we have determined whether change stream uses mutable key range
107+
boolean isMutable = isMutableKeyRangeChangeStream();
108+
109+
if (isMutable) {
110+
query =
111+
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
112+
+ changeStreamName
113+
+ "\"($1, $2, $3, $4, null)";
114+
} else {
115+
query =
116+
"SELECT * FROM \"spanner\".\"read_json_"
117+
+ changeStreamName
118+
+ "\"($1, $2, $3, $4, null)";
119+
}
96120
statement =
97121
Statement.newBuilder(query)
98122
.bind("p1")
@@ -138,4 +162,67 @@ public ChangeStreamResultSet changeStreamQuery(
138162
private boolean isPostgres() {
139163
return this.dialect == Dialect.POSTGRESQL;
140164
}
165+
166+
// Returns the PartitionMode, fetching from Spanner on first call and caching.
167+
protected PartitionMode getPartitionMode() {
168+
PartitionMode mode = this.partitionMode;
169+
if (mode != PartitionMode.UNKNOWN) {
170+
return mode;
171+
}
172+
synchronized (this) {
173+
if (this.partitionMode == PartitionMode.UNKNOWN) {
174+
String fetchedPartitionMode =
175+
fetchPartitionMode(this.databaseClient, this.dialect, this.changeStreamName);
176+
if (fetchedPartitionMode.isEmpty()
177+
|| fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
178+
mode = PartitionMode.IMMUTABLE_KEY_RANGE;
179+
} else {
180+
mode = PartitionMode.MUTABLE_KEY_RANGE;
181+
}
182+
}
183+
}
184+
return mode;
185+
}
186+
187+
// Convenience boolean method kept for compatibility
188+
protected boolean isMutableKeyRangeChangeStream() {
189+
return getPartitionMode() == PartitionMode.MUTABLE_KEY_RANGE;
190+
}
191+
192+
// Returns the partition_mode option value for the given change stream.
193+
private static String fetchPartitionMode(
194+
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
195+
try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
196+
Statement statement;
197+
if (dialect == Dialect.POSTGRESQL) {
198+
statement =
199+
Statement.newBuilder(
200+
"select option_name, option_value\n"
201+
+ "from information_schema.change_stream_options\n"
202+
+ "where change_stream_name = $1")
203+
.bind("p1")
204+
.to(changeStreamName)
205+
.build();
206+
} else {
207+
statement =
208+
Statement.newBuilder(
209+
"select option_name, option_value\n"
210+
+ "from information_schema.change_stream_options\n"
211+
+ "where change_stream_name = @changeStreamName")
212+
.bind("changeStreamName")
213+
.to(changeStreamName)
214+
.build();
215+
}
216+
217+
try (ResultSet resultSet = tx.executeQuery(statement)) {
218+
while (resultSet.next()) {
219+
String optionName = resultSet.getString(0);
220+
if ("partition_mode".equalsIgnoreCase(optionName)) {
221+
return resultSet.getString(1);
222+
}
223+
}
224+
}
225+
}
226+
return "";
227+
}
141228
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
25+
import com.google.cloud.spanner.DatabaseClient;
26+
import com.google.cloud.spanner.Dialect;
27+
import com.google.cloud.spanner.Options.RpcPriority;
28+
import com.google.cloud.spanner.ReadOnlyTransaction;
29+
import com.google.cloud.spanner.ResultSet;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
33+
public class ChangeStreamDaoTest {
34+
private DatabaseClient databaseClient;
35+
private RpcPriority rpcPriority;
36+
private ChangeStreamDao changeStreamDao;
37+
private static final String CHANGE_STREAM_NAME = "testCS";
38+
39+
@Before
40+
public void setUp() {
41+
databaseClient = mock(DatabaseClient.class);
42+
rpcPriority = mock(RpcPriority.class);
43+
}
44+
45+
@Test
46+
public void testPartitionOptionMutable() {
47+
ReadOnlyTransaction readOnlyTransaction = mock(ReadOnlyTransaction.class);
48+
when(databaseClient.readOnlyTransaction()).thenReturn(readOnlyTransaction);
49+
50+
ResultSet resultSet = mock(ResultSet.class);
51+
when(readOnlyTransaction.executeQuery(any())).thenReturn(resultSet);
52+
when(resultSet.next()).thenReturn(true).thenReturn(false);
53+
when(resultSet.getString(0)).thenReturn("partition_mode");
54+
when(resultSet.getString(1)).thenReturn("MUTABLE_KEY_RANGE");
55+
56+
ChangeStreamDao changeStreamDao =
57+
new ChangeStreamDao(
58+
CHANGE_STREAM_NAME,
59+
databaseClient,
60+
rpcPriority,
61+
"testjob",
62+
Dialect.GOOGLE_STANDARD_SQL);
63+
64+
assertEquals(true, changeStreamDao.isMutableKeyRangeChangeStream());
65+
}
66+
67+
@Test
68+
public void testPartitionOptionImmutable() {
69+
ReadOnlyTransaction readOnlyTransaction = mock(ReadOnlyTransaction.class);
70+
when(databaseClient.readOnlyTransaction()).thenReturn(readOnlyTransaction);
71+
72+
ResultSet resultSet = mock(ResultSet.class);
73+
when(readOnlyTransaction.executeQuery(any())).thenReturn(resultSet);
74+
when(resultSet.next()).thenReturn(true).thenReturn(false);
75+
when(resultSet.getString(0)).thenReturn("partition_mode");
76+
when(resultSet.getString(1)).thenReturn("IMMUTABLE_KEY_RANGE");
77+
78+
ChangeStreamDao changeStreamDao =
79+
new ChangeStreamDao(
80+
CHANGE_STREAM_NAME,
81+
databaseClient,
82+
rpcPriority,
83+
"testjob",
84+
Dialect.GOOGLE_STANDARD_SQL);
85+
86+
assertEquals(false, changeStreamDao.isMutableKeyRangeChangeStream());
87+
}
88+
89+
@Test
90+
public void testPartitionOptionEmpty() {
91+
ReadOnlyTransaction readOnlyTransaction = mock(ReadOnlyTransaction.class);
92+
when(databaseClient.readOnlyTransaction()).thenReturn(readOnlyTransaction);
93+
94+
ResultSet resultSet = mock(ResultSet.class);
95+
when(readOnlyTransaction.executeQuery(any())).thenReturn(resultSet);
96+
when(resultSet.next()).thenReturn(true).thenReturn(false);
97+
when(resultSet.getString(0)).thenReturn("partition_mode");
98+
when(resultSet.getString(1)).thenReturn("");
99+
100+
ChangeStreamDao changeStreamDao =
101+
new ChangeStreamDao(
102+
CHANGE_STREAM_NAME,
103+
databaseClient,
104+
rpcPriority,
105+
"testjob",
106+
Dialect.GOOGLE_STANDARD_SQL);
107+
108+
assertEquals(false, changeStreamDao.isMutableKeyRangeChangeStream());
109+
}
110+
}

0 commit comments

Comments
 (0)