Skip to content

Commit 8e57a18

Browse files
committed
Remove the bounded channel for queueing into the parquet sink
1 parent d60f761 commit 8e57a18

File tree

2 files changed

+4
-4
lines changed

2 files changed

+4
-4
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hotdog"
3-
version = "1.2.1"
3+
version = "1.2.2"
44
authors = ["R. Tyler Croy <[email protected]>"]
55
edition = "2024"
66

src/sink/parquet.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use super::{Message, Sink};
77

88
use arrow_json::reader::{ReaderBuilder, infer_json_schema};
9-
use async_channel::{Receiver, Sender, bounded};
9+
use async_channel::{Receiver, Sender, unbounded};
1010
use async_compat::Compat;
1111
use dipstick::InputQueueScope;
1212
use object_store::ObjectStore;
@@ -52,7 +52,7 @@ impl Sink for Parquet {
5252
schemas: &[crate::settings::Schema],
5353
_stats: InputQueueScope,
5454
) -> Self {
55-
let (tx, rx) = bounded(1024);
55+
let (tx, rx) = unbounded();
5656
// [object_store] largely expects environment variables to be all lowercased for
5757
// consideration as options
5858
let opts: HashMap<String, String> =
@@ -136,7 +136,7 @@ impl Sink for Parquet {
136136
destination,
137137
payload,
138138
} => {
139-
let _span = span!(Level::INFO, "Parquet sink recv");
139+
let _span = span!(Level::DEBUG, "Parquet sink recv");
140140

141141
if !buffer.contains_key(&destination) {
142142
buffer.insert(destination.clone(), vec![]);

0 commit comments

Comments
 (0)