Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class DebeziumConstants {
public static final String INCOMING_TS_MS_FIELD = "ts_ms";

public static final String INCOMING_SOURCE_NAME_FIELD = "source.name";
public static final String INCOMING_SOURCE_SCHEMA_FIELD = "source.schema";
public static final String INCOMING_SOURCE_TS_MS_FIELD = "source.ts_ms";
public static final String INCOMING_SOURCE_TXID_FIELD = "source.txId";

Expand All @@ -51,6 +52,7 @@ public class DebeziumConstants {
public static final String FLATTENED_OP_COL_NAME = "_change_operation_type";
public static final String UPSTREAM_PROCESSING_TS_COL_NAME = "_upstream_event_processed_ts_ms";
public static final String FLATTENED_SHARD_NAME = "db_shard_source_partition";
public static final String FLATTENED_SCHEMA_NAME = "db_schema_source_partition";
public static final String FLATTENED_TS_COL_NAME = "_event_origin_ts_ms";
public static final String FLATTENED_TX_ID_COL_NAME = "_event_tx_id";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ protected Dataset<Row> processDataset(Dataset<Row> rowDataset) {
String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.UPSTREAM_PROCESSING_TS_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_NAME_FIELD, DebeziumConstants.FLATTENED_SHARD_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_SCHEMA_FIELD, DebeziumConstants.FLATTENED_SCHEMA_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TXID_FIELD, DebeziumConstants.FLATTENED_TX_ID_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_LSN_FIELD, DebeziumConstants.FLATTENED_LSN_COL_NAME),
Expand All @@ -70,6 +71,7 @@ protected Dataset<Row> processDataset(Dataset<Row> rowDataset) {
String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.UPSTREAM_PROCESSING_TS_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_NAME_FIELD, DebeziumConstants.FLATTENED_SHARD_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_SCHEMA_FIELD, DebeziumConstants.FLATTENED_SCHEMA_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TXID_FIELD, DebeziumConstants.FLATTENED_TX_ID_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_LSN_FIELD, DebeziumConstants.FLATTENED_LSN_COL_NAME),
Expand Down