Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ on:
- '.github/ISSUE_TEMPLATE/**'
branches:
- main
- 1.*.x
- 0.*.x
- pr/**/ci
- ci-*

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.ref || github.run_id }}
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ members = [
name = "sea-streamer"
version = "0.5.2"
authors = ["Chris Tsang <[email protected]>"]
edition = "2021"
edition = "2024"
description = "🌊 The stream processing toolkit for Rust"
license = "MIT OR Apache-2.0"
documentation = "https://docs.rs/sea-streamer"
repository = "https://github.com/SeaQL/sea-streamer"
categories = ["concurrency"]
keywords = ["async", "stream", "kafka", "stream-processing"]
rust-version = "1.60"
rust-version = "1.85.0"

[package.metadata.docs.rs]
features = ["json", "kafka", "stdio", "socket"]
Expand Down
4 changes: 2 additions & 2 deletions benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
name = "sea-streamer-benchmark"
version = "0.5.0"
authors = ["Chris Tsang <[email protected]>"]
edition = "2021"
edition = "2024"
description = "🌊 The stream processing toolkit for Rust"
license = "MIT OR Apache-2.0"
documentation = "https://docs.rs/sea-streamer"
repository = "https://github.com/SeaQL/sea-streamer"
categories = ["concurrency"]
rust-version = "1.60"
rust-version = "1.85.0"

[dependencies]
anyhow = { version = "1" }
Expand Down
6 changes: 4 additions & 2 deletions benchmark/src/bin/baseline.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use clap::Parser;
use sea_streamer::{runtime::sleep, StreamUrl};
use sea_streamer::{StreamUrl, runtime::sleep};
use std::time::Duration;

#[derive(Debug, Parser)]
Expand All @@ -22,7 +22,9 @@ async fn main() -> Result<()> {
std::hint::black_box(stream);

for i in 0..100_000 {
let message = format!("The this the message payload {i:0>5}: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo");
let message = format!(
"The this the message payload {i:0>5}: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo"
);
std::hint::black_box(message);
if i % 1000 == 0 {
sleep(Duration::from_nanos(1)).await;
Expand Down
4 changes: 3 additions & 1 deletion benchmark/src/bin/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ async fn main() -> Result<()> {
.await?;

for i in 0..100_000 {
let message = format!("The this the message payload {i:0>5}: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo");
let message = format!(
"The this the message payload {i:0>5}: Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo"
);
producer.send(message)?;
if i % 1000 == 0 {
tokio::time::sleep(Duration::from_nanos(1)).await;
Expand Down
4 changes: 2 additions & 2 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
name = "sea-streamer-examples"
version = "0.5.0"
authors = ["Chris Tsang <[email protected]>"]
edition = "2021"
edition = "2024"
description = "🌊 The stream processing toolkit for Rust"
license = "MIT OR Apache-2.0"
documentation = "https://docs.rs/sea-streamer"
repository = "https://github.com/SeaQL/sea-streamer"
categories = ["concurrency"]
rust-version = "1.60"
rust-version = "1.85.0"

[dependencies]
anyhow = { version = "1" }
Expand Down
3 changes: 2 additions & 1 deletion examples/price-feed/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
[package]
name = "sea-streamer-price-feed"
version = "0.1.0"
edition = "2021"
edition = "2024"
rust-version = "1.85.0"
publish = false

[dependencies]
Expand Down
4 changes: 2 additions & 2 deletions examples/price-feed/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use anyhow::{bail, Result};
use anyhow::{Result, bail};
use async_tungstenite::tungstenite::Message;
use clap::Parser;
use rust_decimal::Decimal;
use sea_streamer::{
Producer, SeaProducer, SeaStreamer, Streamer, StreamerUri, TIMESTAMP_FORMAT, Timestamp,
export::futures::{SinkExt, StreamExt},
Producer, SeaProducer, SeaStreamer, Streamer, StreamerUri, Timestamp, TIMESTAMP_FORMAT,
};
use serde::{Deserialize, Serialize};

Expand Down
3 changes: 2 additions & 1 deletion examples/sea-orm-sink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
[package]
name = "sea-streamer-sea-orm-sink"
version = "0.1.0"
edition = "2021"
edition = "2024"
rust-version = "1.85.0"
publish = false

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion examples/src/bin/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use flume::bounded;
use std::time::Duration;

use sea_streamer::{
runtime::{sleep, spawn_task},
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, Producer, SeaConsumer,
SeaConsumerOptions, SeaMessage, SeaProducer, SeaStreamer, SharedMessage, StreamUrl, Streamer,
runtime::{sleep, spawn_task},
};

#[derive(Debug, Parser)]
Expand Down
2 changes: 1 addition & 1 deletion examples/src/bin/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use flume::bounded;
use std::time::Duration;

use sea_streamer::{
runtime::{sleep, spawn_task},
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, Producer, SeaConsumer,
SeaConsumerOptions, SeaMessage, SeaProducer, SeaStreamer, SharedMessage, StreamUrl, Streamer,
runtime::{sleep, spawn_task},
};

#[derive(Debug, Parser)]
Expand Down
4 changes: 2 additions & 2 deletions examples/src/bin/resumable.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use clap::Parser;
use sea_streamer::{
kafka::AutoOffsetReset,
redis::{AutoCommit, AutoStreamReset},
Buffer, Consumer, ConsumerMode, ConsumerOptions, Message, Producer, SeaConsumer,
SeaConsumerOptions, SeaMessage, SeaProducer, SeaStreamer, SeaStreamerBackend, StreamUrl,
Streamer,
kafka::AutoOffsetReset,
redis::{AutoCommit, AutoStreamReset},
};
use std::time::Duration;

Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
name = "sea-streamer-file"
version = "0.5.2"
authors = ["Chris Tsang <[email protected]>"]
edition = "2021"
edition = "2024"
description = "🌊 SeaStreamer File Backend"
license = "MIT OR Apache-2.0"
documentation = "https://docs.rs/sea-streamer-file"
repository = "https://github.com/SeaQL/sea-streamer"
categories = ["concurrency"]
keywords = ["async", "stream", "stream-processing"]
rust-version = "1.60"
rust-version = "1.85.0"

[package.metadata.docs.rs]
features = []
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/bin/clock.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{anyhow, Result};
use anyhow::{Result, anyhow};
use clap::Parser;
use sea_streamer_file::{FileId, FileStreamer};
use sea_streamer_types::{Producer, StreamKey, Streamer};
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/bin/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use anyhow::Result;
use clap::Parser;
use sea_streamer_file::{
format::MessageJson, is_end_of_stream, FileErr, FileId, MessageSource, StreamMode,
FileErr, FileId, MessageSource, StreamMode, format::MessageJson, is_end_of_stream,
};
use sea_streamer_types::{Buffer, Message, TIMESTAMP_FORMAT};
use std::str::FromStr;
Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-file/src/bin/sink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{anyhow, Result};
use anyhow::{Result, anyhow};
use clap::Parser;
use sea_streamer_file::{FileId, MessageSink, DEFAULT_BEACON_INTERVAL, DEFAULT_FILE_SIZE_LIMIT};
use sea_streamer_file::{DEFAULT_BEACON_INTERVAL, DEFAULT_FILE_SIZE_LIMIT, FileId, MessageSink};
use sea_streamer_types::{MessageHeader, OwnedMessage, ShardId, StreamKey, Timestamp};
use std::time::Duration;

Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/bin/stdin-to-file.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use clap::Parser;
use sea_streamer_file::export::flume::{unbounded, Receiver};
use sea_streamer_file::export::flume::{Receiver, unbounded};
use sea_streamer_file::{AsyncFile, FileId};

#[derive(Debug, Parser)]
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{ByteSink, ByteSource, FileErr};
use std::{
cmp::Ordering,
collections::VecDeque,
future::{ready, Ready},
future::{Ready, ready},
};

pub trait Appendable: Default {
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/consumer/future.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Basically copied from sea-streamer-redis
use super::{FileConsumer, NextFuture};
use crate::FileResult;
use sea_streamer_types::{export::futures::Stream, Consumer, SharedMessage};
use sea_streamer_types::{Consumer, SharedMessage, export::futures::Stream};
use std::{fmt::Debug, future::Future};

pub struct StreamFuture<'a> {
Expand Down
10 changes: 5 additions & 5 deletions sea-streamer-file/src/consumer/group.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use flume::{bounded, unbounded, Receiver, Sender};
use sea_streamer_runtime::{spawn_task, AsyncMutex};
use flume::{Receiver, Sender, bounded, unbounded};
use sea_streamer_runtime::{AsyncMutex, spawn_task};
use std::{
collections::HashMap,
ops::Deref,
Expand All @@ -8,12 +8,12 @@ use std::{

use super::{CtrlMsg, FileConsumer};
use crate::{
is_end_of_stream, is_internal, is_pulse, is_wildcard, pulse_message, ConfigErr, FileErr,
FileId, MessageSource, StreamMode,
ConfigErr, FileErr, FileId, MessageSource, StreamMode, is_end_of_stream, is_internal, is_pulse,
is_wildcard, pulse_message,
};
use sea_streamer_types::{
export::futures::{select, FutureExt},
ConsumerGroup, Message, ShardId, SharedMessage, StreamKey,
export::futures::{FutureExt, select},
};

lazy_static::lazy_static! {
Expand Down
8 changes: 4 additions & 4 deletions sea-streamer-file/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ mod group;

pub use future::StreamFuture as FileMessageStream;

use flume::{r#async::RecvFut, Receiver, Sender, TrySendError};
use flume::{Receiver, Sender, TrySendError, r#async::RecvFut};
use sea_streamer_types::{
export::futures::{Future, FutureExt},
Consumer as ConsumerTrait, SeqPos, ShardId, SharedMessage, StreamErr, StreamKey, Timestamp,
export::futures::{Future, FutureExt},
};

use crate::{is_pulse, FileErr, FileId, FileResult, SeekTarget};
pub(crate) use group::new_consumer;
use crate::{FileErr, FileId, FileResult, SeekTarget, is_pulse};
use group::Sid;
pub(crate) use group::new_consumer;

pub use self::group::query_streamer;
use self::group::{preseek_consumer, remove_consumer};
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/dyn_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
AsyncFile, ByteSource, Bytes, FileErr, FileId, FileReader, FileReaderFuture, FileSource,
FileSourceFuture, ReadFrom,
};
use sea_streamer_types::{export::futures::FutureExt, SeqPos};
use sea_streamer_types::{SeqPos, export::futures::FutureExt};

/// A runtime adapter of `FileReader` and `FileSource`,
/// also able to switch between the two mode of operations dynamically.
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{format::FormatErr, ConfigErr};
use crate::{ConfigErr, format::FormatErr};
use sea_streamer_types::{StreamErr, StreamResult};
use std::str::Utf8Error;
use thiserror::Error;
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use sea_streamer_runtime::file::{
AsyncReadExt, AsyncSeekExt, AsyncWriteExt, File, OpenOptions, SeekFrom,
};
use sea_streamer_types::{
export::futures::{future::BoxFuture, FutureExt},
SeqPos, StreamUrlErr, StreamerUri,
export::futures::{FutureExt, future::BoxFuture},
};

pub(crate) const BUFFER_SIZE: usize = 10240;
Expand Down
2 changes: 1 addition & 1 deletion sea-streamer-file/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
//! with the stream key `SEA_STREAMER_INTERNAL` and payload `EOS`.

use crate::{
crc::{crc16_cdma2000, crc_update},
ByteSink, ByteSource, Bytes, FileErr,
crc::{crc_update, crc16_cdma2000},
};
use sea_streamer_types::{
Buffer, Message as MessageTrait, OwnedMessage, SeqNo, ShardId, StreamKey, StreamKeyErr,
Expand Down
22 changes: 9 additions & 13 deletions sea-streamer-file/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use std::{cmp::Ordering, collections::BTreeMap, num::NonZeroU32, path::Path};

use sea_streamer_types::{
export::futures::{future::BoxFuture, FutureExt},
Buffer, Message as MessageTrait, MessageHeader, OwnedMessage, SeqNo, SeqPos, ShardId,
SharedMessage, StreamKey, Timestamp, SEA_STREAMER_INTERNAL,
Buffer, Message as MessageTrait, MessageHeader, OwnedMessage, SEA_STREAMER_INTERNAL, SeqNo,
SeqPos, ShardId, SharedMessage, StreamKey, Timestamp,
export::futures::{FutureExt, future::BoxFuture},
};

use crate::{
format::{Beacon, Checksum, FormatErr, Header, Marker, Message, RunningChecksum},
AsyncFile, BeaconReader, ByteBuffer, ByteSource, Bytes, DynFileSource, FileErr, FileId,
FileReader, FileSink, FileSourceType, SeekErr, StreamMode, SurveyResult, Surveyor,
SEA_STREAMER_WILDCARD,
FileReader, FileSink, FileSourceType, SEA_STREAMER_WILDCARD, SeekErr, StreamMode, SurveyResult,
Surveyor,
format::{Beacon, Checksum, FormatErr, Header, Marker, Message, RunningChecksum},
};

pub const END_OF_STREAM: &str = "EOS";
Expand Down Expand Up @@ -133,11 +133,7 @@ impl MessageSource {
SeqPos::At(nth) => {
#[allow(clippy::unnecessary_cast)]
let at = nth as u64 * self.beacon_interval();
if at < self.known_size() {
at
} else {
max
}
if at < self.known_size() { at } else { max }
}
};
if self.offset != pos {
Expand Down Expand Up @@ -234,7 +230,7 @@ impl MessageSource {
break 'outer match e {
FileErr::NotEnoughBytes => Err(FileErr::SeekErr(SeekErr::OutOfBound)),
e => Err(e),
}
};
}
};
// read until we found what we want
Expand All @@ -245,7 +241,7 @@ impl MessageSource {
break 'outer match e {
FileErr::NotEnoughBytes => Err(FileErr::SeekErr(SeekErr::OutOfBound)),
e => Err(e),
}
};
}
};
if let SurveyResult::Right = compare(&to, mess.message.header()) {
Expand Down
6 changes: 3 additions & 3 deletions sea-streamer-file/src/producer/backend.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use flume::{unbounded, Receiver, Sender};
use sea_streamer_runtime::{spawn_task, AsyncMutex, TaskHandle};
use flume::{Receiver, Sender, unbounded};
use sea_streamer_runtime::{AsyncMutex, TaskHandle, spawn_task};
use std::{collections::HashMap, num::NonZeroU32};

use super::{Request, RequestTo};
use crate::{
format::{Checksum, Header, RunningChecksum},
BeaconReader, BeaconState, ByteBuffer, DynFileSource, FileConnectOptions, FileErr, FileId,
FileProducer, FileProducerOptions, FileReader, FileSink, MessageSink, MessageSource,
StreamMode,
format::{Checksum, Header, RunningChecksum},
};
use sea_streamer_types::{
Message, MessageHeader, OwnedMessage, SeqNo, SeqPos, ShardId, StreamKey, Timestamp,
Expand Down
6 changes: 3 additions & 3 deletions sea-streamer-file/src/producer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
mod backend;

use flume::{r#async::RecvFut, unbounded, Sender};
use flume::{Sender, r#async::RecvFut, unbounded};
use std::{fmt::Debug, future::Future};

use crate::{Bytes, FileErr, FileId, FileResult};
use sea_streamer_types::{
export::futures::FutureExt, Buffer, MessageHeader, Producer as ProducerTrait, ShardId,
StreamErr, StreamKey, StreamResult, Timestamp,
Buffer, MessageHeader, Producer as ProducerTrait, ShardId, StreamErr, StreamKey, StreamResult,
Timestamp, export::futures::FutureExt,
};

pub(crate) use backend::{end_producer, new_producer};
Expand Down
Loading