Skip to content

Commit 4164b72

Browse files
committed
feat(tracing): add tracing spans to all I/O sections
Signed-off-by: Florian Valeye <[email protected]>
1 parent 4fbee1e commit 4164b72

File tree

13 files changed

+333
-21
lines changed

13 files changed

+333
-21
lines changed

crates/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ rstest = { version = "0.26.1" }
9999
serial_test = "3"
100100
tempfile = "3"
101101
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
102+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
102103

103104
[features]
104105
default = ["rustls"]

crates/core/src/delta_datafusion/mod.rs

Lines changed: 121 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,14 @@ fn join_batches_with_add_actions(
936936
}
937937

938938
/// Determine which files contain a record that satisfies the predicate
939+
#[tracing::instrument(
940+
skip_all,
941+
fields(
942+
version = snapshot.version(),
943+
total_files = tracing::field::Empty,
944+
matching_files = tracing::field::Empty
945+
)
946+
)]
939947
pub(crate) async fn find_files_scan(
940948
snapshot: &DeltaTableState,
941949
log_store: LogStoreRef,
@@ -948,6 +956,8 @@ pub(crate) async fn find_files_scan(
948956
.try_collect()
949957
.await?;
950958

959+
tracing::Span::current().record("total_files", candidate_map.len());
960+
951961
let scan_config = DeltaScanConfigBuilder {
952962
include_file_column: true,
953963
..Default::default()
@@ -987,12 +997,20 @@ pub(crate) async fn find_files_scan(
987997
let task_ctx = Arc::new(TaskContext::from(state));
988998
let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;
989999

990-
join_batches_with_add_actions(
1000+
let matching_files = join_batches_with_add_actions(
9911001
path_batches,
9921002
candidate_map,
9931003
config.file_column_name.as_ref().unwrap(),
9941004
true,
995-
)
1005+
)?;
1006+
1007+
tracing::Span::current().record("matching_files", matching_files.len());
1008+
tracing::debug!(
1009+
matching_files = matching_files.len(),
1010+
"physical scan completed"
1011+
);
1012+
1013+
Ok(matching_files)
9961014
}
9971015

9981016
pub(crate) async fn scan_memory_table(
@@ -1051,6 +1069,15 @@ pub(crate) async fn scan_memory_table(
10511069
}
10521070

10531071
/// Finds files in a snapshot that match the provided predicate.
1072+
#[tracing::instrument(
1073+
skip_all,
1074+
fields(
1075+
version = snapshot.version(),
1076+
has_predicate = predicate.is_some(),
1077+
partition_scan = tracing::field::Empty,
1078+
candidate_count = tracing::field::Empty
1079+
)
1080+
)]
10541081
pub async fn find_files(
10551082
snapshot: &DeltaTableState,
10561083
log_store: LogStoreRef,
@@ -1072,25 +1099,52 @@ pub async fn find_files(
10721099
expr_properties.result?;
10731100

10741101
if expr_properties.partition_only {
1102+
tracing::Span::current().record("partition_scan", true);
1103+
tracing::debug!("using partition-only scan (memory table)");
10751104
let candidates = scan_memory_table(&log_store, snapshot, predicate).await?;
1105+
tracing::Span::current().record("candidate_count", candidates.len());
1106+
tracing::info!(
1107+
candidates = candidates.len(),
1108+
scan_type = "partition",
1109+
"file pruning completed"
1110+
);
10761111
Ok(FindFiles {
10771112
candidates,
10781113
partition_scan: true,
10791114
})
10801115
} else {
1116+
tracing::Span::current().record("partition_scan", false);
1117+
tracing::debug!("using physical data scan");
10811118
let candidates =
10821119
find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;
10831120

1121+
tracing::Span::current().record("candidate_count", candidates.len());
1122+
tracing::info!(
1123+
candidates = candidates.len(),
1124+
scan_type = "physical",
1125+
"file pruning completed"
1126+
);
10841127
Ok(FindFiles {
10851128
candidates,
10861129
partition_scan: false,
10871130
})
10881131
}
10891132
}
1090-
None => Ok(FindFiles {
1091-
candidates: snapshot.file_actions(&log_store).await?,
1092-
partition_scan: true,
1093-
}),
1133+
None => {
1134+
tracing::Span::current().record("partition_scan", true);
1135+
tracing::debug!("no predicate, returning all files");
1136+
let candidates = snapshot.file_actions(&log_store).await?;
1137+
tracing::Span::current().record("candidate_count", candidates.len());
1138+
tracing::info!(
1139+
candidates = candidates.len(),
1140+
scan_type = "all",
1141+
"file pruning completed"
1142+
);
1143+
Ok(FindFiles {
1144+
candidates,
1145+
partition_scan: true,
1146+
})
1147+
}
10941148
}
10951149
}
10961150

@@ -2388,4 +2442,65 @@ mod tests {
23882442
self.inner.rename_if_not_exists(from, to).await
23892443
}
23902444
}
2445+
2446+
// Builds an INFO-level span by hand, using the same span name and field names
// (`version`, `has_predicate`, `partition_scan`, `candidate_count`) that the
// `#[tracing::instrument]` attribute on `find_files` declares, and checks the
// span's static metadata.
// NOTE(review): this test never calls `find_files`; it only inspects the span
// constructed here, so it would not notice if the attribute's fields changed.
#[test]
2447+
fn test_find_files_tracing_span() {
2448+
let span = tracing::info_span!(
2449+
"find_files",
2450+
version = 10,
2451+
has_predicate = true,
2452+
partition_scan = tracing::field::Empty,
2453+
candidate_count = tracing::field::Empty
2454+
);
2455+
2456+
let metadata = span.metadata().expect("span should have metadata");
2457+
assert_eq!(metadata.name(), "find_files");
2458+
assert_eq!(metadata.level(), &tracing::Level::INFO);
2459+
assert!(metadata.is_span());
2460+
2461+
// Fill in the two fields that were declared as `Empty` above. The recorded
// values are not read back or asserted on.
span.record("partition_scan", true);
2462+
span.record("candidate_count", 45);
2463+
}
2464+
2465+
// Builds an INFO-level span by hand, using the same span name and field names
// (`version`, `total_files`, `matching_files`) that the `#[tracing::instrument]`
// attribute on `find_files_scan` declares, and checks the span's name and level.
// NOTE(review): this test never calls `find_files_scan`; it only inspects the
// span constructed here.
#[test]
2466+
fn test_find_files_scan_tracing_span() {
2467+
let span = tracing::info_span!(
2468+
"find_files_scan",
2469+
version = 10,
2470+
total_files = tracing::field::Empty,
2471+
matching_files = tracing::field::Empty
2472+
);
2473+
2474+
let metadata = span.metadata().expect("span should have metadata");
2475+
assert_eq!(metadata.name(), "find_files_scan");
2476+
assert_eq!(metadata.level(), &tracing::Level::INFO);
2477+
2478+
// Fill in the two fields that were declared as `Empty` above. The recorded
// values are not read back or asserted on.
span.record("total_files", 1000);
2479+
span.record("matching_files", 12);
2480+
}
2481+
2482+
// Constructs a `FindFiles` value directly, with one `Add` action and
// `partition_scan` set to true, and checks that both fields read back as
// written. No scan is executed by this test.
#[test]
2483+
fn test_find_files_result_structure() {
2484+
use crate::kernel::Add;
2485+
2486+
let find_result = FindFiles {
2487+
// A single placeholder file entry; every optional field is left unset.
candidates: vec![Add {
2488+
path: "test.parquet".to_string(),
2489+
size: 1000,
2490+
modification_time: 0,
2491+
data_change: true,
2492+
stats: None,
2493+
partition_values: Default::default(),
2494+
tags: None,
2495+
deletion_vector: None,
2496+
base_row_id: None,
2497+
default_row_commit_version: None,
2498+
clustering_provider: None,
2499+
}],
2500+
partition_scan: true,
2501+
};
2502+
2503+
assert_eq!(find_result.candidates.len(), 1);
2504+
assert!(find_result.partition_scan);
2505+
}
23912506
}

crates/core/src/kernel/transaction/mod.rs

Lines changed: 100 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -619,14 +619,21 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
619619
let commit_or_bytes = this.commit_or_bytes;
620620

621621
if this.table_data.is_none() {
622+
debug!("committing initial table version 0");
622623
this.log_store
623624
.write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
624625
.await?;
625626
return Ok(PostCommit {
626627
version: 0,
627628
data: this.data,
628-
create_checkpoint: false,
629-
cleanup_expired_logs: None,
629+
create_checkpoint: this
630+
.post_commit
631+
.map(|v| v.create_checkpoint)
632+
.unwrap_or_default(),
633+
cleanup_expired_logs: this
634+
.post_commit
635+
.map(|v| v.cleanup_expired_logs)
636+
.unwrap_or_default(),
630637
log_store: this.log_store,
631638
table_data: None,
632639
custom_execute_handler: this.post_commit_hook_handler,
@@ -637,9 +644,20 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
637644
// unwrap() is safe here due to the above check
638645
let mut read_snapshot = this.table_data.unwrap().eager_snapshot().clone();
639646

647+
let commit_span = info_span!(
648+
"commit_with_retries",
649+
base_version = read_snapshot.version(),
650+
max_retries = this.max_retries,
651+
attempt = field::Empty,
652+
target_version = field::Empty,
653+
conflicts_checked = 0
654+
);
655+
let _enter = commit_span.enter();
656+
640657
let mut attempt_number = 1;
641658
let total_retries = this.max_retries + 1;
642659
while attempt_number <= total_retries {
660+
commit_span.record("attempt", attempt_number);
643661
let latest_version = this
644662
.log_store
645663
.get_latest_version(read_snapshot.version())
@@ -649,16 +667,28 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
649667
// If max_retries is set to 0, do not try to use the conflict checker to resolve the conflict
650668
// and throw immediately
651669
if this.max_retries == 0 {
670+
warn!(
671+
base_version = read_snapshot.version(),
672+
latest_version = latest_version,
673+
"table updated but max_retries is 0, failing immediately"
674+
);
652675
return Err(
653676
TransactionError::MaxCommitAttempts(this.max_retries as i32).into()
654677
);
655678
}
656-
warn!("Attempting to write a transaction {} but the underlying table has been updated to {latest_version}\n{:?}", read_snapshot.version() + 1, this.log_store);
679+
warn!(
680+
base_version = read_snapshot.version(),
681+
latest_version = latest_version,
682+
versions_behind = latest_version - read_snapshot.version(),
683+
"table updated during transaction, checking for conflicts"
684+
);
657685
let mut steps = latest_version - read_snapshot.version();
686+
let mut conflicts_checked = 0;
658687

659688
// Need to check for conflicts with each version between the read_snapshot and
660689
// the latest!
661690
while steps != 0 {
691+
conflicts_checked += 1;
662692
let summary = WinningCommitSummary::try_new(
663693
this.log_store.as_ref(),
664694
latest_version - steps,
@@ -680,24 +710,40 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
680710
match conflict_checker.check_conflicts() {
681711
Ok(_) => {}
682712
Err(err) => {
713+
error!(
714+
conflicts_checked = conflicts_checked,
715+
error = %err,
716+
"conflict detected, aborting transaction"
717+
);
683718
return Err(TransactionError::CommitConflict(err).into());
684719
}
685720
}
686721
steps -= 1;
687722
}
723+
commit_span.record("conflicts_checked", conflicts_checked);
724+
debug!(
725+
conflicts_checked = conflicts_checked,
726+
"all conflicts resolved, updating snapshot"
727+
);
688728
// Update snapshot to latest version after successful conflict check
689729
read_snapshot
690730
.update(&this.log_store, Some(latest_version as u64))
691731
.await?;
692732
}
693733
let version: i64 = latest_version + 1;
734+
commit_span.record("target_version", version);
694735

695736
match this
696737
.log_store
697738
.write_commit_entry(version, commit_or_bytes.clone(), this.operation_id)
698739
.await
699740
{
700741
Ok(()) => {
742+
info!(
743+
version = version,
744+
num_retries = attempt_number as u64 - 1,
745+
"transaction committed successfully"
746+
);
701747
return Ok(PostCommit {
702748
version,
703749
data: this.data,
@@ -718,12 +764,21 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
718764
});
719765
}
720766
Err(TransactionError::VersionAlreadyExists(version)) => {
721-
error!("The transaction {version} already exists, will retry!");
767+
warn!(
768+
version = version,
769+
attempt = attempt_number,
770+
"version already exists, will retry"
771+
);
722772
// If the version already exists, loop through again and re-check
723773
// conflicts
724774
attempt_number += 1;
725775
}
726776
Err(err) => {
777+
error!(
778+
version = version,
779+
error = %err,
780+
"commit failed, aborting"
781+
);
727782
this.log_store
728783
.abort_commit_entry(version, commit_or_bytes, this.operation_id)
729784
.await?;
@@ -732,6 +787,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
732787
}
733788
}
734789

790+
error!(
791+
max_retries = this.max_retries,
792+
"exceeded maximum commit attempts"
793+
);
735794
Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into())
736795
})
737796
}
@@ -967,4 +1026,41 @@ mod tests {
9671026
.await
9681027
.unwrap();
9691028
}
1029+
1030+
// Builds an INFO-level span by hand, using the same name and field names
// (`base_version`, `max_retries`, `attempt`, `target_version`,
// `conflicts_checked`) as the `commit_with_retries` span created in the
// `PreparedCommit` commit path, and checks the span's static metadata.
// NOTE(review): the commit path itself is not executed; only the span
// constructed here is inspected.
#[test]
1031+
fn test_commit_with_retries_tracing_span() {
1032+
let span = info_span!(
1033+
"commit_with_retries",
1034+
base_version = 5,
1035+
max_retries = 10,
1036+
attempt = field::Empty,
1037+
target_version = field::Empty,
1038+
conflicts_checked = 0
1039+
);
1040+
1041+
let metadata = span.metadata().expect("span should have metadata");
1042+
assert_eq!(metadata.name(), "commit_with_retries");
1043+
assert_eq!(metadata.level(), &Level::INFO);
1044+
assert!(metadata.is_span());
1045+
1046+
// Record the two initially empty fields and overwrite `conflicts_checked`.
// The recorded values are not read back or asserted on.
span.record("attempt", 1);
1047+
span.record("target_version", 6);
1048+
span.record("conflicts_checked", 2);
1049+
}
1050+
1051+
// Checks that the `with_max_retries` and `with_create_checkpoint` builder
// methods on `CommitProperties` store their arguments in the `max_retries`
// and `create_checkpoint` fields.
#[test]
1052+
fn test_commit_properties_with_retries() {
1053+
let props = CommitProperties::default()
1054+
.with_max_retries(5)
1055+
.with_create_checkpoint(false);
1056+
1057+
assert_eq!(props.max_retries, 5);
1058+
assert!(!props.create_checkpoint);
1059+
}
1060+
1061+
// Checks that a `CommitMetrics` built with `num_retries: 3` reads the same
// value back from that field.
#[test]
1062+
fn test_commit_metrics() {
1063+
let metrics = CommitMetrics { num_retries: 3 };
1064+
assert_eq!(metrics.num_retries, 3);
1065+
}
9701066
}

0 commit comments

Comments
 (0)