Skip to content

Commit 5e5c9aa

Browse files
committed
Parse proto bytes change stream record in ChangeStreamRecordMapper
1 parent e12436a commit 5e5c9aa

File tree

5 files changed

+207
-7
lines changed

5 files changed

+207
-7
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1977,11 +1977,6 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
19771977
+ changeStreamDatabaseId
19781978
+ " has dialect "
19791979
+ changeStreamDatabaseDialect);
1980-
LOG.info(
1981-
"The Spanner database "
1982-
+ fullPartitionMetadataDatabaseId
1983-
+ " has dialect "
1984-
+ metadataDatabaseDialect);
19851980
PartitionMetadataTableNames partitionMetadataTableNames =
19861981
Optional.ofNullable(getMetadataTable())
19871982
.map(
@@ -2005,6 +2000,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
20052000
final boolean isMutableChangeStream =
20062001
isMutableChangeStream(
20072002
spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName);
2003+
LOG.info("The change stream " + changeStreamName + " is mutable: " + isMutableChangeStream);
20082004
final DaoFactory daoFactory =
20092005
new DaoFactory(
20102006
changeStreamSpannerConfig,

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamResultSet.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.cloud.Timestamp;
2121
import com.google.cloud.spanner.ResultSet;
2222
import com.google.cloud.spanner.Struct;
23+
import com.google.protobuf.InvalidProtocolBufferException;
2324
import org.joda.time.Duration;
2425

2526
/**
@@ -128,6 +129,33 @@ public boolean isProtoChangeRecord() {
128129
&& resultSet.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
129130
}
130131

132+
/**
133+
* Returns the change stream record at the current pointer by parsing the bytes column. It also
134+
* updates the timestamp at which the record was read.
135+
*
136+
* <p>Should only be used for PostgreSQL databases when the change stream record is delivered as
137+
* proto bytes.
138+
*
139+
* @return the change stream record parsed from the proto bytes; a parse failure is rethrown as a {@link RuntimeException}
140+
*/
141+
public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecordFromBytes() {
142+
recordReadAt = Timestamp.now();
143+
try {
144+
// Use getBytes(0) for the BYTES column returned by read_proto_bytes_ TVF
145+
return com.google.spanner.v1.ChangeStreamRecord.parseFrom(
146+
resultSet.getBytes(0).toByteArray());
147+
} catch (InvalidProtocolBufferException e) {
148+
throw new RuntimeException("Failed to parse the proto bytes to ChangeStreamRecord proto", e);
149+
}
150+
}
151+
152+
/** Returns true if the result set at the current pointer contains only one bytes change record. */
153+
public boolean isProtoBytesChangeRecord() {
154+
return resultSet.getColumnCount() == 1
155+
&& !resultSet.isNull(0)
156+
&& resultSet.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.BYTES;
157+
}
158+
131159
/**
132160
* Returns the record at the current pointer as {@link JsonB}. It also updates the timestamp at
133161
* which the record was read.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,18 +218,28 @@ public class ChangeStreamRecordMapper {
218218
* @param resultSet the change stream result set
219219
* @param resultSetMetadata the metadata generated when reading the change stream row
220220
* @return a {@link List} of {@link ChangeStreamRecord} subclasses
221+
* @throws RuntimeException if a change stream record delivered as proto bytes cannot be parsed
221222
*/
222223
public List<ChangeStreamRecord> toChangeStreamRecords(
223224
PartitionMetadata partition,
224225
ChangeStreamResultSet resultSet,
225226
ChangeStreamResultSetMetadata resultSetMetadata) {
226227
if (this.isPostgres()) {
227-
// In PostgresQL, change stream records are returned as JsonB.
228+
// In PostgreSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as proto bytes.
229+
if (resultSet.isProtoBytesChangeRecord()) {
230+
return Arrays.asList(
231+
toChangeStreamRecord(
232+
partition, resultSet.getProtoChangeStreamRecordFromBytes(), resultSetMetadata));
233+
}
234+
235+
// For `IMMUTABLE_KEY_RANGE` option, change stream records are returned as
236+
// JsonB.
228237
return Collections.singletonList(
229238
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
230239
}
231240

232-
// In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as Protos.
241+
// In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are
242+
// returned as Protos.
233243
if (resultSet.isProtoChangeRecord()) {
234244
return Arrays.asList(
235245
toChangeStreamRecord(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
19+
20+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestProtoMapper.recordToProto;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertNotNull;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.when;
25+
26+
import com.google.cloud.ByteArray;
27+
import com.google.cloud.Timestamp;
28+
import com.google.cloud.spanner.ResultSet;
29+
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
30+
import org.junit.Test;
31+
32+
public class ChangeStreamResultSetTest {
33+
34+
@Test
35+
public void testGetProtoChangeStreamRecordFromBytes() throws Exception {
36+
// 1. Create an expected ChangeStreamRecord proto
37+
Timestamp now = Timestamp.now();
38+
final HeartbeatRecord heartbeatRecord =
39+
new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
40+
com.google.spanner.v1.ChangeStreamRecord expectedRecord = recordToProto(heartbeatRecord);
41+
assertNotNull(expectedRecord);
42+
43+
// 2. Convert it to bytes (simulating how Spanner PostgreSQL returns it)
44+
byte[] protoBytes = expectedRecord.toByteArray();
45+
46+
// 3. Mock the underlying Spanner ResultSet
47+
ResultSet mockResultSet = mock(ResultSet.class);
48+
// Simulate column 0 containing the BYTES representation of the proto
49+
when(mockResultSet.getBytes(0)).thenReturn(ByteArray.copyFrom(protoBytes));
50+
51+
// 4. Initialize ChangeStreamResultSet with the mock
52+
ChangeStreamResultSet changeStreamResultSet = new ChangeStreamResultSet(mockResultSet);
53+
54+
// 5. Call the new method and assert it parses correctly
55+
// The mocked column 0 holds the serialized proto, so the parsed record must equal the original.
56+
com.google.spanner.v1.ChangeStreamRecord actualRecord =
57+
changeStreamResultSet.getProtoChangeStreamRecordFromBytes();
58+
59+
assertEquals(expectedRecord, actualRecord);
60+
}
61+
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,4 +1039,109 @@ public void testMappingProtoRowToDataChangeRecord() {
10391039
Collections.singletonList(dataChangeRecord),
10401040
mapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
10411041
}
1042+
1043+
@Test
1044+
public void testMappingProtoBytesRowToPartitionStartRecord() {
1045+
final PartitionStartRecord partitionStartRecord =
1046+
new PartitionStartRecord(
1047+
Timestamp.MIN_VALUE,
1048+
"fakeRecordSequence",
1049+
Arrays.asList("partitionToken1", "partitionToken2"),
1050+
null);
1051+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
1052+
recordToProto(partitionStartRecord);
1053+
assertNotNull(changeStreamRecordProto);
1054+
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
1055+
1056+
when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
1057+
when(resultSet.getProtoChangeStreamRecordFromBytes()).thenReturn(changeStreamRecordProto);
1058+
assertEquals(
1059+
Collections.singletonList(partitionStartRecord),
1060+
mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
1061+
}
1062+
1063+
@Test
1064+
public void testMappingProtoBytesRowToPartitionEndRecord() {
1065+
final PartitionEndRecord partitionEndChange =
1066+
new PartitionEndRecord(Timestamp.MIN_VALUE, "fakeRecordSequence", null);
1067+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
1068+
recordToProto(partitionEndChange);
1069+
assertNotNull(changeStreamRecordProto);
1070+
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
1071+
1072+
when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
1073+
when(resultSet.getProtoChangeStreamRecordFromBytes()).thenReturn(changeStreamRecordProto);
1074+
assertEquals(
1075+
Collections.singletonList(partitionEndChange),
1076+
mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
1077+
}
1078+
1079+
@Test
1080+
public void testMappingProtoBytesRowToPartitionEventRecord() {
1081+
final PartitionEventRecord partitionEventRecord =
1082+
new PartitionEventRecord(Timestamp.MIN_VALUE, "fakeRecordSequence", null);
1083+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
1084+
recordToProto(partitionEventRecord);
1085+
assertNotNull(changeStreamRecordProto);
1086+
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
1087+
1088+
when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
1089+
when(resultSet.getProtoChangeStreamRecordFromBytes()).thenReturn(changeStreamRecordProto);
1090+
assertEquals(
1091+
Collections.singletonList(partitionEventRecord),
1092+
mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
1093+
}
1094+
1095+
@Test
1096+
public void testMappingProtoBytesRowToHeartbeatRecord() {
1097+
final HeartbeatRecord heartbeatRecord =
1098+
new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
1099+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
1100+
recordToProto(heartbeatRecord);
1101+
assertNotNull(changeStreamRecordProto);
1102+
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
1103+
1104+
when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
1105+
when(resultSet.getProtoChangeStreamRecordFromBytes()).thenReturn(changeStreamRecordProto);
1106+
assertEquals(
1107+
Collections.singletonList(heartbeatRecord),
1108+
mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
1109+
}
1110+
1111+
@Test
1112+
public void testMappingProtoBytesRowToDataChangeRecord() {
1113+
final DataChangeRecord dataChangeRecord =
1114+
new DataChangeRecord(
1115+
"partitionToken",
1116+
Timestamp.ofTimeSecondsAndNanos(10L, 20),
1117+
"serverTransactionId",
1118+
true,
1119+
"1",
1120+
"tableName",
1121+
Arrays.asList(
1122+
new ColumnType("column1", new TypeCode("{\"code\":\"INT64\"}"), true, 1L),
1123+
new ColumnType("column2", new TypeCode("{\"code\":\"BYTES\"}"), false, 2L)),
1124+
Collections.singletonList(
1125+
new Mod(
1126+
"{\"column1\":\"value1\"}",
1127+
"{\"column2\":\"oldValue2\"}",
1128+
"{\"column2\":\"newValue2\"}")),
1129+
ModType.UPDATE,
1130+
ValueCaptureType.OLD_AND_NEW_VALUES,
1131+
10L,
1132+
2L,
1133+
"transactionTag",
1134+
true,
1135+
null);
1136+
com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto =
1137+
recordToProto(dataChangeRecord);
1138+
assertNotNull(changeStreamRecordProto);
1139+
ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
1140+
1141+
when(resultSet.isProtoBytesChangeRecord()).thenReturn(true);
1142+
when(resultSet.getProtoChangeStreamRecordFromBytes()).thenReturn(changeStreamRecordProto);
1143+
assertEquals(
1144+
Collections.singletonList(dataChangeRecord),
1145+
mapperPostgres.toChangeStreamRecords(partition, resultSet, resultSetMetadata));
1146+
}
10421147
}

0 commit comments

Comments
 (0)