Skip to content

Commit 3f02b2c

Browse files
committed
feat(tracing): Add tracing spans to all I/O sections
Signed-off-by: Florian Valeye <[email protected]>
1 parent 4fbee1e commit 3f02b2c

File tree

12 files changed

+234
-21
lines changed

12 files changed

+234
-21
lines changed

crates/core/src/delta_datafusion/mod.rs

Lines changed: 60 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -936,6 +936,14 @@ fn join_batches_with_add_actions(
936936
}
937937

938938
/// Determine which files contain a record that satisfies the predicate
939+
#[tracing::instrument(
940+
skip_all,
941+
fields(
942+
version = snapshot.version(),
943+
total_files = tracing::field::Empty,
944+
matching_files = tracing::field::Empty
945+
)
946+
)]
939947
pub(crate) async fn find_files_scan(
940948
snapshot: &DeltaTableState,
941949
log_store: LogStoreRef,
@@ -948,6 +956,8 @@ pub(crate) async fn find_files_scan(
948956
.try_collect()
949957
.await?;
950958

959+
tracing::Span::current().record("total_files", candidate_map.len());
960+
951961
let scan_config = DeltaScanConfigBuilder {
952962
include_file_column: true,
953963
..Default::default()
@@ -987,12 +997,20 @@ pub(crate) async fn find_files_scan(
987997
let task_ctx = Arc::new(TaskContext::from(state));
988998
let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;
989999

990-
join_batches_with_add_actions(
1000+
let matching_files = join_batches_with_add_actions(
9911001
path_batches,
9921002
candidate_map,
9931003
config.file_column_name.as_ref().unwrap(),
9941004
true,
995-
)
1005+
)?;
1006+
1007+
tracing::Span::current().record("matching_files", matching_files.len());
1008+
tracing::debug!(
1009+
matching_files = matching_files.len(),
1010+
"physical scan completed"
1011+
);
1012+
1013+
Ok(matching_files)
9961014
}
9971015

9981016
pub(crate) async fn scan_memory_table(
@@ -1051,6 +1069,15 @@ pub(crate) async fn scan_memory_table(
10511069
}
10521070

10531071
/// Finds files in a snapshot that match the provided predicate.
1072+
#[tracing::instrument(
1073+
skip_all,
1074+
fields(
1075+
version = snapshot.version(),
1076+
has_predicate = predicate.is_some(),
1077+
partition_scan = tracing::field::Empty,
1078+
candidate_count = tracing::field::Empty
1079+
)
1080+
)]
10541081
pub async fn find_files(
10551082
snapshot: &DeltaTableState,
10561083
log_store: LogStoreRef,
@@ -1072,25 +1099,52 @@ pub async fn find_files(
10721099
expr_properties.result?;
10731100

10741101
if expr_properties.partition_only {
1102+
tracing::Span::current().record("partition_scan", true);
1103+
tracing::debug!("using partition-only scan (memory table)");
10751104
let candidates = scan_memory_table(&log_store, snapshot, predicate).await?;
1105+
tracing::Span::current().record("candidate_count", candidates.len());
1106+
tracing::info!(
1107+
candidates = candidates.len(),
1108+
scan_type = "partition",
1109+
"file pruning completed"
1110+
);
10761111
Ok(FindFiles {
10771112
candidates,
10781113
partition_scan: true,
10791114
})
10801115
} else {
1116+
tracing::Span::current().record("partition_scan", false);
1117+
tracing::debug!("using physical data scan");
10811118
let candidates =
10821119
find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;
10831120

1121+
tracing::Span::current().record("candidate_count", candidates.len());
1122+
tracing::info!(
1123+
candidates = candidates.len(),
1124+
scan_type = "physical",
1125+
"file pruning completed"
1126+
);
10841127
Ok(FindFiles {
10851128
candidates,
10861129
partition_scan: false,
10871130
})
10881131
}
10891132
}
1090-
None => Ok(FindFiles {
1091-
candidates: snapshot.file_actions(&log_store).await?,
1092-
partition_scan: true,
1093-
}),
1133+
None => {
1134+
tracing::Span::current().record("partition_scan", true);
1135+
tracing::debug!("no predicate, returning all files");
1136+
let candidates = snapshot.file_actions(&log_store).await?;
1137+
tracing::Span::current().record("candidate_count", candidates.len());
1138+
tracing::info!(
1139+
candidates = candidates.len(),
1140+
scan_type = "all",
1141+
"file pruning completed"
1142+
);
1143+
Ok(FindFiles {
1144+
candidates,
1145+
partition_scan: true,
1146+
})
1147+
}
10941148
}
10951149
}
10961150

crates/core/src/kernel/transaction/mod.rs

Lines changed: 63 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -619,14 +619,21 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
619619
let commit_or_bytes = this.commit_or_bytes;
620620

621621
if this.table_data.is_none() {
622+
tracing::debug!("committing initial table version 0");
622623
this.log_store
623624
.write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
624625
.await?;
625626
return Ok(PostCommit {
626627
version: 0,
627628
data: this.data,
628-
create_checkpoint: false,
629-
cleanup_expired_logs: None,
629+
create_checkpoint: this
630+
.post_commit
631+
.map(|v| v.create_checkpoint)
632+
.unwrap_or_default(),
633+
cleanup_expired_logs: this
634+
.post_commit
635+
.map(|v| v.cleanup_expired_logs)
636+
.unwrap_or_default(),
630637
log_store: this.log_store,
631638
table_data: None,
632639
custom_execute_handler: this.post_commit_hook_handler,
@@ -637,9 +644,20 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
637644
// unwrap() is safe here due to the above check
638645
let mut read_snapshot = this.table_data.unwrap().eager_snapshot().clone();
639646

647+
let commit_span = tracing::info_span!(
648+
"commit_with_retries",
649+
base_version = read_snapshot.version(),
650+
max_retries = this.max_retries,
651+
attempt = tracing::field::Empty,
652+
target_version = tracing::field::Empty,
653+
conflicts_checked = 0
654+
);
655+
let _enter = commit_span.enter();
656+
640657
let mut attempt_number = 1;
641658
let total_retries = this.max_retries + 1;
642659
while attempt_number <= total_retries {
660+
commit_span.record("attempt", attempt_number);
643661
let latest_version = this
644662
.log_store
645663
.get_latest_version(read_snapshot.version())
@@ -649,16 +667,28 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
649667
// If max_retries is set to 0, do not try to use the conflict checker to resolve the conflict
650668
// and return an error immediately
651669
if this.max_retries == 0 {
670+
tracing::warn!(
671+
base_version = read_snapshot.version(),
672+
latest_version = latest_version,
673+
"table updated but max_retries is 0, failing immediately"
674+
);
652675
return Err(
653676
TransactionError::MaxCommitAttempts(this.max_retries as i32).into()
654677
);
655678
}
656-
warn!("Attempting to write a transaction {} but the underlying table has been updated to {latest_version}\n{:?}", read_snapshot.version() + 1, this.log_store);
679+
tracing::warn!(
680+
base_version = read_snapshot.version(),
681+
latest_version = latest_version,
682+
versions_behind = latest_version - read_snapshot.version(),
683+
"table updated during transaction, checking for conflicts"
684+
);
657685
let mut steps = latest_version - read_snapshot.version();
686+
let mut conflicts_checked = 0;
658687

659688
// Need to check for conflicts with each version between the read_snapshot and
660689
// the latest!
661690
while steps != 0 {
691+
conflicts_checked += 1;
662692
let summary = WinningCommitSummary::try_new(
663693
this.log_store.as_ref(),
664694
latest_version - steps,
@@ -680,24 +710,40 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
680710
match conflict_checker.check_conflicts() {
681711
Ok(_) => {}
682712
Err(err) => {
713+
tracing::error!(
714+
conflicts_checked = conflicts_checked,
715+
error = %err,
716+
"conflict detected, aborting transaction"
717+
);
683718
return Err(TransactionError::CommitConflict(err).into());
684719
}
685720
}
686721
steps -= 1;
687722
}
723+
commit_span.record("conflicts_checked", conflicts_checked);
724+
tracing::debug!(
725+
conflicts_checked = conflicts_checked,
726+
"all conflicts resolved, updating snapshot"
727+
);
688728
// Update snapshot to latest version after successful conflict check
689729
read_snapshot
690730
.update(&this.log_store, Some(latest_version as u64))
691731
.await?;
692732
}
693733
let version: i64 = latest_version + 1;
734+
commit_span.record("target_version", version);
694735

695736
match this
696737
.log_store
697738
.write_commit_entry(version, commit_or_bytes.clone(), this.operation_id)
698739
.await
699740
{
700741
Ok(()) => {
742+
tracing::info!(
743+
version = version,
744+
num_retries = attempt_number as u64 - 1,
745+
"transaction committed successfully"
746+
);
701747
return Ok(PostCommit {
702748
version,
703749
data: this.data,
@@ -718,12 +764,21 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
718764
});
719765
}
720766
Err(TransactionError::VersionAlreadyExists(version)) => {
721-
error!("The transaction {version} already exists, will retry!");
767+
tracing::warn!(
768+
version = version,
769+
attempt = attempt_number,
770+
"version already exists, will retry"
771+
);
722772
// If the version already exists, loop through again and re-check
723773
// conflicts
724774
attempt_number += 1;
725775
}
726776
Err(err) => {
777+
tracing::error!(
778+
version = version,
779+
error = %err,
780+
"commit failed, aborting"
781+
);
727782
this.log_store
728783
.abort_commit_entry(version, commit_or_bytes, this.operation_id)
729784
.await?;
@@ -732,6 +787,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
732787
}
733788
}
734789

790+
tracing::error!(
791+
max_retries = this.max_retries,
792+
"exceeded maximum commit attempts"
793+
);
735794
Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into())
736795
})
737796
}

crates/core/src/logstore/mod.rs

Lines changed: 23 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -680,19 +680,31 @@ pub async fn get_latest_version(
680680
}
681681

682682
/// Read delta log for a specific version
683+
#[tracing::instrument(skip(storage), fields(version = version, path = %commit_uri_from_version(version)))]
683684
pub async fn read_commit_entry(
684685
storage: &dyn ObjectStore,
685686
version: i64,
686687
) -> DeltaResult<Option<Bytes>> {
687688
let commit_uri = commit_uri_from_version(version);
688689
match storage.get(&commit_uri).await {
689-
Ok(res) => Ok(Some(res.bytes().await?)),
690-
Err(ObjectStoreError::NotFound { .. }) => Ok(None),
691-
Err(err) => Err(err.into()),
690+
Ok(res) => {
691+
let bytes = res.bytes().await?;
692+
tracing::debug!(size = bytes.len(), "commit entry read successfully");
693+
Ok(Some(bytes))
694+
}
695+
Err(ObjectStoreError::NotFound { .. }) => {
696+
tracing::debug!("commit entry not found");
697+
Ok(None)
698+
}
699+
Err(err) => {
700+
tracing::error!(error = %err, "failed to read commit entry");
701+
Err(err.into())
702+
}
692703
}
693704
}
694705

695706
/// Default implementation for writing a commit entry
707+
#[tracing::instrument(skip(storage), fields(version = version, tmp_path = %tmp_commit, commit_path = %commit_uri_from_version(version)))]
696708
pub async fn write_commit_entry(
697709
storage: &dyn ObjectStore,
698710
version: i64,
@@ -706,21 +718,28 @@ pub async fn write_commit_entry(
706718
.map_err(|err| -> TransactionError {
707719
match err {
708720
ObjectStoreError::AlreadyExists { .. } => {
721+
tracing::warn!("commit entry already exists");
709722
TransactionError::VersionAlreadyExists(version)
710723
}
711-
_ => TransactionError::from(err),
724+
_ => {
725+
tracing::error!(error = %err, "failed to write commit entry");
726+
TransactionError::from(err)
727+
}
712728
}
713729
})?;
730+
tracing::debug!("commit entry written successfully");
714731
Ok(())
715732
}
716733

717734
/// Default implementation for aborting a commit entry
735+
#[tracing::instrument(skip(storage), fields(version = _version, tmp_path = %tmp_commit))]
718736
pub async fn abort_commit_entry(
719737
storage: &dyn ObjectStore,
720738
_version: i64,
721739
tmp_commit: &Path,
722740
) -> Result<(), TransactionError> {
723741
storage.delete_with_retries(tmp_commit, 15).await?;
742+
tracing::debug!("commit entry aborted successfully");
724743
Ok(())
725744
}
726745

crates/core/src/logstore/storage/retry_ext.rs

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ pub trait ObjectStoreRetryExt: ObjectStore {
3131
/// bytes to location, or fail. No clients should be able to observe a partially written object
3232
///
3333
/// Note that `put_with_opts` may have precondition semantics, and thus may not be retriable.
34+
#[tracing::instrument(skip(self, bytes), fields(path = %location, size = bytes.content_length()))]
3435
async fn put_with_retries(
3536
&self,
3637
location: &Path,
@@ -40,15 +41,20 @@ pub trait ObjectStoreRetryExt: ObjectStore {
4041
let mut attempt_number = 1;
4142
while attempt_number <= max_retries {
4243
match self.put(location, bytes.clone()).await {
43-
Ok(result) => return Ok(result),
44+
Ok(result) => {
45+
tracing::debug!(attempt = attempt_number, "put operation succeeded");
46+
return Ok(result);
47+
}
4448
Err(err) if attempt_number == max_retries => {
49+
tracing::warn!(attempt = attempt_number, error = %err, "put operation failed after max retries");
4550
return Err(err);
4651
}
4752
Err(Error::Generic { store, source }) => {
4853
debug!("put_with_retries attempt {attempt_number} failed: {store} {source}");
4954
attempt_number += 1;
5055
}
5156
Err(err) => {
57+
tracing::error!(attempt = attempt_number, error = %err, "put operation failed with non-retryable error");
5258
return Err(err);
5359
}
5460
}
@@ -57,19 +63,25 @@ pub trait ObjectStoreRetryExt: ObjectStore {
5763
}
5864

5965
/// Delete the object at the specified location
66+
#[tracing::instrument(skip(self), fields(path = %location))]
6067
async fn delete_with_retries(&self, location: &Path, max_retries: usize) -> Result<()> {
6168
let mut attempt_number = 1;
6269
while attempt_number <= max_retries {
6370
match self.delete(location).await {
64-
Ok(()) | Err(Error::NotFound { .. }) => return Ok(()),
71+
Ok(()) | Err(Error::NotFound { .. }) => {
72+
tracing::debug!(attempt = attempt_number, "delete operation succeeded");
73+
return Ok(());
74+
}
6575
Err(err) if attempt_number == max_retries => {
76+
tracing::warn!(attempt = attempt_number, error = %err, "delete operation failed after max retries");
6677
return Err(err);
6778
}
6879
Err(Error::Generic { store, source }) => {
6980
debug!("delete_with_retries attempt {attempt_number} failed: {store} {source}");
7081
attempt_number += 1;
7182
}
7283
Err(err) => {
84+
tracing::error!(attempt = attempt_number, error = %err, "delete operation failed with non-retryable error");
7385
return Err(err);
7486
}
7587
}

0 commit comments

Comments
 (0)