
Commit f603f62

zxqfd555-pwManul from Pathway authored and committed
delta table replays in snapshot mode (#9312)
GitOrigin-RevId: de0553ffc030bda6aac15fef1f3e4c9063c36b78
1 parent f2f7dd4 commit f603f62

File tree

9 files changed: +266 -71 lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
@@ -5,10 +5,12 @@ All notable changes to this project will be documented in this file.
 This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 ## [Unreleased]
 
+### Changed
+- `pw.io.deltalake.read` now accepts the `start_from_timestamp_ms` parameter for non-append-only tables. In this case, the connector will replay the history of changes in the table version by version starting from the state of the table at the given timestamp. The differences between versions will be applied atomically.
+
 ## [0.26.3] - 2025-10-03
 
 ### Added
-
 - New parser `pathway.xpacks.llm.parsers.PaddleOCRParser` supporting parsing of PDF, PPTX and images.
 
 ## [0.26.2] - 2025-10-01

python/pathway/io/deltalake/__init__.py

Lines changed: 3 additions & 2 deletions
@@ -356,8 +356,9 @@ def read(
         custom endpoint is left blank, the authorized user's credentials for S3 will
         be used.
         start_from_timestamp_ms: If defined, only changes that occurred after the specified
-            timestamp will be read. This parameter can only be used for tables with
-            append-only behavior.
+            timestamp are read. When used with **non-append-only tables**, the state
+            of the table at the given timestamp is loaded first, and then all updates are read
+            incrementally.
         autocommit_duration_ms: The maximum time between two commits. Every
             ``autocommit_duration_ms`` milliseconds, the updates received by the connector are
             committed and pushed into Pathway's computation graph.
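
To make the documented behavior concrete, here is a minimal usage sketch (not part of the commit; the table path, schema, and output destination are illustrative):

    import time

    import pathway as pw


    class InputSchema(pw.Schema):
        key: int = pw.column_definition(primary_key=True)
        value: str


    # Replay a (possibly non-append-only) Delta table starting from its state
    # one hour ago: the snapshot at that timestamp is loaded first, and every
    # later version is then applied atomically, one version at a time.
    one_hour_ago_ms = int(time.time() * 1000) - 3_600_000
    table = pw.io.deltalake.read(
        "./delta",  # hypothetical local table path
        schema=InputSchema,
        mode="static",
        start_from_timestamp_ms=one_hour_ago_ms,
    )
    pw.io.jsonlines.write(table, "./output.jsonl")
    pw.run()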

python/pathway/tests/test_io.py

Lines changed: 104 additions & 0 deletions
@@ -5174,3 +5174,107 @@ def run_test(n_expected_entries: int, n_different_times: int):
     ]
     write_lines(input_path, input_contents)
     run_test(10, 1)
+
+
+def test_delta_snapshot_mode_rewind(tmp_path):
+    input_path = tmp_path / "input.jsonl"
+    delta_table_path = tmp_path / "delta"
+    output_path = tmp_path / "output.jsonl"
+    pstorage_path = tmp_path / "PStorage"
+
+    row_1 = {
+        "key": 1,
+        "value": "one",
+    }
+    row_2 = {
+        "key": 2,
+        "value": "two",
+    }
+    row_3 = {
+        "key": 3,
+        "value": "three",
+    }
+
+    class InputSchema(pw.Schema):
+        key: int = pw.column_definition(primary_key=True)
+        value: str
+
+    def update_delta_table_snapshot(rows: list[dict]):
+        G.clear()
+        with open(input_path, "w") as f:
+            for row in rows:
+                f.write(json.dumps(row))
+                f.write("\n")
+        table = pw.io.jsonlines.read(input_path, schema=InputSchema, mode="static")
+        pw.io.deltalake.write(table, delta_table_path, output_table_type="snapshot")
+        run_all(
+            persistence_config=pw.persistence.Config(
+                backend=pw.persistence.Backend.filesystem(pstorage_path)
+            )
+        )
+
+    def from_delta_table_to_file(start_from_timestamp_ms: int | None) -> list[dict]:
+        G.clear()
+        table = pw.io.deltalake.read(
+            delta_table_path,
+            schema=InputSchema,
+            mode="static",
+            start_from_timestamp_ms=start_from_timestamp_ms,
+        )
+        pw.io.jsonlines.write(table, output_path)
+        run_all()
+        result = []
+        with open(output_path, "r") as f:
+            for row in f:
+                parsed_row = json.loads(row)
+                if start_from_timestamp_ms is None:
+                    assert parsed_row["diff"] == 1
+                result.append(parsed_row)
+        return result
+
+    time_start_1 = int(time.time() * 1000)
+    time.sleep(0.05)
+    update_delta_table_snapshot([row_1, row_2, row_3])
+    assert len(from_delta_table_to_file(None)) == 3
+
+    time_start_2 = int(time.time() * 1000)
+    time.sleep(0.05)
+    update_delta_table_snapshot([row_1, row_2])
+    assert len(from_delta_table_to_file(None)) == 2
+
+    time_start_3 = int(time.time() * 1000)
+    time.sleep(0.05)
+    update_delta_table_snapshot([row_1, row_3])
+    assert len(from_delta_table_to_file(None)) == 2
+
+    time_start_4 = int(time.time() * 1000)
+    time.sleep(0.05)
+    update_delta_table_snapshot([row_2])
+    assert len(from_delta_table_to_file(None)) == 1
+    time.sleep(0.05)
+    time_start_5 = int(time.time() * 1000)
+
+    # We start with an empty set.
+    # Then, we move to [1, 2, 3]. It's 3 actions.
+    # Then, we move to [1, 2] via just one action: -3. It's 4 actions in total.
+    # Then, we move to [1, 3], which takes -2, +3. It's 6 actions in total.
+    # Then, we move to [2], which takes -1, -3, +2. It's 9 actions.
+    assert len(from_delta_table_to_file(time_start_1)) == 9
+
+    # The state at `time_start_2` corresponds to the snapshot with 3 elements.
+    # Then we apply all diffs, so the size of the log is the same as in the previous
+    # case.
+    assert len(from_delta_table_to_file(time_start_2)) == 9
+
+    # We start with [1, 2]. It's 2 events.
+    # Then, we do -2, +3 to advance to [1, 3]. It's 4 events.
+    # Then, we do -1, -3, +2 to advance to [2]. It's 7 events.
+    assert len(from_delta_table_to_file(time_start_3)) == 7
+
+    # We start with [1, 3]. It's 2 events.
+    # Then, we do -1, -3, +2 to advance to [2]. It's 5 events.
+    assert len(from_delta_table_to_file(time_start_4)) == 5
+
+    # There are no events following after `time_start_5`, so we take the snapshot
+    # that is current at this point in time. It's just one action: [2].
+    assert len(from_delta_table_to_file(time_start_5)) == 1
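
The arithmetic in the comments above follows a simple rule: a replay costs one event per row of the starting snapshot, plus one event per key in each symmetric difference between consecutive snapshots. A standalone sketch (not part of the commit) that reproduces the expected counts:

    def expected_event_count(snapshots: list[set[int]]) -> int:
        """Events needed to replay: the starting snapshot, then per-version key diffs."""
        total = len(snapshots[0])  # one +1 event per row in the starting snapshot
        for prev, curr in zip(snapshots, snapshots[1:]):
            total += len(prev ^ curr)  # a -1 per removed key, a +1 per added key
        return total


    # Mirrors the assertions above: empty -> {1,2,3} -> {1,2} -> {1,3} -> {2}.
    assert expected_event_count([set(), {1, 2, 3}, {1, 2}, {1, 3}, {2}]) == 9
    assert expected_event_count([{1, 2, 3}, {1, 2}, {1, 3}, {2}]) == 9
    assert expected_event_count([{1, 2}, {1, 3}, {2}]) == 7
    assert expected_event_count([{1, 3}, {2}]) == 5
    assert expected_event_count([{2}]) == 1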

src/connectors/data_lake/delta.rs

Lines changed: 108 additions & 46 deletions
@@ -51,7 +51,9 @@ use crate::connectors::data_format::{
 };
 use crate::connectors::data_lake::buffering::PayloadType;
 use crate::connectors::data_lake::ArrowDataType;
-use crate::connectors::data_storage::{ConnectorMode, ConversionError, ValuesMap};
+use crate::connectors::data_storage::{
+    CommitPossibility, ConnectorMode, ConversionError, ValuesMap,
+};
 use crate::connectors::metadata::ParquetMetadata;
 use crate::connectors::scanner::S3Scanner;
 use crate::connectors::{
@@ -701,48 +703,26 @@ impl DeltaTableReader {
         }
         let mut current_version = table.version();
 
-        let mut parquet_files_queue = {
-            let history = runtime.block_on(async {
-                Ok::<Vec<DeltaTableCommitInfo>, ReadError>(table.history(None).await?)
-            })?;
-            Self::get_reader_actions(&table, path, history, &column_types)?
-        };
+        let mut parquet_files_queue = VecDeque::new();
         let mut backfilling_entries_queue = VecDeque::new();
+        let mut snapshot_loading_needed = backfilling_thresholds.is_empty();
 
         if let Some(start_from_timestamp_ms) = start_from_timestamp_ms {
             assert!(backfilling_thresholds.is_empty()); // Checked upstream in python_api.rs
-            let current_timestamp = current_unix_timestamp_ms();
-            if start_from_timestamp_ms > current_timestamp.try_into().unwrap() {
-                warn!("The timestamp {start_from_timestamp_ms} is greater than the current timestamp {current_timestamp}. All new entries will be read.");
-            }
-            let (earliest_version, latest_version) = runtime.block_on(async {
-                Ok::<(i64, i64), ReadError>((
-                    table.get_earliest_version().await?,
-                    table.get_latest_version().await?,
-                ))
-            })?;
-            let snapshot = table.snapshot()?;
-
-            let mut last_version_below_threshold = None;
-            for version in earliest_version..=latest_version {
-                let Some(timestamp) = snapshot.version_timestamp(version) else {
-                    continue;
-                };
-                if timestamp < start_from_timestamp_ms {
-                    last_version_below_threshold = Some(version);
-                } else {
-                    break;
-                }
-            }
-            if let Some(last_version_below_threshold) = last_version_below_threshold {
-                runtime
-                    .block_on(async { table.load_version(last_version_below_threshold).await })?;
-                current_version = last_version_below_threshold;
-            } else {
-                current_version = earliest_version;
-                warn!("All available versions are newer than the specified timestamp {start_from_timestamp_ms}. The read will start from the beginning.");
-            }
-            parquet_files_queue.clear();
+            Self::handle_start_from_timestamp_ms(
+                &runtime,
+                &mut table,
+                start_from_timestamp_ms,
+                is_append_only,
+                &mut current_version,
+                &mut snapshot_loading_needed,
+            )?;
+        } else {
+            snapshot_loading_needed = true;
+        }
+        if snapshot_loading_needed {
+            parquet_files_queue =
+                Self::get_reader_actions_for_table(&runtime, &table, path, &column_types)?;
         }
 
         if !backfilling_thresholds.is_empty() {
@@ -771,6 +751,64 @@
         })
     }
 
+    fn handle_start_from_timestamp_ms(
+        runtime: &TokioRuntime,
+        table: &mut DeltaTable,
+        start_from_timestamp_ms: i64,
+        is_append_only: bool,
+        current_version: &mut i64,
+        snapshot_loading_needed: &mut bool,
+    ) -> Result<(), ReadError> {
+        let current_timestamp = current_unix_timestamp_ms();
+        if start_from_timestamp_ms > current_timestamp.try_into().unwrap() {
+            warn!("The timestamp {start_from_timestamp_ms} is greater than the current timestamp {current_timestamp}. All new entries will be read.");
+        }
+        let (earliest_version, latest_version) = runtime.block_on(async {
+            Ok::<(i64, i64), ReadError>((
+                table.get_earliest_version().await?,
+                table.get_latest_version().await?,
+            ))
+        })?;
+        let snapshot = table.snapshot()?;
+
+        let mut last_version_below_threshold = None;
+        let mut version_at_threshold = None;
+        for version in earliest_version..=latest_version {
+            let Some(timestamp) = snapshot.version_timestamp(version) else {
+                continue;
+            };
+            if timestamp < start_from_timestamp_ms {
+                last_version_below_threshold = Some(version);
+            } else {
+                if timestamp == start_from_timestamp_ms {
+                    version_at_threshold = Some(version);
+                }
+                break;
+            }
+        }
+
+        if !is_append_only && version_at_threshold.is_some() {
+            *current_version = version_at_threshold.unwrap();
+        } else if let Some(last_version_below_threshold) = last_version_below_threshold {
+            *current_version = last_version_below_threshold;
+        } else {
+            *current_version = earliest_version;
+            warn!(
+                "All available versions are newer than the specified timestamp {start_from_timestamp_ms}. The read will start from the beginning, version {current_version}."
+            );
+            // NB: All versions are newer than the requested one, meaning that we need to read the
+            // full state at the `earliest_version` and then continue incrementally.
+        }
+
+        if is_append_only && last_version_below_threshold.is_some() {
+            // We've found the threshold version, we read only diffs from this version onwards.
+            *snapshot_loading_needed = false;
+        }
+
+        runtime.block_on(async { table.load_version(*current_version).await })?;
+        Ok(())
+    }
+
     fn record_batch_has_pathway_fields(batch: &ArrowRecordBatch) -> bool {
         for (field, _) in SPECIAL_OUTPUT_FIELDS {
             if let Some(time_column) = batch.column_by_name(field) {
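
In short, the scan picks the version whose commit timestamp sits exactly at the threshold (for non-append-only tables) or the newest version strictly below it, falling back to the earliest version otherwise. A Python model of this rule (an illustration, not part of the commit), taking an ascending, non-empty list of (version, commit_timestamp_ms) pairs:

    def select_start_version(
        versions: list[tuple[int, int]],
        start_from_timestamp_ms: int,
        is_append_only: bool,
    ) -> tuple[int, bool]:
        """Returns (version_to_load, snapshot_loading_needed)."""
        last_below = None  # newest version strictly older than the threshold
        at_threshold = None  # version committed exactly at the threshold, if any
        for version, timestamp in versions:
            if timestamp < start_from_timestamp_ms:
                last_below = version
            else:
                if timestamp == start_from_timestamp_ms:
                    at_threshold = version
                break
        if not is_append_only and at_threshold is not None:
            # Non-append-only: load the full state exactly at the threshold
            # version, then apply later versions as atomic diffs.
            return at_threshold, True
        if last_below is not None:
            # Append-only tables skip the snapshot and read only the diffs that
            # follow; non-append-only tables still need the starting state first.
            return last_below, not is_append_only
        # Every version is newer than the threshold: read the full state at the
        # earliest version, then continue incrementally.
        return versions[0][0], True

For example, select_start_version([(0, 100), (1, 200)], 200, is_append_only=False) returns (1, True): version 1 is loaded as a snapshot and later versions replay as diffs.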
@@ -891,7 +929,8 @@
         if is_new_block {
             backfilling_entries_queue.push_back(BackfillingEntry::SourceEvent(
                 ReadResult::FinishedSource {
-                    commit_allowed: true,
+                    // Applicable only for append-only tables, hence no need to avoid squashing diff = +1 with diff = -1
+                    commit_possibility: CommitPossibility::Possible,
                 },
             ));
             backfilling_entries_queue.push_back(BackfillingEntry::SourceEvent(
@@ -909,7 +948,9 @@
         }
         backfilling_entries_queue.push_back(BackfillingEntry::SourceEvent(
             ReadResult::FinishedSource {
-                commit_allowed: true,
+                // Same as above, we don't force commits, since the situation with losing/collapsing
+                // +1 and -1 events is not possible here
+                commit_possibility: CommitPossibility::Possible,
             },
         ));
         if pathway_meta_column_added {
@@ -943,6 +984,18 @@
         }
     }
 
+    fn get_reader_actions_for_table(
+        runtime: &TokioRuntime,
+        table: &DeltaTable,
+        base_path: &str,
+        column_types: &HashMap<String, Type>,
+    ) -> Result<VecDeque<DeltaReaderAction>, ReadError> {
+        let history = runtime.block_on(async {
+            Ok::<Vec<DeltaTableCommitInfo>, ReadError>(table.history(None).await?)
+        })?;
+        Self::get_reader_actions(table, base_path, history, column_types)
+    }
+
     fn get_reader_actions(
         table: &DeltaTable,
         base_path: &str,
@@ -1219,12 +1272,21 @@
             None => {
                 // The Pathway time advancement (e.g. commit) is only possible if it was the
                 // last Parquet block within a version.
+                let is_last_in_version = self
+                    .current_action
+                    .as_ref()
+                    .expect("current action must be set if there's a reader")
+                    .is_last_in_version;
+
                 let source_event = ReadResult::FinishedSource {
-                    commit_allowed: self
-                        .current_action
-                        .as_ref()
-                        .expect("current action must be set if there's a reader")
-                        .is_last_in_version,
+                    commit_possibility: if is_last_in_version {
+                        // The versions are read on-line, force to avoid squashing same-key events
+                        // with the previous or the next versions.
+                        // Note that it can be less strict if the batch only has additions.
+                        CommitPossibility::Forced
+                    } else {
+                        CommitPossibility::Forbidden
+                    },
                 };
                 self.reader = None;
                 self.current_action = None;
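
Replacing the boolean commit_allowed with CommitPossibility is what keeps each replayed version atomic. The enum itself is defined in data_storage.rs and does not appear in this diff; the following Python model of the three variants is a hypothetical illustration of the semantics the comments above describe:

    from enum import Enum, auto


    class CommitPossibility(Enum):
        # Mid-version Parquet block: committing here would split an atomic version.
        FORBIDDEN = auto()
        # A commit may happen at the engine's discretion, e.g. in append-only
        # reads where a -1 event can never cancel a +1 event for the same key.
        POSSIBLE = auto()
        # Version boundary during an online replay: commit now, so that same-key
        # +1/-1 events from adjacent versions are never squashed together.
        FORCED = auto()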

src/connectors/data_lake/iceberg.rs

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@ use crate::async_runtime::create_async_tokio_runtime;
 use crate::connectors::data_format::NDARRAY_SINGLE_ELEMENT_FIELD_NAME;
 use crate::connectors::data_lake::buffering::PayloadType;
 use crate::connectors::data_lake::MetadataPerColumn;
-use crate::connectors::data_storage::ConnectorMode;
+use crate::connectors::data_storage::{CommitPossibility, ConnectorMode};
 use crate::connectors::metadata::IcebergMetadata;
 use crate::connectors::{
     DataEventType, OffsetKey, OffsetValue, ReadError, ReadResult, Reader, ReaderContext,
@@ -475,7 +475,7 @@ impl IcebergReader {
         self.diff_queue
             .push_front(ReadResult::NewSource(new_source_metadata.into()));
         self.diff_queue.push_back(ReadResult::FinishedSource {
-            commit_allowed: true,
+            commit_possibility: CommitPossibility::Possible,
         });
     }
