Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerOptions;
Expand Down Expand Up @@ -1998,6 +2000,11 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect);
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
final SpannerAccessor spannerAccessor =
SpannerAccessor.getOrCreate(changeStreamSpannerConfig);
final boolean isMutableChangeStream =
isMutableChangeStream(
spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName);
final DaoFactory daoFactory =
new DaoFactory(
changeStreamSpannerConfig,
Expand All @@ -2007,7 +2014,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
rpcPriority,
input.getPipeline().getOptions().getJobName(),
changeStreamDatabaseDialect,
metadataDatabaseDialect);
metadataDatabaseDialect,
isMutableChangeStream);
final ActionFactory actionFactory = new ActionFactory();

final Duration watermarkRefreshRate =
Expand Down Expand Up @@ -2688,4 +2696,58 @@ static String resolveSpannerProjectId(SpannerConfig config) {
? SpannerOptions.getDefaultProjectId()
: config.getProjectId().get();
}

@VisibleForTesting
static boolean isMutableChangeStream(
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
String fetchedPartitionMode = fetchPartitionMode(databaseClient, dialect, changeStreamName);
if (fetchedPartitionMode.isEmpty()
|| fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
return false;
}
return true;
}

private static String fetchPartitionMode(
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
Statement statement;
if (dialect == Dialect.POSTGRESQL) {
statement =
Statement.newBuilder(
"select option_value\n"
+ "from information_schema.change_stream_options\n"
+ "where change_stream_name = $1 and option_name = 'partition_mode'")
.bind("p1")
.to(changeStreamName)
.build();
} else {
statement =
Statement.newBuilder(
"select option_value\n"
+ "from information_schema.change_stream_options\n"
+ "where change_stream_name = @changeStreamName and option_name = 'partition_mode'")
.bind("changeStreamName")
.to(changeStreamName)
.build();
}
ResultSet resultSet = tx.executeQuery(statement);
while (resultSet.next()) {
String value = resultSet.getString(0);
if (value != null) {
return value;
}
}
return "";
} catch (RuntimeException e) {
// Log the failure (with stack trace) but rethrow so the caller still observes
// the error.
LOG.warn(
"Failed to fetch partition_mode for change stream '{}', dialect={} - will propagate exception",
changeStreamName,
dialect,
e);
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
* as a {@link ResultSet}, which can be consumed until the stream is finished.
*/
public class ChangeStreamDao {

private final String changeStreamName;
private final DatabaseClient databaseClient;
private final RpcPriority rpcPriority;
private final String jobName;
private final Dialect dialect;
private final boolean isMutableChangeStream;

/**
* Constructs a change stream dao. All the queries performed by this class will be for the given
Expand All @@ -53,12 +53,14 @@ public class ChangeStreamDao {
DatabaseClient databaseClient,
RpcPriority rpcPriority,
String jobName,
Dialect dialect) {
Dialect dialect,
boolean isMutableChangeStream) {
this.changeStreamName = changeStreamName;
this.databaseClient = databaseClient;
this.rpcPriority = rpcPriority;
this.jobName = jobName;
this.dialect = dialect;
this.isMutableChangeStream = isMutableChangeStream;
}

/**
Expand Down Expand Up @@ -91,8 +93,18 @@ public ChangeStreamResultSet changeStreamQuery(
String query = "";
Statement statement;
if (this.isPostgres()) {
query =
"SELECT * FROM \"spanner\".\"read_json_" + changeStreamName + "\"($1, $2, $3, $4, null)";
// Ensure we have determined whether change stream uses mutable key range
if (this.isMutableChangeStream) {
query =
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
+ changeStreamName
+ "\"($1, $2, $3, $4, null)";
} else {
query =
"SELECT * FROM \"spanner\".\"read_json_"
+ changeStreamName
+ "\"($1, $2, $3, $4, null)";
}
statement =
Statement.newBuilder(query)
.bind("p1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class DaoFactory implements Serializable {
private final String jobName;
private final Dialect spannerChangeStreamDatabaseDialect;
private final Dialect metadataDatabaseDialect;
private final boolean isMutableChangeStream;

/**
* Constructs a {@link DaoFactory} with the configuration to be used for the underlying instances.
Expand All @@ -68,7 +69,8 @@ public DaoFactory(
RpcPriority rpcPriority,
String jobName,
Dialect spannerChangeStreamDatabaseDialect,
Dialect metadataDatabaseDialect) {
Dialect metadataDatabaseDialect,
boolean isMutableChangeStream) {
if (metadataSpannerConfig.getInstanceId() == null) {
throw new IllegalArgumentException("Metadata instance can not be null");
}
Expand All @@ -83,6 +85,7 @@ public DaoFactory(
this.jobName = jobName;
this.spannerChangeStreamDatabaseDialect = spannerChangeStreamDatabaseDialect;
this.metadataDatabaseDialect = metadataDatabaseDialect;
this.isMutableChangeStream = isMutableChangeStream;
}

/**
Expand Down Expand Up @@ -143,7 +146,8 @@ public synchronized ChangeStreamDao getChangeStreamDao() {
spannerAccessor.getDatabaseClient(),
rpcPriority,
jobName,
this.spannerChangeStreamDatabaseDialect);
this.spannerChangeStreamDatabaseDialect,
this.isMutableChangeStream);
}
return changeStreamDaoInstance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,18 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.when;

import com.google.auth.Credentials;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
Expand All @@ -32,6 +40,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
public class SpannerIOReadChangeStreamTest {
Expand Down Expand Up @@ -120,7 +129,8 @@ public void testSetSpannerConfigCredential() {

@Test
public void testWithDefaultCredential() {
// Get the default credential, without setting any credentials in the pipeline options or
// Get the default credential, without setting any credentials in the pipeline
// options or
// SpannerConfig.
Credentials defaultCredential =
testPipeline.getOptions().as(GcpOptions.class).getGcpCredential();
Expand All @@ -140,4 +150,53 @@ public void testWithDefaultCredential() {
assertEquals(defaultCredential, changeStreamSpannerConfigWithCredential.getCredentials().get());
assertEquals(defaultCredential, metadataSpannerConfigWithCredential.getCredentials().get());
}

@Test
public void testFetchPartitionModeGoogleSql() {
DatabaseClient databaseClient = Mockito.mock(DatabaseClient.class);
ReadOnlyTransaction transaction = Mockito.mock(ReadOnlyTransaction.class);
ResultSet resultSet = Mockito.mock(ResultSet.class);

when(databaseClient.readOnlyTransaction()).thenReturn(transaction);
when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet);
when(resultSet.next()).thenReturn(true).thenReturn(false);
when(resultSet.getString(0)).thenReturn("MUTABLE_KEY_RANGE");

assertEquals(
true,
SpannerIO.isMutableChangeStream(
databaseClient, Dialect.GOOGLE_STANDARD_SQL, TEST_CHANGE_STREAM));
}

@Test
public void testFetchPartitionModePostgres() {
DatabaseClient databaseClient = Mockito.mock(DatabaseClient.class);
ReadOnlyTransaction transaction = Mockito.mock(ReadOnlyTransaction.class);
ResultSet resultSet = Mockito.mock(ResultSet.class);

when(databaseClient.readOnlyTransaction()).thenReturn(transaction);
when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet);
when(resultSet.next()).thenReturn(true).thenReturn(false);
when(resultSet.getString(0)).thenReturn("IMMUTABLE_KEY_RANGE");

assertEquals(
false,
SpannerIO.isMutableChangeStream(databaseClient, Dialect.POSTGRESQL, TEST_CHANGE_STREAM));
}

@Test
public void testFetchPartitionModeNotFound() {
DatabaseClient databaseClient = Mockito.mock(DatabaseClient.class);
ReadOnlyTransaction transaction = Mockito.mock(ReadOnlyTransaction.class);
ResultSet resultSet = Mockito.mock(ResultSet.class);

when(databaseClient.readOnlyTransaction()).thenReturn(transaction);
when(transaction.executeQuery(any(Statement.class))).thenReturn(resultSet);
when(resultSet.next()).thenReturn(false);

assertEquals(
false,
SpannerIO.isMutableChangeStream(
databaseClient, Dialect.GOOGLE_STANDARD_SQL, TEST_CHANGE_STREAM));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ public void testInvalidRecordReceivedWithDefaultSettings() {
Timestamp.ofTimeSecondsAndNanos(startTimestamp.getSeconds(), startTimestamp.getNanos() + 1);

mockGetDialect();
mockChangeStreamOptions();
mockTableExists();
mockGetWatermark(startTimestamp);
ResultSet getPartitionResultSet = mockGetParentPartition(startTimestamp, endTimestamp);
Expand Down Expand Up @@ -377,6 +378,38 @@ public void testInvalidRecordReceivedWithDefaultSettings() {
}
}

private void mockChangeStreamOptions() {
Statement changeStreamOptionsStatement =
Statement.newBuilder(
"select option_value\n"
+ "from information_schema.change_stream_options\n"
+ "where change_stream_name = @changeStreamName and option_name = 'partition_mode'")
.bind("changeStreamName")
.to(TEST_CHANGE_STREAM)
.build();
ResultSetMetadata changeStreamOptionsResultSetMetadata =
ResultSetMetadata.newBuilder()
.setRowType(
StructType.newBuilder()
.addFields(
Field.newBuilder()
.setName("option_value")
.setType(Type.newBuilder().setCode(TypeCode.STRING).build())
.build())
.build())
.build();
ResultSet changeStreamOptionsResultSet =
ResultSet.newBuilder()
.addRows(
ListValue.newBuilder()
.addValues(Value.newBuilder().setStringValue("NEW_VALUES").build())
.build())
.setMetadata(changeStreamOptionsResultSetMetadata)
.build();
mockSpannerService.putPartialStatementResult(
StatementResult.query(changeStreamOptionsStatement, changeStreamOptionsResultSet));
}

private void mockInvalidChangeStreamRecordReceived(Timestamp now, Timestamp after3Seconds) {
Statement changeStreamQueryStatement =
Statement.newBuilder(
Expand Down
Loading
Loading