diff --git a/Cargo.lock b/Cargo.lock index 77149371..9b483a56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -560,10 +572,11 @@ dependencies = [ "env_logger", "glob", "goblin", - "hashbrown", + "hashbrown 0.15.2", "indoc", "insta", "itoa", + "lasso", "log", "predicates", "pretty_assertions", @@ -642,6 +655,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] + [[package]] name = "hashbrown" version = "0.15.2" @@ -695,7 +718,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -760,6 +783,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lasso" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e14eda50a3494b3bf7b9ce51c52434a761e383d7238ce1dd5dcec2fbc13e9fb" +dependencies = [ + "ahash", + "hashbrown 0.14.5", +] + [[package]] name = "libc" version = "0.2.169" @@ -991,9 +1024,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.38" +version = "1.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" +checksum = 
"a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" dependencies = [ "proc-macro2", ] @@ -1629,6 +1662,26 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" +[[package]] +name = "zerocopy" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + [[package]] name = "zeroize" version = "1.8.1" diff --git a/Cargo.toml b/Cargo.toml index 122ba4c7..3204d83c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ serde_json = { version = "1", features = ["preserve_order"]} bumpalo = { version = "3", features = ["collections"] } itoa = "1" ryu = "1" +lasso = { version = "0.7.3", features = ["ahasher", "inline-more"] } [target.'cfg(not(windows))'.dependencies] # jemalloc is significantly more peformant than the system allocator. diff --git a/PERF.md b/PERF.md index 4fdfe455..8b8c90bf 100644 --- a/PERF.md +++ b/PERF.md @@ -403,6 +403,89 @@ Template (copy/paste): - **Where**: `src/json_stream_output.rs` (FileTime/SysTime serialization). - **Impact (omer-pc, `-t 1`)**: reverting to chrono formatting regresses **+3.31%** median (605.5 ms → 625.6 ms). 
+### H1 (partial) — Reuse scratch buffer + reduce key/value churn in streaming JSONL output +- **What changed**: + - `evtx_dump` (`-o jsonl`, `--json-parser streaming`, `-t 1`) now reuses a single `JsonStreamOutput>` across records and + writes it directly to the output stream (avoids per-record `Vec` + `String` allocation in `EvtxRecord::into_json_stream()`). + - `JsonStreamOutput` reduces per-record heap churn by: + - interning element keys (`Arc`) instead of allocating `String` per element, + - using an inline “one value” buffer for `buffered_values` / aggregated `Data` values (avoids many small `Vec` allocations), + - recycling per-object duplicate-key tracking frames (reuses `HashSet` allocations across records). +- **Benchmarks (omer-pc, quiet-gated, W1)**: + - **before**: median **607.0 ms** + - **after**: median **572.4 ms** + - **speedup**: **1.061×** (≈ **5.7%** lower median) + - **Command (omer-pc)**: + +```bash +BASE=/tmp/evtx-h1-bench +SAMPLE=$BASE/before/samples/security_big_sample.evtx + +QUIET_IDLE_MIN=95 QUIET_LOAD1_MAX=8 $BASE/after/scripts/ensure_quiet.sh +hyperfine --warmup 3 --runs 25 \ + --export-json $BASE/h1-before-vs-after.hyperfine.json \ + "$BASE/before/target/release/evtx_dump -t 1 -o jsonl $SAMPLE > /dev/null" \ + "$BASE/after/target/release/evtx_dump -t 1 -o jsonl $SAMPLE > /dev/null" +``` + + - **Artifact**: `target/perf/h1-before-vs-after.hyperfine.json` (copied from `omer-pc:/tmp/evtx-h1-bench/h1-before-vs-after.hyperfine.json`) + +- **Profile delta (macOS, samply, W1, 200 iterations)**: + - `_platform_memmove`: **7.38% → 4.33%** leaf + - `alloc::raw_vec::RawVecInner::finish_grow`: **1.62% → 0.88%** leaf + - `alloc::raw_vec::RawVec::grow_one`: **0.71% → 0.44%** leaf + - `_rjem_malloc`: **3.15% → 1.09%** leaf + - `_rjem_sdallocx.cold.1`: **3.77% → 1.75%** leaf + - **Artifacts**: + - `target/perf/samply/h1_before.profile.json.gz` + `target/perf/samply/h1_before.profile.json.syms.json` + - `target/perf/samply/h1_after.profile.json.gz` 
+ `target/perf/samply/h1_after.profile.json.syms.json` +- **Correctness check**: `cargo test --features fast-alloc --locked` +- **Notes**: This was a partial step; the follow-up “Zig-style duplicate-key tracking” below removes hash/memcmp hotspots and + crosses the original H1 ≥8% target on `omer-pc`. + +### H1 (finish) — Zig-style duplicate-key tracking (fixed table + interned-key IDs) +- **What changed**: + - Replaced per-object `HashSet` duplicate-key tracking with a Zig-style fixed table (`MAX_UNIQUE_NAMES = 64`) + per-base suffix counters + in `JsonStreamOutput` (`UniqueKeyTable`). + - Duplicate-key membership checks are against interned key IDs (no per-key hashing on the hot path); suffixed keys (`_1`, `_2`, …) + are only allocated on collision. + - Switched the streaming key interner to `lasso::Rodeo` (enabled `ahasher` + `inline-more`) to reduce interning hashing overhead. +- **Benchmarks (omer-pc, quiet-gated, W1)**: + - **before**: median **609.1 ms** + - **after**: median **526.3 ms** + - **speedup**: **1.157×** (≈ **13.6%** lower median) + - **Command (omer-pc)**: + +```bash +BASE=/tmp/evtx-h1-bench +SAMPLE=$BASE/before/samples/security_big_sample.evtx + +QUIET_IDLE_MIN=95 QUIET_LOAD1_MAX=8 $BASE/after/scripts/ensure_quiet.sh +hyperfine --warmup 3 --runs 25 \ + --export-json $BASE/h1-lasso-ahash-before-vs-after.hyperfine.json \ + "$BASE/before/target/release/evtx_dump -t 1 -o jsonl $SAMPLE > /dev/null" \ + "$BASE/after/target/release/evtx_dump -t 1 -o jsonl $SAMPLE > /dev/null" +``` + + - **Artifact**: `target/perf/h1-lasso-ahash-before-vs-after.hyperfine.json` (copied from `omer-pc:/tmp/evtx-h1-bench/h1-lasso-ahash-before-vs-after.hyperfine.json`) + +- **Profile delta (macOS, samply, W1, 200 iterations)**: + - **Key-tracking hot path (after1 → after2)**: + - `hashbrown::map::HashMap::get_inner`: **3.20% → 0.00%** leaf + - `hashbrown::map::HashMap::insert`: **1.83% → 0.00%** leaf + - `_platform_memcmp`: **2.99% → 2.43%** leaf + - 
`evtx::json_stream_output::UniqueKeyTable::reserve_unique_index`: **0.00% → 2.17%** leaf (replacement cost) + - **Key interning (after3 → after4)**: + - ` as core::hash::Hasher>::write`: **7.32% → 2.01%** leaf (enabling `lasso` `ahasher`) + - **Final vs baseline (before → after4)**: + - `_platform_memmove`: **7.38% → 4.80%** leaf + - `_rjem_malloc`: **3.15% → 1.23%** leaf + - `alloc::raw_vec::RawVecInner::finish_grow`: **1.62% → 0.96%** leaf + - **Artifacts**: + - `target/perf/samply/h1_after2.profile.json.gz` + `target/perf/samply/h1_after2.profile.json.syms.json` + - `target/perf/samply/h1_after3.profile.json.gz` + `target/perf/samply/h1_after3.profile.json.syms.json` + - `target/perf/samply/h1_after4.profile.json.gz` + `target/perf/samply/h1_after4.profile.json.syms.json` + --- ## Rejected theses diff --git a/src/bin/evtx_dump.rs b/src/bin/evtx_dump.rs index 8044c94a..96b5c3b8 100644 --- a/src/bin/evtx_dump.rs +++ b/src/bin/evtx_dump.rs @@ -8,7 +8,7 @@ use indoc::indoc; use encoding::all::encodings; use encoding::types::Encoding; use evtx::err::Result as EvtxResult; -use evtx::{EvtxParser, ParserSettings, SerializedEvtxRecord}; +use evtx::{EvtxParser, JsonStreamOutput, ParserSettings, SerializedEvtxRecord}; use log::Level; use std::fs::{self, File}; use std::io::{self, BufWriter, Seek, SeekFrom, Write}; @@ -247,14 +247,27 @@ impl EvtxDump { self.dump_record(record)? } } else { - for record in parser.records_json_stream() { - self.dump_record(record)? + // Fast path for the canonical perf workload (`-t 1`): reuse a single + // `JsonStreamOutput>` buffer across records to avoid per-record + // Vec allocations + buffer growth churn. + if *self.parser_settings.get_num_threads() == 1 { + self.dump_json_streaming_single_thread(&mut parser)?; + } else { + for record in parser.records_json_stream() { + self.dump_record(record)? + } } } #[cfg(not(feature = "wevt_templates"))] - for record in parser.records_json_stream() { - self.dump_record(record)? 
+ { + if *self.parser_settings.get_num_threads() == 1 { + self.dump_json_streaming_single_thread(&mut parser)?; + } else { + for record in parser.records_json_stream() { + self.dump_record(record)? + } + } } } JsonParserKind::Legacy => { @@ -286,6 +299,84 @@ impl EvtxDump { Ok(()) } + fn dump_json_streaming_single_thread(&mut self, parser: &mut EvtxParser) -> Result<()> { + let settings = std::sync::Arc::new(self.parser_settings.clone()); + + // Keep and reuse the JSON output buffer across records. + let mut scratch = JsonStreamOutput::with_writer( + Vec::::with_capacity(16 * 1024), + &self.parser_settings, + ); + + for chunk_res in parser.chunks() { + let mut chunk_data = match chunk_res { + Ok(c) => c, + Err(e) => { + eprintln!("{:?}", format_err!(e)); + if self.stop_after_error { + std::process::exit(1); + } + continue; + } + }; + + let mut chunk = match chunk_data.parse(std::sync::Arc::clone(&settings)) { + Ok(c) => c, + Err(e) => { + eprintln!("{:?}", format_err!(e)); + if self.stop_after_error { + std::process::exit(1); + } + continue; + } + }; + + for record_res in chunk.iter() { + let record = match record_res { + Ok(r) => r, + Err(e) => { + eprintln!("{:?}", format_err!(e)); + if self.stop_after_error { + std::process::exit(1); + } + continue; + } + }; + + let range_filter = if let Some(ranges) = &self.ranges { + ranges.contains(&(record.event_record_id as usize)) + } else { + true + }; + + if !range_filter { + continue; + } + + if self.show_record_number { + writeln!(self.output, "Record {}", record.event_record_id)?; + } + + let capacity_hint = record.tokens.len().saturating_mul(64); + scratch.clear_buffer(); + scratch.reserve_buffer(capacity_hint); + + if let Err(e) = record.write_json_stream(&mut scratch) { + eprintln!("{:?}", format_err!(e)); + if self.stop_after_error { + std::process::exit(1); + } + continue; + } + + self.output.write_all(scratch.buffer())?; + self.output.write_all(b"\n")?; + } + } + + Ok(()) + } + fn open_parser(&self) -> Result> { 
if Self::is_stdin_input(&self.input) { let mut tmp = diff --git a/src/evtx_record.rs b/src/evtx_record.rs index aef3fd1a..c4fb880a 100644 --- a/src/evtx_record.rs +++ b/src/evtx_record.rs @@ -10,7 +10,7 @@ use crate::xml_output::{BinXmlOutput, XmlOutput}; use crate::{EvtxChunk, ParserSettings}; use chrono::prelude::*; -use std::io::Cursor; +use std::io::{Cursor, Write}; use std::sync::Arc; pub type RecordId = u64; @@ -162,6 +162,26 @@ impl EvtxRecord<'_> { }) } + /// Consumes the record and streams JSON into an existing `JsonStreamOutput`. + /// + /// This is useful for high-throughput JSONL emission where the caller wants to reuse + /// the output buffer across records (avoid per-record `Vec` allocations). + pub fn write_json_stream<W: Write>( + self, + output_builder: &mut crate::JsonStreamOutput<W>, + ) -> Result<()> { + let event_record_id = self.event_record_id; + + parse_tokens_streaming(self.tokens, self.chunk, output_builder).map_err(|e| { + EvtxError::FailedToParseRecord { + record_id: event_record_id, + source: Box::new(e), + } + })?; + + Ok(()) + } + /// Consumes the record and parse it, producing an XML serialized record. pub fn into_xml(self) -> Result<SerializedEvtxRecord<String>> { let mut output_builder = XmlOutput::with_writer(Vec::new(), &self.settings); diff --git a/src/json_stream_output.rs b/src/json_stream_output.rs index 7b6875c8..1a45d8e0 100644 --- a/src/json_stream_output.rs +++ b/src/json_stream_output.rs @@ -6,12 +6,20 @@ use crate::binxml::name::BinXmlName; use crate::binxml::value_variant::BinXmlValue; use crate::model::xml::{BinXmlPI, XmlElement}; use chrono::{Datelike, Timelike}; -use hashbrown::HashSet; +use lasso::{Rodeo, Spur}; use quick_xml::events::BytesText; use serde_json::Value as JsonValue; use std::borrow::Cow; use std::io::Write; +/// Zig-style fixed table size for duplicate-key tracking (see `PERF.md` H1). +/// +/// We keep this small so duplicate-key tracking stays in L1 and avoids hashing.
+const MAX_UNIQUE_NAMES: usize = 64; + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +struct KeyId(Spur); + /// Represents how the current XML element is being rendered in JSON. #[derive(Debug, Copy, Clone, Eq, PartialEq)] enum ElementValueKind { @@ -28,7 +36,7 @@ enum ElementValueKind { #[derive(Debug)] struct ElementState { /// JSON key for this element in its parent object. - name: String, + name: KeyId, /// How this element's JSON value is currently represented. kind: ElementValueKind, /// Whether we've already emitted a `#text` field for this element (when `kind == Object`). @@ -36,10 +44,8 @@ struct ElementState { /// Whether we've emitted `_attributes` separately (when `separate_json_attributes == true`). /// If true and `kind == Pending` on close, we skip emitting `null` to match legacy behavior. has_separate_attributes: bool, - /// Buffered scalar values for elements without attributes. - /// We buffer instead of writing immediately to support concatenation of multiple character nodes. - /// Uses serde_json::Value to avoid lifetime issues with BinXmlValue. - buffered_values: Vec<JsonValue>, + /// Buffered scalar values (inline fast-path avoids per-node Vec alloc). + buffered_values: BufferedValues, } /// JSON object context (either the root object or any nested object). @@ -48,7 +54,161 @@ struct ObjectFrame { /// Whether we've already written any field in this object. first_field: bool, /// Keys already used in this object (for duplicate key handling). - used_keys: HashSet<String>, + used_keys: UniqueKeyTable, +} + +#[derive(Debug, Copy, Clone)] +struct NameCountEntry { + base: KeyId, + next_suffix: u32, +} + +/// Duplicate-key tracking without hashing: a small, linear-scanned table. +/// +/// This matches the Zig renderer’s spirit: track up to a small number of unique names per object, +/// use pointer identity fast paths, and only allocate suffix strings on collision.
+#[derive(Debug)] +struct UniqueKeyTable { + /// All keys that have been emitted (including suffixed forms). + used: Vec<KeyId>, + /// Per-base counters to generate `base_1`, `base_2`, ... without rescanning from 1. + base_counts: Vec<NameCountEntry>, +} + +impl UniqueKeyTable { + fn with_capacity(capacity: usize) -> Self { + let cap = capacity.max(1); + UniqueKeyTable { + used: Vec::with_capacity(cap), + base_counts: Vec::with_capacity(cap.min(MAX_UNIQUE_NAMES)), + } + } + + #[inline] + fn clear(&mut self) { + self.used.clear(); + self.base_counts.clear(); + } + + #[inline] + fn reserve(&mut self, additional: usize) { + self.used.reserve(additional); + self.base_counts.reserve(additional.min(MAX_UNIQUE_NAMES)); + } + + #[inline] + fn contains(&self, key: KeyId) -> bool { + // Manual loop tends to compile smaller/faster than iterator combinators here. + for &k in &self.used { + if k == key { + return true; + } + } + false + } + + #[inline] + fn base_entry_index(&self, base: KeyId) -> Option<usize> { + for (i, e) in self.base_counts.iter().enumerate() { + if e.base == base { + return Some(i); + } + } + None + } + + fn reserve_unique(&mut self, base: KeyId, interner: &mut KeyInterner) -> KeyId { + // Fast path: first time we see this base key in this object. + if !self.contains(base) { + self.used.push(base); + self.base_counts.push(NameCountEntry { + base, + next_suffix: 1, + }); + return base; + } + + // Duplicate base key: generate the next suffix, skipping any collisions with existing keys.
+ let entry_idx = self.base_entry_index(base); + let mut suffix = entry_idx + .map(|i| self.base_counts[i].next_suffix) + .unwrap_or(1); + + loop { + let candidate_str = { + let base_str = interner.resolve(base); + format!("{}_{}", base_str, suffix) + }; + let candidate = interner.intern(&candidate_str); + suffix = suffix.saturating_add(1); + + if !self.contains(candidate) { + self.used.push(candidate); + + if let Some(i) = entry_idx { + self.base_counts[i].next_suffix = suffix; + } else { + // Rare: base key was present but we didn't have a counter entry yet. + self.base_counts.push(NameCountEntry { + base, + next_suffix: suffix, + }); + } + + return candidate; + } + } + } +} + +/// Buffer of JSON scalar values with an inline "one value" fast-path. +/// +/// This avoids allocating a new `Vec` (and triggering `RawVec::grow_one`) for the common +/// case where an element has exactly one text node. +#[derive(Debug, Default)] +enum BufferedValues { + #[default] + Empty, + One(JsonValue), + Many(Vec<JsonValue>), +} + +impl BufferedValues { + #[inline] + fn is_empty(&self) -> bool { + matches!(self, BufferedValues::Empty) + } + + #[inline] + fn push(&mut self, v: JsonValue) { + match self { + BufferedValues::Empty => { + *self = BufferedValues::One(v); + } + BufferedValues::One(prev) => { + let prev = std::mem::replace(prev, JsonValue::Null); + *self = BufferedValues::Many(vec![prev, v]); + } + BufferedValues::Many(vec) => vec.push(v), + } + } +} + +#[derive(Debug, Default)] +struct KeyInterner { + rodeo: Rodeo, +} + +impl KeyInterner { + #[inline] + fn intern(&mut self, s: &str) -> KeyId { + KeyId(self.rodeo.get_or_intern(s)) + } + + #[inline] + fn resolve(&self, id: KeyId) -> &str { + self.rodeo.resolve(&id.0) + } }
+ recycled_frames: Vec, + + /// Interned key strings to avoid per-record/per-key heap churn. + key_interner: KeyInterner, /// Optional depth (in `elements`) of an `EventData` element that owns a /// synthetic `"Data": { "#text": [...] }` aggregator, used to model @@ -70,7 +235,7 @@ pub struct JsonStreamOutput { /// intermediate tree. data_owner_depth: Option, /// Collected values for the aggregated `"Data": { "#text": [...] }` array. - data_values: Vec, + data_values: BufferedValues, /// Whether we are currently inside a `` element that contributes to /// the aggregated `"Data"` array. data_inside_element: bool, @@ -84,8 +249,10 @@ impl JsonStreamOutput { separate_json_attributes: settings.should_separate_json_attributes(), frames: Vec::new(), elements: Vec::new(), + recycled_frames: Vec::new(), + key_interner: KeyInterner::default(), data_owner_depth: None, - data_values: Vec::new(), + data_values: BufferedValues::default(), data_inside_element: false, } } @@ -414,6 +581,30 @@ impl JsonStreamOutput { .expect("no current JSON object frame available") } + #[inline] + fn push_object_frame(&mut self, used_keys_capacity: usize) { + let mut frame = self.recycled_frames.pop().unwrap_or_else(|| ObjectFrame { + first_field: true, + used_keys: UniqueKeyTable::with_capacity(used_keys_capacity), + }); + + frame.first_field = true; + frame.used_keys.clear(); + frame.used_keys.reserve(used_keys_capacity); + self.frames.push(frame); + } + + #[inline] + fn pop_object_frame(&mut self) { + let mut frame = self + .frames + .pop() + .expect("attempted to pop JSON frame when none exist"); + frame.first_field = true; + frame.used_keys.clear(); + self.recycled_frames.push(frame); + } + /// Write a comma if needed for the current JSON object. fn write_comma_if_needed(&mut self) -> SerializationResult<()> { let frame = self.current_frame_mut(); @@ -425,59 +616,64 @@ impl JsonStreamOutput { } } - /// Reserve a unique key in the current frame without writing it. 
- /// Returns the unique key that will be used (with `_1`, `_2` suffix if needed). - fn reserve_unique_key(&mut self, key: &str) -> String { + /// Reserve a unique key in the current frame from an already-interned key. + /// This avoids hashing the same key again on hot paths. + #[inline] + fn reserve_unique_key(&mut self, key: KeyId) -> KeyId { let frame = self .frames .last_mut() .expect("no current JSON object frame"); - if frame.used_keys.contains(key) { - // Find next available suffix - let mut suffix = 1; - loop { - let candidate = format!("{}_{}", key, suffix); - if !frame.used_keys.contains(&candidate) { - frame.used_keys.insert(candidate.clone()); - return candidate; - } - suffix += 1; - } - } else { - frame.used_keys.insert(key.to_owned()); - key.to_owned() - } + frame.used_keys.reserve_unique(key, &mut self.key_interner) } /// Write a JSON string key (with surrounding quotes and escaping). /// Write a JSON string key, handling duplicates by appending `_1`, `_2`, etc. fn write_key(&mut self, key: &str) -> SerializationResult<()> { + let key = self.key_interner.intern(key); + self.write_key_id(key) + } + + #[inline] + fn write_key_id(&mut self, key: KeyId) -> SerializationResult<()> { self.write_comma_if_needed()?; - // Fast path: avoid allocating a second String for the common case. - // Reserve the key in the set, but write from `&str` directly. - let frame = self - .frames - .last_mut() - .expect("no current JSON object frame"); + let unique_key = { + let frame = self + .frames + .last_mut() + .expect("no current JSON object frame"); + frame.used_keys.reserve_unique(key, &mut self.key_interner) + }; - if frame.used_keys.insert(key.to_owned()) { - // Keys derived from XML NCName don't need escaping - self.write_json_string_ncname(key)?; - } else { - // Find next available suffix. 
- let mut suffix = 1; - loop { - let candidate = format!("{}_{}", key, suffix); - if !frame.used_keys.contains(&candidate) { - frame.used_keys.insert(candidate.clone()); - self.write_json_string_ncname(&candidate)?; - break; - } - suffix += 1; - } - } - self.write_bytes(b":") + // Keys derived from XML NCName don't need escaping. + let key_str = self.key_interner.resolve(unique_key); + let writer = self + .writer + .as_mut() + .expect("JsonStreamOutput writer missing"); + writer.write_all(b"\"").map_err(SerializationError::from)?; + writer + .write_all(key_str.as_bytes()) + .map_err(SerializationError::from)?; + writer.write_all(b"\":").map_err(SerializationError::from) + } + + #[inline] + fn write_reserved_key_id(&mut self, key: KeyId) -> SerializationResult<()> { + self.write_comma_if_needed()?; + + // Keys derived from XML NCName don't need escaping. + let key_str = self.key_interner.resolve(key); + let writer = self + .writer + .as_mut() + .expect("JsonStreamOutput writer missing"); + writer.write_all(b"\"").map_err(SerializationError::from)?; + writer + .write_all(key_str.as_bytes()) + .map_err(SerializationError::from)?; + writer.write_all(b"\":").map_err(SerializationError::from) } /// Write a pre-reserved key directly (no duplicate checking needed). @@ -490,20 +686,23 @@ impl JsonStreamOutput { /// Start a new nested JSON object as the value of `key` in the current object. fn start_object_value(&mut self, key: &str) -> SerializationResult<()> { - self.write_key(key)?; + let key = self.key_interner.intern(key); + self.start_object_value_id(key) + } + + #[inline] + fn start_object_value_id(&mut self, key: KeyId) -> SerializationResult<()> { + self.write_key_id(key)?; self.write_bytes(b"{")?; - self.frames.push(ObjectFrame { - first_field: true, - // Heuristic: nested objects tend to have a moderate number of keys. - used_keys: HashSet::with_capacity(32), - }); + // Heuristic: nested objects tend to have a moderate number of keys. 
+ self.push_object_frame(32); Ok(()) } /// End the current JSON object frame. fn end_object(&mut self) -> SerializationResult<()> { self.write_bytes(b"}")?; - self.frames.pop(); + self.pop_object_frame(); Ok(()) } @@ -520,52 +719,50 @@ impl JsonStreamOutput { ElementValueKind::Pending => { // Turn `"parent": null` into `"parent": { ... }` by starting an // object value for it now. - let key = self.elements[parent_index].name.clone(); + let key = self.elements[parent_index].name; let was_reserved = self.elements[parent_index].has_separate_attributes; // If the key was pre-reserved (separate_json_attributes mode), use // write_reserved_key to avoid double-reservation. if was_reserved { - self.write_reserved_key(&key)?; + self.write_reserved_key_id(key)?; } else { - self.write_key(&key)?; + self.write_key_id(key)?; } self.write_bytes(b"{")?; - self.frames.push(ObjectFrame { - first_field: true, - used_keys: HashSet::with_capacity(32), - }); + self.push_object_frame(32); self.elements[parent_index].kind = ElementValueKind::Object; } ElementValueKind::Scalar => { // Element had text content but now has child elements too. // Turn it into an object and move buffered text to #text field. - let key = self.elements[parent_index].name.clone(); + let key = self.elements[parent_index].name; let was_reserved = self.elements[parent_index].has_separate_attributes; let buffered = std::mem::take(&mut self.elements[parent_index].buffered_values); if was_reserved { - self.write_reserved_key(&key)?; + self.write_reserved_key_id(key)?; } else { - self.write_key(&key)?; + self.write_key_id(key)?; } self.write_bytes(b"{")?; - self.frames.push(ObjectFrame { - first_field: true, - used_keys: HashSet::with_capacity(32), - }); + self.push_object_frame(32); // Write the buffered text as #text if not in separate mode // (in separate mode, text in mixed-content elements is dropped). 
if !buffered.is_empty() && !self.separate_json_attributes { self.write_key("#text")?; - if buffered.len() == 1 { - serde_json::to_writer(self.writer_mut(), &buffered[0]) - .map_err(SerializationError::from)?; - } else { - serde_json::to_writer(self.writer_mut(), &buffered) - .map_err(SerializationError::from)?; + match buffered { + BufferedValues::Empty => {} + BufferedValues::One(v) => { + serde_json::to_writer(self.writer_mut(), &v) + .map_err(SerializationError::from)?; + } + BufferedValues::Many(vs) => { + serde_json::to_writer(self.writer_mut(), &vs) + .map_err(SerializationError::from)?; + } } } @@ -605,45 +802,59 @@ impl JsonStreamOutput { // In separate_json_attributes mode, output directly without wrapper. // Legacy concatenates multiple string values into one. self.write_key("Data")?; - if values.len() == 1 { - serde_json::to_writer(self.writer_mut(), &values[0]) - .map_err(SerializationError::from)?; - } else { - // Concatenate multiple values as strings (legacy behavior). - let mut concat = String::new(); - for v in &values { - match v { - JsonValue::String(s) => concat.push_str(s), - JsonValue::Number(n) => concat.push_str(&n.to_string()), - JsonValue::Bool(b) => { - concat.push_str(if *b { "true" } else { "false" }) + match values { + BufferedValues::Empty => { + // Nothing to write (shouldn't happen given the outer check). + self.write_bytes(b"null")?; + } + BufferedValues::One(v) => { + serde_json::to_writer(self.writer_mut(), &v) + .map_err(SerializationError::from)?; + } + BufferedValues::Many(vs) => { + // Concatenate multiple values as strings (legacy behavior). 
+ let mut concat = String::new(); + for v in &vs { + match v { + JsonValue::String(s) => concat.push_str(s), + JsonValue::Number(n) => concat.push_str(&n.to_string()), + JsonValue::Bool(b) => { + concat.push_str(if *b { "true" } else { "false" }) + } + JsonValue::Null => {} + _ => concat.push_str(&v.to_string()), } - JsonValue::Null => {} - _ => concat.push_str(&v.to_string()), } + + // Avoid `serde_json` for emitting the final JSON string. + self.write_json_string_escaped(&concat)?; } - serde_json::to_writer(self.writer_mut(), &concat) - .map_err(SerializationError::from)?; } } else { // With `#attributes` mode, wrap in `"Data": { "#text": ... }`. self.start_object_value("Data")?; self.write_key("#text")?; - if values.len() == 1 { - serde_json::to_writer(self.writer_mut(), &values[0]) - .map_err(SerializationError::from)?; - } else { - // Multiple `` children: aggregate into an array. - self.write_bytes(b"[")?; - for (idx, json_value) in values.into_iter().enumerate() { - if idx > 0 { - self.write_bytes(b",")?; - } - serde_json::to_writer(self.writer_mut(), &json_value) + match values { + BufferedValues::Empty => { + self.write_bytes(b"null")?; + } + BufferedValues::One(v) => { + serde_json::to_writer(self.writer_mut(), &v) .map_err(SerializationError::from)?; } - self.write_bytes(b"]")?; + BufferedValues::Many(vs) => { + // Multiple `` children: aggregate into an array. + self.write_bytes(b"[")?; + for (idx, json_value) in vs.iter().enumerate() { + if idx > 0 { + self.write_bytes(b",")?; + } + serde_json::to_writer(self.writer_mut(), json_value) + .map_err(SerializationError::from)?; + } + self.write_bytes(b"]")?; + } } self.end_object()?; @@ -694,15 +905,59 @@ impl JsonStreamOutput { } } +impl JsonStreamOutput> { + /// Get the currently written JSON bytes. + #[inline] + pub fn buffer(&self) -> &[u8] { + self.writer.as_deref().unwrap_or(&[]) + } + + /// Clear the underlying buffer while retaining its capacity. 
+ #[inline] + pub fn clear_buffer(&mut self) { + if let Some(buf) = self.writer.as_mut() { + buf.clear(); + } + } + + /// Reserve additional capacity in the underlying buffer. + #[inline] + pub fn reserve_buffer(&mut self, additional: usize) { + if let Some(buf) = self.writer.as_mut() { + buf.reserve(additional); + } + } +} + impl BinXmlOutput for JsonStreamOutput { fn visit_start_of_stream(&mut self) -> SerializationResult<()> { + // Be defensive: if a previous record failed mid-stream, try to reset state + // so we can continue emitting subsequent records. + while !self.elements.is_empty() { + let elem = self.elements.pop().expect("checked non-empty"); + if elem.kind == ElementValueKind::Object { + // Close any dangling object frames. + if !self.frames.is_empty() { + self.pop_object_frame(); + } + } + } + while !self.frames.is_empty() { + self.pop_object_frame(); + } + + // Reset `......` aggregator state. + // If a previous record failed mid-stream while inside an unnamed `` element, + // these fields can retain stale routing state that would corrupt subsequent records + // when the same `JsonStreamOutput` is reused. + self.data_owner_depth = None; + self.data_values = BufferedValues::default(); + self.data_inside_element = false; + // Open the root JSON object. self.write_bytes(b"{")?; - self.frames.push(ObjectFrame { - first_field: true, - // Root objects can have many keys; pre-reserve to reduce rehashing. - used_keys: HashSet::with_capacity(128), - }); + // Root objects can have many keys; pre-reserve to reduce rehashing. + self.push_object_frame(128); Ok(()) } @@ -717,7 +972,9 @@ impl BinXmlOutput for JsonStreamOutput { // Close the root JSON object. 
if !self.frames.is_empty() { self.write_bytes(b"}")?; - self.frames.clear(); + while !self.frames.is_empty() { + self.pop_object_frame(); + } } Ok(()) @@ -744,9 +1001,10 @@ impl BinXmlOutput for JsonStreamOutput { }; let key = if let Some(name_attr) = data_name_attr { - name_attr.value.as_cow_str().into_owned() + let value: Cow<'_, str> = name_attr.value.as_cow_str(); + self.key_interner.intern(value.as_ref()) } else { - element_name.to_owned() + self.key_interner.intern(element_name) }; // Aggregated `......` case: @@ -755,7 +1013,7 @@ impl BinXmlOutput for JsonStreamOutput { if is_data && data_name_attr.is_none() && let Some(parent) = self.elements.last() - && parent.name == "EventData" + && self.key_interner.resolve(parent.name) == "EventData" { // Depth of the owning `EventData` element. let owner_depth = self.elements.len(); @@ -763,7 +1021,7 @@ impl BinXmlOutput for JsonStreamOutput { // Initialize a new aggregator for this `EventData`, if needed. if self.data_owner_depth != Some(owner_depth) { self.data_owner_depth = Some(owner_depth); - self.data_values.clear(); + self.data_values = BufferedValues::default(); } // We're now inside a `` element that contributes to @@ -792,7 +1050,7 @@ impl BinXmlOutput for JsonStreamOutput { // materialized as objects with a `#attributes` field. if has_json_attributes && !self.separate_json_attributes { // `"key": { "#attributes": { ... } }` - self.start_object_value(&key)?; + self.start_object_value_id(key)?; // Write `#attributes` object. { @@ -813,10 +1071,7 @@ impl BinXmlOutput for JsonStreamOutput { // Start attributes object. 
self.write_bytes(b"{")?; - self.frames.push(ObjectFrame { - first_field: true, - used_keys: HashSet::new(), - }); + self.push_object_frame(0); { for attr in &element.attributes { @@ -858,7 +1113,7 @@ impl BinXmlOutput for JsonStreamOutput { kind: ElementValueKind::Object, has_text: false, has_separate_attributes: false, - buffered_values: Vec::new(), + buffered_values: BufferedValues::default(), }); } else { // `separate_json_attributes == true` or element has no attributes. @@ -867,16 +1122,16 @@ impl BinXmlOutput for JsonStreamOutput { // If we're writing `_attributes`, pre-reserve the element key so both // the `_attributes` and the element itself use matching suffixes. let element_key = if wrote_separate_attrs { - let unique_key = self.reserve_unique_key(&key); + let unique_key = self.reserve_unique_key(key); // Emit `"_attributes": { ... }` into the parent object. - let attr_key = format!("{}_attributes", unique_key); + let attr_key = { + let s = self.key_interner.resolve(unique_key); + format!("{}_attributes", s) + }; self.write_reserved_key(&attr_key)?; self.write_bytes(b"{")?; - self.frames.push(ObjectFrame { - first_field: true, - used_keys: HashSet::new(), - }); + self.push_object_frame(0); { for attr in &element.attributes { @@ -918,7 +1173,7 @@ impl BinXmlOutput for JsonStreamOutput { kind: ElementValueKind::Pending, has_text: false, has_separate_attributes: wrote_separate_attrs, - buffered_values: Vec::new(), + buffered_values: BufferedValues::default(), }); } @@ -950,7 +1205,7 @@ impl BinXmlOutput for JsonStreamOutput { // No text and no children – render as `null`, unless we already // emitted `_attributes` separately (legacy omits the null). if !elem.has_separate_attributes { - self.write_key(&elem.name)?; + self.write_key_id(elem.name)?; self.write_bytes(b"null")?; } } @@ -959,31 +1214,36 @@ impl BinXmlOutput for JsonStreamOutput { if !elem.buffered_values.is_empty() { // If key was pre-reserved (separate_json_attributes mode), use reserved writer. 
if elem.has_separate_attributes { - self.write_reserved_key(&elem.name)?; + self.write_reserved_key_id(elem.name)?; } else { - self.write_key(&elem.name)?; + self.write_key_id(elem.name)?; } - if elem.buffered_values.len() == 1 { - // Single value: preserve original type. - serde_json::to_writer(self.writer_mut(), &elem.buffered_values[0]) - .map_err(SerializationError::from)?; - } else { - // Multiple values: concatenate as strings (legacy behavior). - let mut concat = String::new(); - for v in &elem.buffered_values { - // Convert JSON value back to string for concatenation - match v { - JsonValue::String(s) => concat.push_str(s), - JsonValue::Number(n) => concat.push_str(&n.to_string()), - JsonValue::Bool(b) => { - concat.push_str(if *b { "true" } else { "false" }) + match elem.buffered_values { + BufferedValues::Empty => {} + BufferedValues::One(v) => { + // Single value: preserve original type. + serde_json::to_writer(self.writer_mut(), &v) + .map_err(SerializationError::from)?; + } + BufferedValues::Many(vs) => { + // Multiple values: concatenate as strings (legacy behavior). + let mut concat = String::new(); + for v in &vs { + // Convert JSON value back to string for concatenation + match v { + JsonValue::String(s) => concat.push_str(s), + JsonValue::Number(n) => concat.push_str(&n.to_string()), + JsonValue::Bool(b) => { + concat.push_str(if *b { "true" } else { "false" }) + } + JsonValue::Null => concat.push_str("null"), + _ => concat.push_str(&v.to_string()), } - JsonValue::Null => concat.push_str("null"), - _ => concat.push_str(&v.to_string()), } + + // Avoid `serde_json` for emitting the final JSON string. 
+ self.write_json_string_escaped(&concat)?; } - serde_json::to_writer(self.writer_mut(), &concat) - .map_err(SerializationError::from)?; } } } @@ -1006,14 +1266,18 @@ impl BinXmlOutput for JsonStreamOutput { // "#text" is a fixed ASCII key, no escaping needed self.write_bytes(b"\"#text\":")?; - if elem.buffered_values.len() == 1 { - // Single value: write directly. - serde_json::to_writer(self.writer_mut(), &elem.buffered_values[0]) - .map_err(SerializationError::from)?; - } else { - // Multiple values: write as array (legacy behavior). - serde_json::to_writer(self.writer_mut(), &elem.buffered_values) - .map_err(SerializationError::from)?; + match elem.buffered_values { + BufferedValues::Empty => {} + BufferedValues::One(v) => { + // Single value: write directly. + serde_json::to_writer(self.writer_mut(), &v) + .map_err(SerializationError::from)?; + } + BufferedValues::Many(vs) => { + // Multiple values: write as array (legacy behavior). + serde_json::to_writer(self.writer_mut(), &vs) + .map_err(SerializationError::from)?; + } } } // Close the element's object. @@ -1475,4 +1739,68 @@ mod tests { legacy_value, streaming_json ); } + + /// Regression test: when reusing `JsonStreamOutput` across records, a previous record that + /// failed mid-stream inside aggregated `...` must not + /// leak stale `data_*` routing state into the next record. + #[test] + fn test_reuse_after_error_resets_data_aggregator_state() { + let arena = Bump::new(); + let settings = ParserSettings::new().num_threads(1); + + let writer = Vec::new(); + let mut output = JsonStreamOutput::with_writer(writer, &settings); + + // "Record 1": enter unnamed `` aggregation and then "fail" before closing. 
+ output.visit_start_of_stream().unwrap(); + + let event_data = XmlElement { + name: Cow::Owned(BinXmlName::from_str("EventData")), + attributes: vec![], + }; + let data = XmlElement { + name: Cow::Owned(BinXmlName::from_str("Data")), + attributes: vec![], + }; + + output.visit_open_start_element(&event_data).unwrap(); + output.visit_open_start_element(&data).unwrap(); + output + .visit_characters(Cow::Owned(BinXmlValue::StringType( + BumpString::from_str_in("stale", &arena), + ))) + .unwrap(); + + // Simulate caller behavior on error in the single-thread fast path: + // discard partial bytes and immediately start the next record with the same builder. + output.clear_buffer(); + + // "Record 2": plain element with text. If aggregator state wasn't reset, `visit_characters` + // would route this text into `data_values` and the element would render as `null`. + output.visit_start_of_stream().unwrap(); + + let message = XmlElement { + name: Cow::Owned(BinXmlName::from_str("Message")), + attributes: vec![], + }; + + output.visit_open_start_element(&message).unwrap(); + output + .visit_characters(Cow::Owned(BinXmlValue::StringType( + BumpString::from_str_in("hello", &arena), + ))) + .unwrap(); + output.visit_close_element(&message).unwrap(); + output.visit_end_of_stream().unwrap(); + + let bytes = output.finish().unwrap(); + let json = String::from_utf8(bytes).unwrap(); + let value: serde_json::Value = serde_json::from_str(&json).unwrap(); + + assert_eq!( + value["Message"], + serde_json::Value::String("hello".to_string()), + "Text must not be misrouted into the aggregated Data buffer when reusing JsonStreamOutput.\nJSON: {json}" + ); + } }