Skip to content

Commit a22a97e

Browse files
authored
feat: tracing spans across threadpool (#3894)
# Description The description of the main changes of your pull request # Related Issue(s) <!--- For example: - closes #106 ---> # Documentation <!--- Share links to useful documentation ---> --------- Signed-off-by: Ion Koutsouris <[email protected]>
1 parent acfaee5 commit a22a97e

File tree

5 files changed

+53
-22
lines changed

5 files changed

+53
-22
lines changed

crates/core/src/kernel/mod.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
//!
33
//! The Kernel module contains all the logic for reading and processing the Delta Lake transaction log.
44
5-
use std::sync::{Arc, LazyLock};
6-
75
use delta_kernel::engine::arrow_expression::ArrowEvaluationHandler;
6+
use std::sync::{Arc, LazyLock};
7+
use tokio::task::JoinHandle;
8+
use tracing::dispatcher;
9+
use tracing::Span;
810

911
pub mod arrow;
1012
pub mod error;
@@ -23,3 +25,20 @@ pub use snapshot::*;
2325

2426
pub(crate) static ARROW_HANDLER: LazyLock<Arc<ArrowEvaluationHandler>> =
2527
LazyLock::new(|| Arc::new(ArrowEvaluationHandler {}));
28+
29+
pub(crate) fn spawn_blocking_with_span<F, R>(f: F) -> JoinHandle<R>
30+
where
31+
F: FnOnce() -> R + Send + 'static,
32+
R: Send + 'static,
33+
{
34+
// Capture the current dispatcher and span
35+
let dispatch = dispatcher::get_default(|d| d.clone());
36+
let span = Span::current();
37+
38+
tokio::task::spawn_blocking(move || {
39+
dispatcher::with_default(&dispatch, || {
40+
let _enter = span.enter();
41+
f()
42+
})
43+
})
44+
}

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,12 @@ use object_store::path::Path;
4040
use object_store::ObjectStore;
4141
use serde_json::Deserializer;
4242
use tokio::task::spawn_blocking;
43+
use tracing::Instrument;
4344
use url::Url;
4445

4546
use super::{Action, CommitInfo, Metadata, Protocol};
4647
use crate::kernel::arrow::engine_ext::{kernel_to_arrow, ExpressionEvaluatorExt};
47-
use crate::kernel::{StructType, ARROW_HANDLER};
48+
use crate::kernel::{spawn_blocking_with_span, StructType, ARROW_HANDLER};
4849
use crate::logstore::{LogStore, LogStoreExt};
4950
use crate::{to_kernel_predicate, DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter};
5051

@@ -80,7 +81,7 @@ impl Snapshot {
8081
config: DeltaTableConfig,
8182
version: Option<Version>,
8283
) -> DeltaResult<Self> {
83-
let snapshot = match spawn_blocking(move || {
84+
let snapshot = match spawn_blocking_with_span(move || {
8485
let mut builder = KernelSnapshot::builder_for(table_root);
8586
if let Some(version) = version {
8687
builder = builder.at_version(version);
@@ -162,7 +163,7 @@ impl Snapshot {
162163
// TODO: bundle operation id with log store ...
163164
let engine = log_store.engine(None);
164165
let current = self.inner.clone();
165-
let snapshot = spawn_blocking(move || {
166+
let snapshot = spawn_blocking_with_span(move || {
166167
let mut builder = KernelSnapshot::builder_from(current);
167168
if let Some(version) = target_version {
168169
builder = builder.at_version(version);
@@ -432,9 +433,10 @@ impl Snapshot {
432433
// TODO: bundle operation id with log store ...
433434
let engine = log_store.engine(None);
434435
let inner = self.inner.clone();
435-
let version = spawn_blocking(move || inner.get_app_id_version(&app_id, engine.as_ref()))
436-
.await
437-
.map_err(|e| DeltaTableError::GenericError { source: e.into() })??;
436+
let version =
437+
spawn_blocking_with_span(move || inner.get_app_id_version(&app_id, engine.as_ref()))
438+
.await
439+
.map_err(|e| DeltaTableError::GenericError { source: e.into() })??;
438440
Ok(version)
439441
}
440442

@@ -451,9 +453,10 @@ impl Snapshot {
451453
let engine = log_store.engine(None);
452454
let inner = self.inner.clone();
453455
let domain = domain.to_string();
454-
let metadata = spawn_blocking(move || inner.get_domain_metadata(&domain, engine.as_ref()))
455-
.await
456-
.map_err(|e| DeltaTableError::GenericError { source: e.into() })??;
456+
let metadata =
457+
spawn_blocking_with_span(move || inner.get_domain_metadata(&domain, engine.as_ref()))
458+
.await
459+
.map_err(|e| DeltaTableError::GenericError { source: e.into() })??;
457460
Ok(metadata)
458461
}
459462
}

crates/core/src/kernel/snapshot/stream.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
//! the code in this file is hoisted from datafusion with only slight modifications
22
//!
3-
use std::pin::Pin;
4-
53
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
64
use futures::stream::BoxStream;
75
use futures::{Future, Stream, StreamExt};
6+
use std::pin::Pin;
87
use tokio::sync::mpsc::{Receiver, Sender};
98
use tokio::task::JoinSet;
9+
use tracing::dispatcher;
10+
use tracing::Span;
1011

1112
use crate::errors::DeltaResult;
1213
use crate::DeltaTableError;
@@ -106,7 +107,16 @@ impl<O: Send + 'static> ReceiverStreamBuilder<O> {
106107
F: FnOnce() -> DeltaResult<()>,
107108
F: Send + 'static,
108109
{
109-
self.join_set.spawn_blocking(f);
110+
// Capture the current dispatcher and span
111+
let dispatch = dispatcher::get_default(|d| d.clone());
112+
let span = Span::current();
113+
114+
self.join_set.spawn_blocking(move || {
115+
dispatcher::with_default(&dispatch, || {
116+
let _enter = span.enter();
117+
f()
118+
})
119+
});
110120
}
111121

112122
/// Create a stream of all data written to `tx`

crates/core/src/logstore/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,12 @@ use serde::ser::SerializeSeq;
6969
use serde::{Deserialize, Serialize};
7070
use serde_json::Deserializer;
7171
use tokio::runtime::RuntimeFlavor;
72-
use tokio::task::spawn_blocking;
7372
use tracing::*;
7473
use url::Url;
7574
use uuid::Uuid;
7675

7776
use crate::kernel::transaction::TransactionError;
78-
use crate::kernel::Action;
77+
use crate::kernel::{spawn_blocking_with_span, Action};
7978
use crate::{DeltaResult, DeltaTableError};
8079

8180
pub use self::config::StorageConfig;
@@ -660,7 +659,7 @@ pub async fn get_latest_version(
660659
let storage = log_store.engine(None).storage_handler();
661660
let log_root = log_store.log_root_url();
662661

663-
let segment = spawn_blocking(move || {
662+
let segment = spawn_blocking_with_span(move || {
664663
LogSegment::for_table_changes(storage.as_ref(), log_root, current_version as u64, None)
665664
})
666665
.await

crates/core/src/protocol/checkpoints.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ use object_store::ObjectStore;
1717
use parquet::arrow::async_writer::ParquetObjectWriter;
1818
use parquet::arrow::AsyncArrowWriter;
1919
use regex::Regex;
20-
use tokio::task::spawn_blocking;
2120
use tracing::{debug, error};
2221
use uuid::Uuid;
2322

23+
use crate::kernel::spawn_blocking_with_span;
2424
use crate::logstore::{LogStore, LogStoreExt, DELTA_LOG_REGEX};
2525
use crate::table::config::TablePropertiesExt as _;
2626
use crate::{open_table_with_version, DeltaTable};
@@ -45,7 +45,7 @@ pub(crate) async fn create_checkpoint_for(
4545
let engine = log_store.engine(operation_id);
4646

4747
let task_engine = engine.clone();
48-
let snapshot = spawn_blocking(move || {
48+
let snapshot = spawn_blocking_with_span(move || {
4949
Snapshot::builder_for(table_root)
5050
.at_version(version)
5151
.build(task_engine.as_ref())
@@ -59,7 +59,7 @@ pub(crate) async fn create_checkpoint_for(
5959
let cp_path = Path::from_url_path(cp_url.path())?;
6060
let mut cp_data = cp_writer.checkpoint_data(engine.as_ref())?;
6161

62-
let (first_batch, mut cp_data) = spawn_blocking(move || {
62+
let (first_batch, mut cp_data) = spawn_blocking_with_span(move || {
6363
let Some(first_batch) = cp_data.next() else {
6464
return Err(DeltaTableError::Generic("No data".to_string()));
6565
};
@@ -82,7 +82,7 @@ pub(crate) async fn create_checkpoint_for(
8282

8383
let mut current_batch;
8484
loop {
85-
(current_batch, cp_data) = spawn_blocking(move || {
85+
(current_batch, cp_data) = spawn_blocking_with_span(move || {
8686
let Some(first_batch) = cp_data.next() else {
8787
return Ok::<_, DeltaTableError>((None, cp_data));
8888
};
@@ -116,7 +116,7 @@ pub(crate) async fn create_checkpoint_for(
116116
last_modified: file_meta.last_modified.timestamp_millis(),
117117
};
118118

119-
spawn_blocking(move || cp_writer.finalize(engine.as_ref(), &file_meta, cp_data))
119+
spawn_blocking_with_span(move || cp_writer.finalize(engine.as_ref(), &file_meta, cp_data))
120120
.await
121121
.map_err(|e| DeltaTableError::Generic(e.to_string()))??;
122122

0 commit comments

Comments
 (0)