diff --git a/.cursor/commands/improvement_pass.md b/.cursor/commands/improvement_pass.md index 16342089..97a4af93 100644 --- a/.cursor/commands/improvement_pass.md +++ b/.cursor/commands/improvement_pass.md @@ -34,15 +34,15 @@ hyperfine -w 10 -r 20 \ | tee "benchmarks/benchmark_pre_${TAG}.txt" # Optional: PRE flamegraph for this pass's main scenario -sudo make flamegraph-prod \ +sudo TAG="$TAG" make flamegraph-prod \ FLAME_FILE="samples/security_big_sample.evtx" \ DURATION=30 \ FORMAT=json \ BIN="$PRE" -mv profile/flamegraph.svg "profile/flamegraph_${TAG}_${TS}_pre.svg" || true -cp profile/top_leaf.txt "profile/top_leaf_${TAG}_${TS}_pre.txt" || true -cp profile/top_titles.txt "profile/top_titles_${TAG}_${TS}_pre.txt" || true +mv "profile/flamegraph_${TAG}.svg" "profile/flamegraph_${TAG}_${TS}_pre.svg" || true +cp "profile/top_leaf_${TAG}.txt" "profile/top_leaf_${TAG}_${TS}_pre.txt" || true +cp "profile/top_titles_${TAG}.txt" "profile/top_titles_${TAG}_${TS}_pre.txt" || true ``` - **Use the PRE benchmark + flamegraph** to: @@ -104,12 +104,12 @@ hyperfine -w 10 -r 20 \ | tee "/workspace/benchmarks/benchmark_pair_${TAG}_${TS}.txt" # POST flamegraph for the same scenario -OUT_DIR=/workspace/profile_post FORMAT=jsonl DURATION=30 \ - /workspace/scripts/flamegraph_prod.sh "$POST" +OUT_DIR=/workspace/profile_post FORMAT=json DURATION=30 BIN="$POST" \ + /workspace/scripts/flamegraph_prod.sh -mv /workspace/profile/flamegraph.svg "/workspace/profile_post/flamegraph_${TAG}_${TS}_post.svg" || true -cp /workspace/profile/top_leaf.txt "/workspace/profile_post/top_leaf_${TAG}_${TS}_post.txt" || true -cp /workspace/profile/top_titles.txt "/workspace/profile_post/top_titles_${TAG}_${TS}_post.txt" || true +mv "/workspace/profile_post/flamegraph_${TAG}.svg" "/workspace/profile_post/flamegraph_${TAG}_${TS}_post.svg" || true +cp "/workspace/profile_post/top_leaf_${TAG}.txt" "/workspace/profile_post/top_leaf_${TAG}_${TS}_post.txt" || true +cp 
"/workspace/profile_post/top_titles_${TAG}.txt" "/workspace/profile_post/top_titles_${TAG}_${TS}_post.txt" || true echo "PRE: $PRE" echo "POST: $POST" diff --git a/.github/workflows/deploy-pages.yml b/.github/workflows/deploy-pages.yml index d4a3d006..60e3bc5b 100644 --- a/.github/workflows/deploy-pages.yml +++ b/.github/workflows/deploy-pages.yml @@ -27,11 +27,9 @@ jobs: bun-version: latest - name: Set up Rust toolchain (stable) with wasm target - uses: actions-rs/toolchain@v1 + uses: dtolnay/rust-toolchain@stable with: - toolchain: stable - target: wasm32-unknown-unknown - override: true + targets: wasm32-unknown-unknown - name: Install & cache wasm-pack uses: jetli/wasm-pack-action@v0.4.0 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 925f20f3..19c001f6 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,13 +10,10 @@ jobs: matrix: os: [ubuntu-latest, windows-latest, macos-latest] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: fetch-depth: 1 - - uses: actions-rs/toolchain@v1 - with: - toolchain: stable + - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 - - uses: actions-rs/cargo@v1 - with: - command: test + - name: Run tests + run: cargo test diff --git a/.gitignore b/.gitignore index bc809b3f..5cabcd17 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,8 @@ repomix-output.txt evtx-wasm/evtx-viewer/public/pkg # Samples are being copied by build scripts before deploying **/public/samples/ + +profile/* +binaries/* +benchmarks/* +.PRE_PATH diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..7a8270c4 --- /dev/null +++ b/Makefile @@ -0,0 +1,22 @@ +FLAME_FILE ?= samples/security_big_sample.evtx +FORMAT ?= json +DURATION ?= 30 +BIN ?= ./target/release/evtx_dump + +.PHONY: flamegraph-prod +flamegraph-prod: + @echo "Building release binary with fast allocator..." 
+ cargo build --release --features fast-alloc + @echo "Cleaning up previous trace files..." + @rm -rf cargo-flamegraph.trace + BIN="$(BIN)" FLAME_FILE="$(FLAME_FILE)" FORMAT="$(FORMAT)" DURATION="$(DURATION)" \ + bash scripts/flamegraph_prod.sh + +.PHONY: compare-streaming-legacy +compare-streaming-legacy: + @echo "Building comparison tool with fast allocator..." + cargo build --release --features fast-alloc --bin compare_streaming_legacy + @echo "Running legacy vs streaming JSON comparison..." + ./target/release/compare_streaming_legacy $(FILE) + + diff --git a/scripts/flamegraph_prod.sh b/scripts/flamegraph_prod.sh new file mode 100644 index 00000000..acbaa3e9 --- /dev/null +++ b/scripts/flamegraph_prod.sh @@ -0,0 +1,188 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Simple production-style flamegraph helper using perf + inferno (Linux) +# or cargo-flamegraph (macOS). +# Intended to be invoked via `make flamegraph-prod` with environment +# overrides, e.g.: +# FLAME_FILE=samples/security_big_sample.evtx \ +# FORMAT=json \ +# DURATION=30 \ +# BIN=./target/release/evtx_dump \ +# make flamegraph-prod +# +OS="$(uname -s || echo unknown)" + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" + +# Optional label for this run (used in output filenames). +: "${TAG:=default}" + +: "${BIN:=$ROOT_DIR/target/release/evtx_dump}" +: "${FLAME_FILE:=$ROOT_DIR/samples/security_big_sample.evtx}" +: "${FORMAT:=json}" +: "${DURATION:=30}" +# For JSON formats, choose parser implementation: streaming | legacy. +: "${JSON_PARSER:=streaming}" +: "${OUT_DIR:=$ROOT_DIR/profile}" + +mkdir -p "$OUT_DIR" + +echo "Profiling" +echo " FLAME_FILE=$FLAME_FILE" +echo " FORMAT=$FORMAT" +echo " DURATION=${DURATION}s" +echo " OUT_DIR=$OUT_DIR" +echo " TAG=$TAG" + +# Map FORMAT to evtx_dump arguments. +case "$FORMAT" in + json|jsonl) + # Use streaming JSON path by default; caller can change via JSON_PARSER env. 
+ FMT_ARGS=(-t 1 -o "$FORMAT" --json-parser "$JSON_PARSER") + ;; + xml) + FMT_ARGS=(-t 1 -o xml) + ;; + *) + echo "warning: unknown FORMAT='$FORMAT', defaulting to json" >&2 + FMT_ARGS=(-t 1 -o json --json-parser streaming) + ;; +esac + +if [[ "$OS" == "Darwin" ]]; then + # macOS path: use cargo-flamegraph (wraps dtrace + inferno). + if ! command -v cargo >/dev/null 2>&1; then + echo "error: cargo not found in PATH; required for cargo-flamegraph on macOS." >&2 + exit 1 + fi + + echo "Detected macOS; using cargo flamegraph (you may be prompted for sudo)." + + FOLDED_STACKS="$OUT_DIR/stacks_${TAG}.folded" + + # Ask cargo-flamegraph to tee the folded stacks into our own file. + (cd "$ROOT_DIR" && \ + cargo flamegraph \ + --root \ + --bin evtx_dump \ + --output "$OUT_DIR/flamegraph_${TAG}.svg" \ + --post-process "tee $FOLDED_STACKS" \ + -- "${FMT_ARGS[@]}" "$FLAME_FILE") + + if [[ -f "$FOLDED_STACKS" ]] && [[ -s "$FOLDED_STACKS" ]]; then + # Extract top leafs (leaf functions) from folded stacks + { + echo "Top leaf functions (by total samples):" + awk '{ + n = split($1, stack, ";"); + if (n > 0) { + leaf = stack[n]; + count = $2 + 0; + leafs[leaf] += count; + } + } + END { + for (f in leafs) { + printf "%d %s\n", leafs[f], f; + } + }' "$FOLDED_STACKS" | sort -nr | head -20 | awk '{printf " %s: %s\n", $2, $1}' + } > "$OUT_DIR/top_leaf_${TAG}.txt" + + # Extract top titles (root functions) from folded stacks + { + echo "Top title functions (by total samples):" + awk '{ + n = split($1, stack, ";"); + if (n > 0) { + title = stack[1]; + count = $2 + 0; + titles[title] += count; + } + } + END { + for (f in titles) { + printf "%d %s\n", titles[f], f; + } + }' "$FOLDED_STACKS" | sort -nr | head -20 | awk '{printf " %s: %s\n", $2, $1}' + } > "$OUT_DIR/top_titles_${TAG}.txt" + + echo "Top leafs written to $OUT_DIR/top_leaf_${TAG}.txt" + echo "Top titles written to $OUT_DIR/top_titles_${TAG}.txt" + else + echo "warning: folded stacks file is empty or missing, skipping text 
summaries" >&2 + fi + + echo "Flamegraph written to $OUT_DIR/flamegraph_${TAG}.svg" + exit 0 +fi + +# Linux / perf + inferno path. +# +# Requirements: +# - perf +# - inferno-collapse-perf +# - inferno-flamegraph + +if ! command -v perf >/dev/null 2>&1; then + echo "error: perf not found in PATH; flamegraph_prod.sh currently expects Linux + perf." >&2 + exit 1 +fi + +if ! command -v inferno-collapse-perf >/dev/null 2>&1; then + echo "error: inferno-collapse-perf not found in PATH." >&2 + exit 1 +fi + +if ! command -v inferno-flamegraph >/dev/null 2>&1; then + echo "error: inferno-flamegraph not found in PATH." >&2 + exit 1 +fi + +perf record -F 999 -g --output "$OUT_DIR/perf.data" -- \ + "$BIN" "${FMT_ARGS[@]}" "$FLAME_FILE" >/dev/null + +perf script -i "$OUT_DIR/perf.data" | inferno-collapse-perf > "$OUT_DIR/stacks.folded" +cat "$OUT_DIR/stacks.folded" | inferno-flamegraph > "$OUT_DIR/flamegraph_${TAG}.svg" + +# Extract top leafs (functions at end of stack) and top titles (functions at start of stack) +# Folded format: "func1;func2;func3 12345" where number is sample count +{ + echo "Top leaf functions (by total samples):" + awk '{ + n = split($1, stack, ";"); + if (n > 0) { + leaf = stack[n]; + count = $2 + 0; + leafs[leaf] += count; + } + } + END { + for (f in leafs) { + printf "%d %s\n", leafs[f], f; + } + }' "$OUT_DIR/stacks.folded" | sort -nr | head -20 | awk '{printf " %s: %s\n", $2, $1}' +} > "$OUT_DIR/top_leaf_${TAG}.txt" + +{ + echo "Top title functions (by total samples):" + awk '{ + n = split($1, stack, ";"); + if (n > 0) { + title = stack[1]; + count = $2 + 0; + titles[title] += count; + } + } + END { + for (f in titles) { + printf "%d %s\n", titles[f], f; + } + }' "$OUT_DIR/stacks.folded" | sort -nr | head -20 | awk '{printf " %s: %s\n", $2, $1}' +} > "$OUT_DIR/top_titles_${TAG}.txt" + +echo "Flamegraph written to $OUT_DIR/flamegraph_${TAG}.svg" +echo "Top leafs written to $OUT_DIR/top_leaf_${TAG}.txt" +echo "Top titles written to 
$OUT_DIR/top_titles_${TAG}.txt" + + diff --git a/src/bin/compare_streaming_legacy.rs b/src/bin/compare_streaming_legacy.rs new file mode 100644 index 00000000..0187c1c7 --- /dev/null +++ b/src/bin/compare_streaming_legacy.rs @@ -0,0 +1,282 @@ +use evtx::{EvtxParser, ParserSettings}; +use serde_json::Value; +use std::collections::HashMap; +use std::env; +use std::error::Error; +use std::path::PathBuf; + +fn main() { + if let Err(e) = run() { + eprintln!("compare_streaming_legacy: {e}"); + std::process::exit(1); + } +} + +fn run() -> Result<(), Box> { + let (path, settings, max_records) = parse_args()?; + + let mut parser_legacy = + EvtxParser::from_path(&path)?.with_configuration(settings.clone().indent(false)); + let mut parser_streaming = + EvtxParser::from_path(&path)?.with_configuration(settings.clone().indent(false)); + + let mut legacy_iter = parser_legacy.records_json(); + let mut streaming_iter = parser_streaming.records_json_stream(); + + let mut index: usize = 0; + + loop { + if let Some(limit) = max_records + && index >= limit + { + break; + } + + let legacy_next = legacy_iter.next(); + let streaming_next = streaming_iter.next(); + + match (legacy_next, streaming_next) { + (None, None) => break, + (Some(_), None) => { + eprintln!( + "Mismatch: legacy parser produced more records than streaming parser at index {}", + index + ); + return Err("record count mismatch".into()); + } + (None, Some(_)) => { + eprintln!( + "Mismatch: streaming parser produced more records than legacy parser at index {}", + index + ); + return Err("record count mismatch".into()); + } + (Some(legacy_res), Some(streaming_res)) => { + match (legacy_res, streaming_res) { + (Ok(legacy_record), Ok(streaming_record)) => { + let legacy_value: Value = serde_json::from_str(&legacy_record.data)?; + let streaming_value: Value = serde_json::from_str(&streaming_record.data)?; + + // Normalize both values to handle duplicate key ordering differences + let legacy_normalized = 
normalize_for_comparison(&legacy_value); + let streaming_normalized = normalize_for_comparison(&streaming_value); + + if legacy_normalized != streaming_normalized { + eprintln!( + "JSON mismatch at record index {} (EventRecordId={}):", + index, legacy_record.event_record_id + ); + eprintln!("Legacy JSON:"); + eprintln!("{}", serde_json::to_string_pretty(&legacy_value)?); + eprintln!(); + eprintln!("Streaming JSON:"); + eprintln!("{}", serde_json::to_string_pretty(&streaming_value)?); + return Err("streaming JSON does not match legacy JSON".into()); + } + } + (Err(legacy_err), Ok(streaming_record)) => { + eprintln!( + "Error mismatch at record index {}: legacy parser failed, streaming succeeded.", + index + ); + eprintln!("Legacy error: {legacy_err}"); + eprintln!( + "Streaming JSON record (EventRecordId={}):", + streaming_record.event_record_id + ); + let streaming_value: Value = serde_json::from_str(&streaming_record.data)?; + eprintln!("{}", serde_json::to_string_pretty(&streaming_value)?); + return Err("legacy parser failed while streaming succeeded".into()); + } + (Ok(legacy_record), Err(streaming_err)) => { + eprintln!( + "Error mismatch at record index {}: streaming parser failed, legacy succeeded.", + index + ); + eprintln!("Streaming error: {streaming_err}"); + eprintln!( + "Legacy JSON record (EventRecordId={}):", + legacy_record.event_record_id + ); + let legacy_value: Value = serde_json::from_str(&legacy_record.data)?; + eprintln!("{}", serde_json::to_string_pretty(&legacy_value)?); + return Err("streaming parser failed while legacy succeeded".into()); + } + (Err(legacy_err), Err(streaming_err)) => { + // Both failed for this record – treat as equivalent and continue. 
+ eprintln!( + "Both parsers failed at record index {}.\n Legacy error: {}\n Streaming error: {}", + index, legacy_err, streaming_err + ); + } + } + } + } + + index += 1; + } + + eprintln!( + "Success: legacy and streaming JSON outputs match for {} records (path: {}).", + index, + path.display() + ); + + Ok(()) +} + +fn parse_args() -> Result<(PathBuf, ParserSettings, Option), Box> { + let mut args = env::args().skip(1); + + let mut separate_json_attributes = false; + let mut validate_checksums = false; + let mut num_threads: Option = None; + let mut max_records: Option = None; + let mut path: Option = None; + + while let Some(arg) = args.next() { + match arg.as_str() { + "-h" | "--help" => { + print_usage(); + std::process::exit(0); + } + "-s" | "--separate-json-attributes" => { + separate_json_attributes = true; + } + "-c" | "--validate-checksums" => { + validate_checksums = true; + } + "-j" | "--num-threads" => { + let value = args.next().ok_or("missing value for --num-threads")?; + num_threads = Some(value.parse()?); + } + "-n" | "--max-records" => { + let value = args.next().ok_or("missing value for --max-records")?; + max_records = Some(value.parse()?); + } + _ if path.is_none() => { + path = Some(PathBuf::from(arg)); + } + _ => { + return Err(format!("unknown argument: {arg}").into()); + } + } + } + + let path = path.ok_or("missing EVTX path\n\nUse --help for usage.")?; + + let mut settings = ParserSettings::new() + .separate_json_attributes(separate_json_attributes) + .validate_checksums(validate_checksums); + + if let Some(n) = num_threads { + settings = settings.num_threads(n); + } + + Ok((path, settings, max_records)) +} + +fn print_usage() { + eprintln!( + "Usage: compare_streaming_legacy [OPTIONS] + +Compares legacy JSON and streaming JSON output for the given EVTX file and aborts +on the first mismatch, printing both JSON payloads for easy regression test creation. 
+ +Options: + -s, --separate-json-attributes Use separate_json_attributes=true + -c, --validate-checksums Validate chunk checksums + -j, --num-threads Use N worker threads (0 = auto) + -n, --max-records Only compare the first N records + -h, --help Show this help message +" + ); +} + +/// Normalize JSON for comparison, handling duplicate key ordering differences. +/// +/// The streaming parser assigns duplicate keys in document order (first value gets +/// unsuffixed key), while the legacy parser puts the last value in the unsuffixed key. +/// This function normalizes both by grouping duplicate keys (e.g., `Header`, `Header_1`) +/// and comparing them as unordered sets of values. +fn normalize_for_comparison(value: &Value) -> Value { + match value { + Value::Object(map) => { + // Group keys by their base name (e.g., "Header", "Header_1" -> "Header") + let mut groups: HashMap> = HashMap::new(); + + for (key, val) in map { + let base_key = extract_base_key(key); + groups + .entry(base_key) + .or_default() + .push((key.clone(), normalize_for_comparison(val))); + } + + // Build normalized object + let mut result = serde_json::Map::new(); + for (base_key, mut entries) in groups { + if entries.len() == 1 { + // Single key, no duplicates - keep as-is + let (key, val) = entries.remove(0); + result.insert(key, val); + } else { + // Multiple keys with same base - normalize to a canonical form + // Sort values using canonical JSON representation (sorted keys) + let mut values: Vec = entries.into_iter().map(|(_, v)| v).collect(); + values.sort_by_key(canonical_json_string); + + // Store as array under the base key with a special marker + result.insert( + format!("{}__normalized_duplicates", base_key), + Value::Array(values), + ); + } + } + Value::Object(result) + } + Value::Array(arr) => Value::Array(arr.iter().map(normalize_for_comparison).collect()), + other => other.clone(), + } +} + +/// Create a canonical JSON string with sorted keys for comparison. 
+fn canonical_json_string(value: &Value) -> String { + match value { + Value::Object(map) => { + let mut sorted: Vec<_> = map.iter().collect(); + sorted.sort_by(|a, b| a.0.cmp(b.0)); + let pairs: Vec = sorted + .into_iter() + .map(|(k, v)| format!("\"{}\":{}", k, canonical_json_string(v))) + .collect(); + format!("{{{}}}", pairs.join(",")) + } + Value::Array(arr) => { + let items: Vec = arr.iter().map(canonical_json_string).collect(); + format!("[{}]", items.join(",")) + } + other => other.to_string(), + } +} + +/// Extract the base key name, stripping any `_N` suffix (and handling `_attributes`). +/// e.g., "Header_1" -> "Header", "Certificate_1_attributes" -> "Certificate_attributes" +fn extract_base_key(key: &str) -> String { + // Handle `_attributes` suffix: strip it, find base, then re-add + if let Some(base) = key.strip_suffix("_attributes") { + return format!("{}_attributes", extract_base_key_simple(base)); + } + extract_base_key_simple(key) +} + +/// Simple base key extraction: strip trailing `_N` where N is all digits. +fn extract_base_key_simple(key: &str) -> String { + if let Some(pos) = key.rfind('_') { + let suffix = &key[pos + 1..]; + if suffix.chars().all(|c| c.is_ascii_digit()) && !suffix.is_empty() { + return key[..pos].to_string(); + } + } + key.to_string() +} diff --git a/src/bin/evtx_dump.rs b/src/bin/evtx_dump.rs index 2cc1199e..805ecd71 100644 --- a/src/bin/evtx_dump.rs +++ b/src/bin/evtx_dump.rs @@ -33,12 +33,21 @@ pub enum EvtxOutputFormat { XML, } +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +enum JsonParserKind { + /// Original JSON path: builds a full `serde_json::Value` per record. + Legacy, + /// Streaming JSON path: writes JSON directly to the output writer. 
+ Streaming, +} + struct EvtxDump { parser_settings: ParserSettings, input: PathBuf, show_record_number: bool, output_format: EvtxOutputFormat, output: Box, + json_parser: JsonParserKind, verbosity_level: Option, stop_after_error: bool, /// When set, only the specified events (offseted reltaive to file) will be outputted. @@ -63,6 +72,15 @@ impl EvtxDump { _ => EvtxOutputFormat::XML, }; + let json_parser = match matches + .get_one::("json-parser") + .map(|s| s.as_str()) + { + Some("legacy") => JsonParserKind::Legacy, + Some("streaming") | None => JsonParserKind::Streaming, + _ => JsonParserKind::Streaming, + }; + let no_indent = match ( matches.get_flag("no-indent"), matches.get_one::("output-format"), @@ -161,6 +179,7 @@ impl EvtxDump { show_record_number: !no_show_record_number, output_format, output, + json_parser, verbosity_level, stop_after_error, ranges: event_ranges, @@ -184,9 +203,18 @@ impl EvtxDump { } } EvtxOutputFormat::JSON => { - for record in parser.records_json() { - self.dump_record(record)? - } + match self.json_parser { + JsonParserKind::Streaming => { + for record in parser.records_json_stream() { + self.dump_record(record)? + } + } + JsonParserKind::Legacy => { + for record in parser.records_json() { + self.dump_record(record)? 
+ } + } + }; } }; @@ -387,6 +415,13 @@ fn main() -> Result<()> { "jsonl" - (jsonlines) same as json with --no-indent --dont-show-record-number "#)), ) + .arg( + Arg::new("json-parser") + .long("json-parser") + .value_parser(["legacy", "streaming"]) + .default_value("streaming") + .help("Select JSON parser implementation: legacy (tree-based) or streaming"), + ) .arg( Arg::new("output-target") .long("output") diff --git a/src/binxml/assemble.rs b/src/binxml/assemble.rs index 9e1508f7..9a9721af 100644 --- a/src/binxml/assemble.rs +++ b/src/binxml/assemble.rs @@ -4,7 +4,7 @@ use crate::binxml::value_variant::BinXmlValue; use crate::model::deserialized::{ BinXMLDeserializedTokens, BinXmlTemplateRef, TemplateSubstitutionDescriptor, }; -use crate::model::xml::{XmlElementBuilder, XmlModel, XmlPIBuilder}; +use crate::model::xml::{XmlElement, XmlElementBuilder, XmlModel, XmlPIBuilder}; use crate::xml_output::BinXmlOutput; use log::{debug, trace, warn}; use std::borrow::{BorrowMut, Cow}; @@ -60,7 +60,7 @@ pub fn parse_tokens<'a, T: BinXmlOutput>( } pub fn create_record_model<'a>( - tokens: Vec>>, + tokens: Vec>, chunk: &'a EvtxChunk<'a>, ) -> Result>> { let mut current_element: Option = None; @@ -68,26 +68,18 @@ pub fn create_record_model<'a>( let mut model: Vec = Vec::with_capacity(tokens.len()); for token in tokens { - // Handle all places where we don't care if it's an Owned or a Borrowed value. 
match token { - Cow::Owned(BinXMLDeserializedTokens::FragmentHeader(_)) - | Cow::Borrowed(BinXMLDeserializedTokens::FragmentHeader(_)) => {} - Cow::Owned(BinXMLDeserializedTokens::TemplateInstance(_)) - | Cow::Borrowed(BinXMLDeserializedTokens::TemplateInstance(_)) => { + BinXMLDeserializedTokens::FragmentHeader(_) => {} + BinXMLDeserializedTokens::TemplateInstance(_) => { return Err(EvtxError::FailedToCreateRecordModel( "Call `expand_templates` before calling this function", )); } - Cow::Owned(BinXMLDeserializedTokens::AttributeList) - | Cow::Borrowed(BinXMLDeserializedTokens::AttributeList) => {} - - Cow::Owned(BinXMLDeserializedTokens::CloseElement) - | Cow::Borrowed(BinXMLDeserializedTokens::CloseElement) => { + BinXMLDeserializedTokens::AttributeList => {} + BinXMLDeserializedTokens::CloseElement => { model.push(XmlModel::CloseElement); } - - Cow::Owned(BinXMLDeserializedTokens::CloseStartElement) - | Cow::Borrowed(BinXMLDeserializedTokens::CloseStartElement) => { + BinXMLDeserializedTokens::CloseStartElement => { trace!("BinXMLDeserializedTokens::CloseStartElement"); match current_element.take() { None => { @@ -98,24 +90,20 @@ pub fn create_record_model<'a>( Some(builder) => model.push(XmlModel::OpenElement(builder.finish()?)), }; } - Cow::Owned(BinXMLDeserializedTokens::CDATASection) - | Cow::Borrowed(BinXMLDeserializedTokens::CDATASection) => { + BinXMLDeserializedTokens::CDATASection => { return Err(EvtxError::FailedToCreateRecordModel( "Unimplemented - CDATA", )); } - Cow::Owned(BinXMLDeserializedTokens::CharRef) - | Cow::Borrowed(BinXMLDeserializedTokens::CharRef) => { + BinXMLDeserializedTokens::CharRef => { return Err(EvtxError::FailedToCreateRecordModel( "Unimplemented - CharacterReference", )); } - Cow::Owned(BinXMLDeserializedTokens::EntityRef(ref entity)) - | Cow::Borrowed(&BinXMLDeserializedTokens::EntityRef(ref entity)) => { + BinXMLDeserializedTokens::EntityRef(ref entity) => { model.push(XmlModel::EntityRef(expand_string_ref(&entity.name, 
chunk)?)) } - Cow::Owned(BinXMLDeserializedTokens::PITarget(ref name)) - | Cow::Borrowed(&BinXMLDeserializedTokens::PITarget(ref name)) => { + BinXMLDeserializedTokens::PITarget(ref name) => { let mut builder = XmlPIBuilder::new(); if current_pi.is_some() { warn!("PITarget without following PIData, previous target will be ignored.") @@ -123,7 +111,7 @@ pub fn create_record_model<'a>( builder.name(expand_string_ref(&name.name, chunk)?); current_pi = Some(builder); } - Cow::Owned(BinXMLDeserializedTokens::PIData(data)) => match current_pi.take() { + BinXMLDeserializedTokens::PIData(data) => match current_pi.take() { None => { return Err(EvtxError::FailedToCreateRecordModel( "PI Data without PI target - Bad parser state", @@ -134,34 +122,14 @@ pub fn create_record_model<'a>( model.push(builder.finish()); } }, - Cow::Borrowed(BinXMLDeserializedTokens::PIData(data)) => match current_pi.take() { - None => { - return Err(EvtxError::FailedToCreateRecordModel( - "PI Data without PI target - Bad parser state", - )); - } - Some(mut builder) => { - builder.data(Cow::Borrowed(data)); - model.push(builder.finish()); - } - }, - Cow::Owned(BinXMLDeserializedTokens::Substitution(_)) - | Cow::Borrowed(BinXMLDeserializedTokens::Substitution(_)) => { + BinXMLDeserializedTokens::Substitution(_) => { return Err(EvtxError::FailedToCreateRecordModel( "Call `expand_templates` before calling this function", )); } - Cow::Owned(BinXMLDeserializedTokens::EndOfStream) - | Cow::Borrowed(BinXMLDeserializedTokens::EndOfStream) => { - model.push(XmlModel::EndOfStream) - } - Cow::Owned(BinXMLDeserializedTokens::StartOfStream) - | Cow::Borrowed(BinXMLDeserializedTokens::StartOfStream) => { - model.push(XmlModel::StartOfStream) - } - - Cow::Owned(BinXMLDeserializedTokens::CloseEmptyElement) - | Cow::Borrowed(BinXMLDeserializedTokens::CloseEmptyElement) => { + BinXMLDeserializedTokens::EndOfStream => model.push(XmlModel::EndOfStream), + BinXMLDeserializedTokens::StartOfStream => 
model.push(XmlModel::StartOfStream), + BinXMLDeserializedTokens::CloseEmptyElement => { trace!("BinXMLDeserializedTokens::CloseEmptyElement"); match current_element.take() { None => { @@ -175,9 +143,7 @@ pub fn create_record_model<'a>( } }; } - - Cow::Owned(BinXMLDeserializedTokens::Attribute(ref attr)) - | Cow::Borrowed(&BinXMLDeserializedTokens::Attribute(ref attr)) => { + BinXMLDeserializedTokens::Attribute(ref attr) => { trace!("BinXMLDeserializedTokens::Attribute(attr) - {:?}", attr); if current_element.is_none() { return Err(EvtxError::FailedToCreateRecordModel( @@ -188,8 +154,7 @@ pub fn create_record_model<'a>( builder.attribute_name(expand_string_ref(&attr.name, chunk)?) } } - Cow::Owned(BinXMLDeserializedTokens::OpenStartElement(ref elem)) - | Cow::Borrowed(&BinXMLDeserializedTokens::OpenStartElement(ref elem)) => { + BinXMLDeserializedTokens::OpenStartElement(ref elem) => { trace!( "BinXMLDeserializedTokens::OpenStartElement(elem) - {:?}", elem.name @@ -198,7 +163,7 @@ pub fn create_record_model<'a>( builder.name(expand_string_ref(&elem.name, chunk)?); current_element = Some(builder); } - Cow::Owned(BinXMLDeserializedTokens::Value(value)) => { + BinXMLDeserializedTokens::Value(value) => { trace!("BinXMLDeserializedTokens::Value(value) - {:?}", value); match current_element { None => match value { @@ -216,24 +181,6 @@ pub fn create_record_model<'a>( } } } - Cow::Borrowed(BinXMLDeserializedTokens::Value(value)) => { - trace!("BinXMLDeserializedTokens::Value(value) - {:?}", value); - match current_element { - None => match value { - BinXmlValue::EvtXml => { - return Err(EvtxError::FailedToCreateRecordModel( - "Call `expand_templates` before calling this function", - )); - } - _ => { - model.push(XmlModel::Value(Cow::Borrowed(value))); - } - }, - Some(ref mut builder) => { - builder.attribute_value(Cow::Borrowed(value))?; - } - } - } } } @@ -267,7 +214,7 @@ fn expand_token_substitution<'a>( template: &mut BinXmlTemplateRef<'a>, substitution_descriptor: 
&TemplateSubstitutionDescriptor, chunk: &'a EvtxChunk<'a>, - stack: &mut Vec>>, + stack: &mut Vec>, ) -> Result<()> { if substitution_descriptor.ignore { return Ok(()); @@ -282,10 +229,10 @@ fn expand_token_substitution<'a>( value, BinXMLDeserializedTokens::Value(BinXmlValue::NullType), ); - _expand_templates(Cow::Owned(value), chunk, stack)?; + _expand_templates(value, chunk, stack)?; } else { _expand_templates( - Cow::Owned(BinXMLDeserializedTokens::Value(BinXmlValue::NullType)), + BinXMLDeserializedTokens::Value(BinXmlValue::NullType), chunk, stack, )?; @@ -297,18 +244,19 @@ fn expand_token_substitution<'a>( fn expand_template<'a>( mut template: BinXmlTemplateRef<'a>, chunk: &'a EvtxChunk<'a>, - stack: &mut Vec>>, + stack: &mut Vec>, ) -> Result<()> { if let Some(template_def) = chunk .template_table .get_template(template.template_def_offset) { // We expect to find all the templates in the template cache. + // Clone from cache since the cache owns the tokens. for token in template_def.tokens.iter() { if let BinXMLDeserializedTokens::Substitution(substitution_descriptor) = token { expand_token_substitution(&mut template, substitution_descriptor, chunk, stack)?; } else { - _expand_templates(Cow::Borrowed(token), chunk, stack)?; + _expand_templates(token.clone(), chunk, stack)?; } } } else { @@ -329,7 +277,7 @@ fn expand_template<'a>( if let BinXMLDeserializedTokens::Substitution(substitution_descriptor) = token { expand_token_substitution(&mut template, &substitution_descriptor, chunk, stack)?; } else { - _expand_templates(Cow::Owned(token), chunk, stack)?; + _expand_templates(token, chunk, stack)?; } } }; @@ -338,35 +286,19 @@ fn expand_template<'a>( } fn _expand_templates<'a>( - token: Cow<'a, BinXMLDeserializedTokens<'a>>, + token: BinXMLDeserializedTokens<'a>, chunk: &'a EvtxChunk<'a>, - stack: &mut Vec>>, + stack: &mut Vec>, ) -> Result<()> { match token { - // Owned values can be consumed when flatting, and passed on as owned. 
- Cow::Owned(BinXMLDeserializedTokens::Value(BinXmlValue::BinXmlType(tokens))) => { + BinXMLDeserializedTokens::Value(BinXmlValue::BinXmlType(tokens)) => { for token in tokens.into_iter() { - _expand_templates(Cow::Owned(token), chunk, stack)?; + _expand_templates(token, chunk, stack)?; } } - - Cow::Borrowed(BinXMLDeserializedTokens::Value(BinXmlValue::BinXmlType(tokens))) => { - for token in tokens.iter() { - _expand_templates(Cow::Borrowed(token), chunk, stack)?; - } - } - // Actual template handling. - Cow::Owned(BinXMLDeserializedTokens::TemplateInstance(template)) => { + BinXMLDeserializedTokens::TemplateInstance(template) => { expand_template(template, chunk, stack)?; } - Cow::Borrowed(BinXMLDeserializedTokens::TemplateInstance(template)) => { - // This can happen if a template has a token which is: - // 1. Another template. - // 2. Is not a substitution (because they are `Owned` values). - // We never actually see this in practice, so we don't mind paying for `clone` here. - expand_template(template.clone(), chunk, stack)?; - } - _ => stack.push(token), } @@ -376,13 +308,217 @@ fn _expand_templates<'a>( pub fn expand_templates<'a>( token_tree: Vec>, chunk: &'a EvtxChunk<'a>, -) -> Result>>> { +) -> Result>> { // We can assume the new tree will be at least as big as the old one. let mut stack = Vec::with_capacity(token_tree.len()); for token in token_tree { - _expand_templates(Cow::Owned(token), chunk, &mut stack)? + _expand_templates(token, chunk, &mut stack)? 
} Ok(stack) } + +fn stream_expand_token<'a, T: BinXmlOutput>( + token: BinXMLDeserializedTokens<'a>, + chunk: &'a EvtxChunk<'a>, + visitor: &mut T, + element_stack: &mut Vec>, + current_element: &mut Option>, + current_pi: &mut Option>, +) -> Result<()> { + match token { + BinXMLDeserializedTokens::FragmentHeader(_) | BinXMLDeserializedTokens::AttributeList => {} + BinXMLDeserializedTokens::OpenStartElement(elem) => { + let mut builder = XmlElementBuilder::new(); + builder.name(expand_string_ref(&elem.name, chunk)?); + *current_element = Some(builder); + } + BinXMLDeserializedTokens::Attribute(attr) => { + if let Some(b) = current_element.as_mut() { + b.attribute_name(expand_string_ref(&attr.name, chunk)?); + } else { + return Err(EvtxError::FailedToCreateRecordModel( + "attribute - Bad parser state", + )); + } + } + BinXMLDeserializedTokens::Value(value) => { + if let Some(b) = current_element.as_mut() { + b.attribute_value(Cow::Owned(value))?; + } else { + visitor.visit_characters(Cow::Owned(value))?; + } + } + BinXMLDeserializedTokens::CloseStartElement => { + let element = current_element + .take() + .ok_or(EvtxError::FailedToCreateRecordModel( + "close start - Bad parser state", + ))? + .finish()?; + visitor.visit_open_start_element(&element)?; + element_stack.push(element); + } + BinXMLDeserializedTokens::CloseEmptyElement => { + let element = current_element + .take() + .ok_or(EvtxError::FailedToCreateRecordModel( + "close empty - Bad parser state", + ))? + .finish()?; + visitor.visit_open_start_element(&element)?; + visitor.visit_close_element(&element)?; + } + BinXMLDeserializedTokens::CloseElement => { + let element = element_stack + .pop() + .ok_or(EvtxError::FailedToCreateRecordModel( + "close element - Bad parser state", + ))?; + visitor.visit_close_element(&element)?; + } + BinXMLDeserializedTokens::EntityRef(entity) => { + match expand_string_ref(&entity.name, chunk)? 
{ + Cow::Borrowed(s) => visitor.visit_entity_reference(s)?, + Cow::Owned(s) => { + let tmp = s; + visitor.visit_entity_reference(&tmp)?; + } + } + } + BinXMLDeserializedTokens::PITarget(name) => { + let mut b = XmlPIBuilder::new(); + b.name(expand_string_ref(&name.name, chunk)?); + *current_pi = Some(b); + } + BinXMLDeserializedTokens::PIData(data) => { + let mut b = current_pi + .take() + .ok_or(EvtxError::FailedToCreateRecordModel( + "PI Data without PI target - Bad parser state", + ))?; + b.data(Cow::Owned(data)); + if let XmlModel::PI(pi) = b.finish() { + visitor.visit_processing_instruction(&pi)?; + } + } + BinXMLDeserializedTokens::StartOfStream | BinXMLDeserializedTokens::EndOfStream => {} + BinXMLDeserializedTokens::TemplateInstance(template) => { + if let Some(template_def) = chunk + .template_table + .get_template(template.template_def_offset) + { + for t in template_def.tokens.iter() { + match t { + BinXMLDeserializedTokens::Substitution(desc) => { + if desc.ignore { + continue; + } + if let Some(val) = template + .substitution_array + .get(desc.substitution_index as usize) + { + stream_expand_token( + val.clone(), + chunk, + visitor, + element_stack, + current_element, + current_pi, + )?; + } else { + visitor.visit_characters(Cow::Owned(BinXmlValue::NullType))?; + } + } + other => stream_expand_token( + other.clone(), + chunk, + visitor, + element_stack, + current_element, + current_pi, + )?, + } + } + } else { + let mut cursor = Cursor::new(chunk.data); + let _ = cursor.seek(SeekFrom::Start(u64::from(template.template_def_offset))); + let template_def = read_template_definition( + &mut cursor, + Some(chunk), + chunk.settings.get_ansi_codec(), + )?; + // For templates not in cache, expand them first then visit + let expanded = expand_templates(template_def.tokens, chunk)?; + for t in expanded { + match t { + BinXMLDeserializedTokens::Substitution(desc) => { + if desc.ignore { + continue; + } + if let Some(val) = template + .substitution_array + 
.get(desc.substitution_index as usize) + { + stream_expand_token( + val.clone(), + chunk, + visitor, + element_stack, + current_element, + current_pi, + )?; + } else { + visitor.visit_characters(Cow::Owned(BinXmlValue::NullType))?; + } + } + other => stream_expand_token( + other, + chunk, + visitor, + element_stack, + current_element, + current_pi, + )?, + } + } + } + } + BinXMLDeserializedTokens::Substitution(_) => { + return Err(EvtxError::FailedToCreateRecordModel( + "Call `expand_templates` before calling this function", + )); + } + BinXMLDeserializedTokens::CDATASection | BinXMLDeserializedTokens::CharRef => { + return Err(EvtxError::FailedToCreateRecordModel( + "Unimplemented CDATA/CharRef", + )); + } + } + Ok(()) +} + +pub fn parse_tokens_streaming<'a, T: BinXmlOutput>( + tokens: Vec>, + chunk: &'a EvtxChunk<'a>, + visitor: &mut T, +) -> Result<()> { + let expanded = expand_templates(tokens, chunk)?; + visitor.visit_start_of_stream()?; + let mut element_stack: Vec> = Vec::new(); + let mut current_element: Option> = None; + let mut current_pi: Option> = None; + for token in expanded { + stream_expand_token( + token, + chunk, + visitor, + &mut element_stack, + &mut current_element, + &mut current_pi, + )?; + } + visitor.visit_end_of_stream()?; + Ok(()) +} diff --git a/src/evtx_parser.rs b/src/evtx_parser.rs index 9fee7462..d893f79e 100644 --- a/src/evtx_parser.rs +++ b/src/evtx_parser.rs @@ -489,6 +489,14 @@ impl EvtxParser { ) -> impl Iterator>> + '_ { self.serialized_records(|record| record.and_then(|record| record.into_json_value())) } + + /// Return an iterator over all the records. + /// Records will be JSON-formatted using a streaming writer that skips XmlModel and serde_json::Value construction. 
+ pub fn records_json_stream( + &mut self, + ) -> impl Iterator>> + '_ { + self.serialized_records(|record| record.and_then(|record| record.into_json_stream())) + } } pub struct IterChunks<'c, T: ReadSeek> { diff --git a/src/evtx_record.rs b/src/evtx_record.rs index 3367dc97..60b4ff94 100644 --- a/src/evtx_record.rs +++ b/src/evtx_record.rs @@ -1,4 +1,4 @@ -use crate::binxml::assemble::parse_tokens; +use crate::binxml::assemble::{parse_tokens, parse_tokens_streaming}; use crate::err::{ DeserializationError, DeserializationResult, EvtxError, Result, SerializationError, }; @@ -119,6 +119,32 @@ impl EvtxRecord<'_> { }) } + /// Consumes the record and streams JSON directly into a buffer using the streaming visitor. + pub fn into_json_stream(self) -> Result> { + // Estimate buffer size based on token count + let capacity_hint = self.tokens.len().saturating_mul(64); + let buf = Vec::with_capacity(capacity_hint); + let mut output_builder = crate::JsonStreamOutput::with_writer(buf, &self.settings); + + let event_record_id = self.event_record_id; + let timestamp = self.timestamp; + parse_tokens_streaming(self.tokens, self.chunk, &mut output_builder).map_err(|e| { + EvtxError::FailedToParseRecord { + record_id: event_record_id, + source: Box::new(e), + } + })?; + + let writer = output_builder.finish()?; + let data = String::from_utf8(writer).map_err(crate::err::SerializationError::from)?; + + Ok(SerializedEvtxRecord { + event_record_id, + timestamp, + data, + }) + } + /// Consumes the record and parse it, producing an XML serialized record. 
pub fn into_xml(self) -> Result> { let mut output_builder = XmlOutput::with_writer(Vec::new(), &self.settings); diff --git a/src/json_stream_output.rs b/src/json_stream_output.rs new file mode 100644 index 00000000..538f743f --- /dev/null +++ b/src/json_stream_output.rs @@ -0,0 +1,1130 @@ +use crate::ParserSettings; +use crate::err::{SerializationError, SerializationResult}; +use crate::xml_output::BinXmlOutput; + +use crate::binxml::name::BinXmlName; +use crate::binxml::value_variant::BinXmlValue; +use crate::model::xml::{BinXmlPI, XmlElement}; +use quick_xml::events::BytesText; +use serde_json::Value as JsonValue; +use std::borrow::Cow; +use std::io::Write; + +/// Represents how the current XML element is being rendered in JSON. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +enum ElementValueKind { + /// We haven't decided yet if this element will be rendered as a scalar, + /// an object, or `null`. This is the case for elements without attributes. + Pending, + /// The element has been rendered as a scalar JSON value (`"key": 123`). + Scalar, + /// The element is rendered as an object (`"key": { ... }`). + Object, +} + +/// Per-element state while streaming. +#[derive(Debug)] +struct ElementState { + /// JSON key for this element in its parent object. + name: String, + /// How this element's JSON value is currently represented. + kind: ElementValueKind, + /// Whether we've already emitted a `#text` field for this element (when `kind == Object`). + has_text: bool, + /// Whether we've emitted `_attributes` separately (when `separate_json_attributes == true`). + /// If true and `kind == Pending` on close, we skip emitting `null` to match legacy behavior. + has_separate_attributes: bool, + /// Buffered scalar values for elements without attributes. + /// We buffer instead of writing immediately to support concatenation of multiple character nodes. + /// Uses serde_json::Value to avoid lifetime issues with BinXmlValue. 
+ buffered_values: Vec, +} + +/// JSON object context (either the root object or any nested object). +#[derive(Debug)] +struct ObjectFrame { + /// Whether we've already written any field in this object. + first_field: bool, + /// Keys already used in this object (for duplicate key handling). + used_keys: std::collections::HashSet, +} + +pub struct JsonStreamOutput { + writer: Option, + /// Whether pretty-printing was requested. Currently unused – streaming + /// output is always compact, and callers compare via `serde_json::Value`. + #[allow(dead_code)] + indent: bool, + separate_json_attributes: bool, + + /// Stack of JSON object frames. The root object is at index 0. + frames: Vec, + /// Stack of currently open XML elements. + elements: Vec, + + /// Optional depth (in `elements`) of an `EventData` element that owns a + /// synthetic `"Data": { "#text": [...] }` aggregator, used to model + /// `......` without building an + /// intermediate tree. + data_owner_depth: Option, + /// Collected values for the aggregated `"Data": { "#text": [...] }` array. + data_values: Vec, + /// Whether we are currently inside a `` element that contributes to + /// the aggregated `"Data"` array. + data_inside_element: bool, +} + +impl JsonStreamOutput { + pub fn with_writer(writer: W, settings: &ParserSettings) -> Self { + JsonStreamOutput { + writer: Some(writer), + indent: settings.should_indent(), + separate_json_attributes: settings.should_separate_json_attributes(), + frames: Vec::new(), + elements: Vec::new(), + data_owner_depth: None, + data_values: Vec::new(), + data_inside_element: false, + } + } + + /// Finalize the JSON stream and return the underlying writer. + pub fn finish(mut self) -> SerializationResult { + // If the caller didn't drive the parser fully, we may still have an + // open root object; try to close it gracefully. + if !self.frames.is_empty() { + // Close any remaining open element objects. 
+ while let Some(elem) = self.elements.pop() { + if elem.kind == ElementValueKind::Object { + self.end_object()?; + } + } + + // Close the root object. + self.write_bytes(b"}")?; + self.frames.clear(); + } + + self.writer + .take() + .ok_or_else(|| SerializationError::JsonStructureError { + message: "Writer already taken".to_string(), + }) + } + + pub fn into_writer(self) -> W { + self.finish() + .expect("failed to finalize JSON output in JsonStreamOutput") + } + + fn writer_mut(&mut self) -> &mut W { + self.writer + .as_mut() + .expect("JsonStreamOutput writer missing") + } + + fn write_bytes(&mut self, bytes: &[u8]) -> SerializationResult<()> { + self.writer_mut() + .write_all(bytes) + .map_err(SerializationError::from) + } + + fn current_frame_mut(&mut self) -> &mut ObjectFrame { + self.frames + .last_mut() + .expect("no current JSON object frame available") + } + + /// Write a comma if needed for the current JSON object. + fn write_comma_if_needed(&mut self) -> SerializationResult<()> { + let frame = self.current_frame_mut(); + if frame.first_field { + frame.first_field = false; + Ok(()) + } else { + self.write_bytes(b",") + } + } + + /// Reserve a unique key in the current frame without writing it. + /// Returns the unique key that will be used (with `_1`, `_2` suffix if needed). + fn reserve_unique_key(&mut self, key: &str) -> String { + let frame = self + .frames + .last_mut() + .expect("no current JSON object frame"); + if frame.used_keys.contains(key) { + // Find next available suffix + let mut suffix = 1; + loop { + let candidate = format!("{}_{}", key, suffix); + if !frame.used_keys.contains(&candidate) { + frame.used_keys.insert(candidate.clone()); + return candidate; + } + suffix += 1; + } + } else { + frame.used_keys.insert(key.to_owned()); + key.to_owned() + } + } + + /// Write a JSON string key (with surrounding quotes and escaping). + /// Write a JSON string key, handling duplicates by appending `_1`, `_2`, etc. 
+ fn write_key(&mut self, key: &str) -> SerializationResult<()> { + self.write_comma_if_needed()?; + + // Check for duplicate keys and find a unique name + let unique_key = self.reserve_unique_key(key); + + serde_json::to_writer(self.writer_mut(), &unique_key).map_err(SerializationError::from)?; + self.write_bytes(b":") + } + + /// Write a pre-reserved key directly (no duplicate checking needed). + fn write_reserved_key(&mut self, key: &str) -> SerializationResult<()> { + self.write_comma_if_needed()?; + serde_json::to_writer(self.writer_mut(), key).map_err(SerializationError::from)?; + self.write_bytes(b":") + } + + /// Start a new nested JSON object as the value of `key` in the current object. + fn start_object_value(&mut self, key: &str) -> SerializationResult<()> { + self.write_key(key)?; + self.write_bytes(b"{")?; + self.frames.push(ObjectFrame { + first_field: true, + used_keys: std::collections::HashSet::new(), + }); + Ok(()) + } + + /// End the current JSON object frame. + fn end_object(&mut self) -> SerializationResult<()> { + self.write_bytes(b"}")?; + self.frames.pop(); + Ok(()) + } + + /// For elements without attributes, if their first child is another element + /// we need to materialize this element as an object (`"name": { ... }`). + fn ensure_parent_is_object(&mut self) -> SerializationResult<()> { + let Some(parent_index) = self.elements.len().checked_sub(1) else { + return Ok(()); + }; + + let parent_kind = self.elements[parent_index].kind; + + match parent_kind { + ElementValueKind::Pending => { + // Turn `"parent": null` into `"parent": { ... }` by starting an + // object value for it now. + let key = self.elements[parent_index].name.clone(); + let was_reserved = self.elements[parent_index].has_separate_attributes; + + // If the key was pre-reserved (separate_json_attributes mode), use + // write_reserved_key to avoid double-reservation. 
+ if was_reserved { + self.write_reserved_key(&key)?; + } else { + self.write_key(&key)?; + } + self.write_bytes(b"{")?; + self.frames.push(ObjectFrame { + first_field: true, + used_keys: std::collections::HashSet::new(), + }); + + self.elements[parent_index].kind = ElementValueKind::Object; + } + ElementValueKind::Scalar => { + // Element had text content but now has child elements too. + // Turn it into an object and move buffered text to #text field. + let key = self.elements[parent_index].name.clone(); + let was_reserved = self.elements[parent_index].has_separate_attributes; + let buffered = std::mem::take(&mut self.elements[parent_index].buffered_values); + + if was_reserved { + self.write_reserved_key(&key)?; + } else { + self.write_key(&key)?; + } + self.write_bytes(b"{")?; + self.frames.push(ObjectFrame { + first_field: true, + used_keys: std::collections::HashSet::new(), + }); + + // Write the buffered text as #text if not in separate mode + // (in separate mode, text in mixed-content elements is dropped). + if !buffered.is_empty() && !self.separate_json_attributes { + self.write_key("#text")?; + if buffered.len() == 1 { + serde_json::to_writer(self.writer_mut(), &buffered[0]) + .map_err(SerializationError::from)?; + } else { + serde_json::to_writer(self.writer_mut(), &buffered) + .map_err(SerializationError::from)?; + } + } + + self.elements[parent_index].kind = ElementValueKind::Object; + } + ElementValueKind::Object => { + // Already an object, nothing to do. + } + } + + Ok(()) + } + + /// Append a value into the aggregated `"Data": { "#text": [...] }` under an + /// `EventData` element. The BinXml value may itself be an array (e.g. + /// `StringArrayType`), in which case it is stored as-is, matching the + /// behaviour of `JsonOutput::value_to_json`. 
+ fn write_data_aggregated_value(&mut self, value: Cow) -> SerializationResult<()> { + let json_value: JsonValue = match &value { + Cow::Borrowed(v) => JsonValue::from(*v), + Cow::Owned(v) => JsonValue::from(v), + }; + + self.data_values.push(json_value); + Ok(()) + } + + /// Finalize the aggregated `"Data"` value, if any. + /// With `separate_json_attributes == false`: outputs `"Data": { "#text": ... }` + /// With `separate_json_attributes == true`: outputs `"Data": ...` directly + fn finalize_data_aggregator(&mut self) -> SerializationResult<()> { + if self.data_owner_depth.is_some() && !self.data_values.is_empty() { + // Avoid aliasing `self` while iterating by taking the values out. + let values = std::mem::take(&mut self.data_values); + + if self.separate_json_attributes { + // In separate_json_attributes mode, output directly without wrapper. + // Legacy concatenates multiple string values into one. + self.write_key("Data")?; + if values.len() == 1 { + serde_json::to_writer(self.writer_mut(), &values[0]) + .map_err(SerializationError::from)?; + } else { + // Concatenate multiple values as strings (legacy behavior). + let mut concat = String::new(); + for v in &values { + match v { + JsonValue::String(s) => concat.push_str(s), + JsonValue::Number(n) => concat.push_str(&n.to_string()), + JsonValue::Bool(b) => { + concat.push_str(if *b { "true" } else { "false" }) + } + JsonValue::Null => {} + _ => concat.push_str(&v.to_string()), + } + } + serde_json::to_writer(self.writer_mut(), &concat) + .map_err(SerializationError::from)?; + } + } else { + // With `#attributes` mode, wrap in `"Data": { "#text": ... }`. + self.start_object_value("Data")?; + self.write_key("#text")?; + + if values.len() == 1 { + serde_json::to_writer(self.writer_mut(), &values[0]) + .map_err(SerializationError::from)?; + } else { + // Multiple `` children: aggregate into an array. 
+ self.write_bytes(b"[")?; + for (idx, json_value) in values.into_iter().enumerate() { + if idx > 0 { + self.write_bytes(b",")?; + } + serde_json::to_writer(self.writer_mut(), &json_value) + .map_err(SerializationError::from)?; + } + self.write_bytes(b"]")?; + } + + self.end_object()?; + } + } + + // Reset aggregator state. + self.data_owner_depth = None; + self.data_inside_element = false; + Ok(()) + } +} + +impl BinXmlOutput for JsonStreamOutput { + fn visit_start_of_stream(&mut self) -> SerializationResult<()> { + // Open the root JSON object. + self.write_bytes(b"{")?; + self.frames.push(ObjectFrame { + first_field: true, + used_keys: std::collections::HashSet::new(), + }); + Ok(()) + } + + fn visit_end_of_stream(&mut self) -> SerializationResult<()> { + // Close any remaining elements that own JSON object frames. + while let Some(elem) = self.elements.pop() { + if elem.kind == ElementValueKind::Object { + self.end_object()?; + } + } + + // Close the root JSON object. + if !self.frames.is_empty() { + self.write_bytes(b"}")?; + self.frames.clear(); + } + + Ok(()) + } + + fn visit_open_start_element(&mut self, element: &XmlElement) -> SerializationResult<()> { + // If we're nested under an element without attributes, and this is the + // first child element, we must represent the parent as an object. + self.ensure_parent_is_object()?; + + // Determine JSON key for this element. + let element_name = element.name.as_str(); + + // Special handling for `` nodes: they use their "Name" attribute + // as the JSON key when present, and ignore attributes entirely. 
+ let is_data = element_name == "Data"; + let data_name_attr = if is_data { + element + .attributes + .iter() + .find(|a| a.name.as_ref().as_str() == "Name") + } else { + None + }; + + let key = if let Some(name_attr) = data_name_attr { + name_attr.value.as_cow_str().into_owned() + } else { + element_name.to_owned() + }; + + // Aggregated `......` case: + // multiple `` children without a `Name` attribute become a single + // `"Data": { "#text": [ ... ] }` object under their `EventData` parent. + if is_data + && data_name_attr.is_none() + && let Some(parent) = self.elements.last() + && parent.name == "EventData" + { + // Depth of the owning `EventData` element. + let owner_depth = self.elements.len(); + + // Initialize a new aggregator for this `EventData`, if needed. + if self.data_owner_depth != Some(owner_depth) { + self.data_owner_depth = Some(owner_depth); + self.data_values.clear(); + } + + // We're now inside a `` element that contributes to + // the aggregated array. + self.data_inside_element = true; + + // Do NOT push a new `ElementState` for this `` node; + // its values are handled by the aggregator. + return Ok(()); + } + + // In the JSON representation, `` behaves like a + // regular node without attributes. Attributes whose JSON value is + // `null` are ignored (this matches `JsonOutput`). + let mut has_json_attributes = false; + if !is_data { + for attr in &element.attributes { + let json_value: JsonValue = JsonValue::from(attr.value.as_ref()); + if !json_value.is_null() { + has_json_attributes = true; + break; + } + } + } + + // Elements with attributes and `separate_json_attributes == false` are + // materialized as objects with a `#attributes` field. + if has_json_attributes && !self.separate_json_attributes { + // `"key": { "#attributes": { ... } }` + self.start_object_value(&key)?; + + // Write `#attributes` object. + { + // Update first-field state for the element object. 
+ let first_field = { + let frame = self.current_frame_mut(); + let first = frame.first_field; + if first { + frame.first_field = false; + } + first + }; + if !first_field { + self.write_bytes(b",")?; + } + serde_json::to_writer(self.writer_mut(), "#attributes") + .map_err(SerializationError::from)?; + self.write_bytes(b":")?; + + // Start attributes object. + self.write_bytes(b"{")?; + self.frames.push(ObjectFrame { + first_field: true, + used_keys: std::collections::HashSet::new(), + }); + + { + for attr in &element.attributes { + let attr_key = attr.name.as_str(); + // Skip the `Name` attribute on ``; it is only + // used as the field name, not as an attribute. + if is_data && attr_key == "Name" { + continue; + } + + let json_value: JsonValue = JsonValue::from(attr.value.as_ref()); + if json_value.is_null() { + continue; + } + + let is_first = { + let frame = self.current_frame_mut(); + let first = frame.first_field; + if first { + frame.first_field = false; + } + first + }; + if !is_first { + self.write_bytes(b",")?; + } + serde_json::to_writer(self.writer_mut(), attr_key) + .map_err(SerializationError::from)?; + self.write_bytes(b":")?; + serde_json::to_writer(self.writer_mut(), &json_value) + .map_err(SerializationError::from)?; + } + } + + // Close `#attributes` object. + self.end_object()?; + } + + self.elements.push(ElementState { + name: key, + kind: ElementValueKind::Object, + has_text: false, + has_separate_attributes: false, + buffered_values: Vec::new(), + }); + } else { + // `separate_json_attributes == true` or element has no attributes. + let wrote_separate_attrs = has_json_attributes && self.separate_json_attributes; + + // If we're writing `_attributes`, pre-reserve the element key so both + // the `_attributes` and the element itself use matching suffixes. + let element_key = if wrote_separate_attrs { + let unique_key = self.reserve_unique_key(&key); + + // Emit `"_attributes": { ... }` into the parent object. 
+ let attr_key = format!("{}_attributes", unique_key); + self.write_reserved_key(&attr_key)?; + self.write_bytes(b"{")?; + self.frames.push(ObjectFrame { + first_field: true, + used_keys: std::collections::HashSet::new(), + }); + + { + for attr in &element.attributes { + let attr_name = attr.name.as_str(); + let json_value: JsonValue = JsonValue::from(attr.value.as_ref()); + if json_value.is_null() { + continue; + } + + let is_first = { + let frame = self.current_frame_mut(); + let first = frame.first_field; + if first { + frame.first_field = false; + } + first + }; + if !is_first { + self.write_bytes(b",")?; + } + serde_json::to_writer(self.writer_mut(), attr_name) + .map_err(SerializationError::from)?; + self.write_bytes(b":")?; + serde_json::to_writer(self.writer_mut(), &json_value) + .map_err(SerializationError::from)?; + } + } + + self.end_object()?; + unique_key + } else { + // No attributes to write - use original key (will be deduped on write). + key + }; + + // We delay emitting the actual `"key": ...` until we see either + // a character node or a child element, so we can decide whether + // this element is a scalar, an object, or `null`. + self.elements.push(ElementState { + name: element_key, + kind: ElementValueKind::Pending, + has_text: false, + has_separate_attributes: wrote_separate_attrs, + buffered_values: Vec::new(), + }); + } + + Ok(()) + } + + fn visit_close_element(&mut self, element: &XmlElement) -> SerializationResult<()> { + let element_name = element.name.as_str(); + + // Closing an aggregated `` node: we only need to mark that we + // are no longer inside a contributing ``; the owning `EventData` + // element remains on the stack. 
+ if element_name == "Data" && self.data_owner_depth.is_some() && self.data_inside_element { + self.data_inside_element = false; + return Ok(()); + } + + let current_depth = self.elements.len(); + let is_data_owner = self.data_owner_depth == Some(current_depth); + + if let Some(elem) = self.elements.pop() { + if is_data_owner { + // Finalize the aggregated `"Data": { "#text": [...] }` object. + self.finalize_data_aggregator()?; + } + + match elem.kind { + ElementValueKind::Pending => { + // No text and no children – render as `null`, unless we already + // emitted `_attributes` separately (legacy omits the null). + if !elem.has_separate_attributes { + self.write_key(&elem.name)?; + self.write_bytes(b"null")?; + } + } + ElementValueKind::Scalar => { + // Write the buffered scalar value(s) now. + if !elem.buffered_values.is_empty() { + // If key was pre-reserved (separate_json_attributes mode), use reserved writer. + if elem.has_separate_attributes { + self.write_reserved_key(&elem.name)?; + } else { + self.write_key(&elem.name)?; + } + if elem.buffered_values.len() == 1 { + // Single value: preserve original type. + serde_json::to_writer(self.writer_mut(), &elem.buffered_values[0]) + .map_err(SerializationError::from)?; + } else { + // Multiple values: concatenate as strings (legacy behavior). + let mut concat = String::new(); + for v in &elem.buffered_values { + // Convert JSON value back to string for concatenation + match v { + JsonValue::String(s) => concat.push_str(s), + JsonValue::Number(n) => concat.push_str(&n.to_string()), + JsonValue::Bool(b) => { + concat.push_str(if *b { "true" } else { "false" }) + } + JsonValue::Null => concat.push_str("null"), + _ => concat.push_str(&v.to_string()), + } + } + serde_json::to_writer(self.writer_mut(), &concat) + .map_err(SerializationError::from)?; + } + } + } + ElementValueKind::Object => { + // Write buffered #text values if any, then close the object. 
+ // In separate_json_attributes mode, elements with child elements + // drop text content (legacy behavior - no #text field). + if !elem.buffered_values.is_empty() && !self.separate_json_attributes { + let is_first = { + let frame = self.current_frame_mut(); + let first = frame.first_field; + if first { + frame.first_field = false; + } + first + }; + if !is_first { + self.write_bytes(b",")?; + } + serde_json::to_writer(self.writer_mut(), "#text") + .map_err(SerializationError::from)?; + self.write_bytes(b":")?; + + if elem.buffered_values.len() == 1 { + // Single value: write directly. + serde_json::to_writer(self.writer_mut(), &elem.buffered_values[0]) + .map_err(SerializationError::from)?; + } else { + // Multiple values: write as array (legacy behavior). + serde_json::to_writer(self.writer_mut(), &elem.buffered_values) + .map_err(SerializationError::from)?; + } + } + // Close the element's object. + self.end_object()?; + } + } + } + Ok(()) + } + + fn visit_characters(&mut self, value: Cow) -> SerializationResult<()> { + // Aggregated `......` case. + if let Some(owner_depth) = self.data_owner_depth { + let current_depth = self.elements.len(); + if self.data_inside_element && current_depth == owner_depth { + self.write_data_aggregated_value(value)?; + return Ok(()); + } + } + + // Characters belong to the innermost open XML element. + let Some(index) = self.elements.len().checked_sub(1) else { + return Ok(()); + }; + + let kind = self.elements[index].kind; + + match kind { + ElementValueKind::Pending => { + // First content for this element and it has no attributes: + // buffer the value (we'll write on close to support concatenation). + let json_value: JsonValue = match value { + Cow::Borrowed(v) => JsonValue::from(v), + Cow::Owned(v) => JsonValue::from(&v), + }; + self.elements[index].buffered_values.push(json_value); + self.elements[index].kind = ElementValueKind::Scalar; + } + ElementValueKind::Scalar => { + // Multiple character nodes: add to the buffer. 
+ // On close, we'll concatenate string representations to match legacy. + let json_value: JsonValue = match value { + Cow::Borrowed(v) => JsonValue::from(v), + Cow::Owned(v) => JsonValue::from(&v), + }; + self.elements[index].buffered_values.push(json_value); + } + ElementValueKind::Object => { + // Elements with attributes: we store text under a `#text` key. + // In separate_json_attributes mode, skip null #text values. + if self.elements[index].has_separate_attributes { + let is_null = matches!( + &value, + Cow::Borrowed(BinXmlValue::NullType) | Cow::Owned(BinXmlValue::NullType) + ); + if is_null { + return Ok(()); + } + } + + // Buffer text values to support multiple text nodes (legacy creates an array). + let json_value: JsonValue = match value { + Cow::Borrowed(v) => JsonValue::from(v), + Cow::Owned(v) => JsonValue::from(&v), + }; + self.elements[index].buffered_values.push(json_value); + self.elements[index].has_text = true; + } + } + + Ok(()) + } + + fn visit_cdata_section(&mut self) -> SerializationResult<()> { + Err(SerializationError::Unimplemented { + message: format!("`{}`: visit_cdata_section", file!()), + }) + } + + fn visit_entity_reference(&mut self, entity: &BinXmlName) -> SerializationResult<()> { + // Match JsonOutput behaviour: use quick-xml's unescape to resolve the entity. 
+ let entity_ref = "&".to_string() + entity.as_str() + ";"; + let xml_event = BytesText::from_escaped(&entity_ref); + match xml_event.unescape() { + Ok(escaped) => { + let as_string = escaped.to_string(); + self.visit_characters(Cow::Owned(BinXmlValue::StringType(as_string))) + } + Err(_) => Err(SerializationError::JsonStructureError { + message: format!("Unterminated XML Entity {}", entity_ref), + }), + } + } + + fn visit_character_reference(&mut self, _char_ref: Cow<'_, str>) -> SerializationResult<()> { + Err(SerializationError::Unimplemented { + message: format!("`{}`: visit_character_reference", file!()), + }) + } + + fn visit_processing_instruction(&mut self, _pi: &BinXmlPI) -> SerializationResult<()> { + Err(SerializationError::Unimplemented { + message: format!("`{}`: visit_processing_instruction_data", file!()), + }) + } +} + +#[cfg(test)] +mod tests { + use super::JsonStreamOutput; + use crate::binxml::name::BinXmlName; + use crate::binxml::value_variant::BinXmlValue; + use crate::model::xml::{XmlAttribute, XmlElement}; + use crate::{BinXmlOutput, JsonOutput, ParserSettings}; + use pretty_assertions::assert_eq; + use quick_xml::Reader; + use quick_xml::events::{BytesStart, Event}; + use std::borrow::Cow; + + fn bytes_to_string(bytes: &[u8]) -> String { + String::from_utf8(bytes.to_vec()).expect("UTF8 Input") + } + + fn event_to_element(event: BytesStart) -> XmlElement { + let mut attrs = vec![]; + + for attr in event.attributes() { + let attr = attr.expect("Failed to read attribute."); + attrs.push(XmlAttribute { + name: Cow::Owned(BinXmlName::from_string(bytes_to_string(attr.key.as_ref()))), + // We have to compromise here and assume all values are strings. + value: Cow::Owned(BinXmlValue::StringType(bytes_to_string(&attr.value))), + }); + } + + XmlElement { + name: Cow::Owned(BinXmlName::from_string(bytes_to_string( + event.name().as_ref(), + ))), + attributes: attrs, + } + } + + /// Converts an XML string to JSON using the legacy `JsonOutput`. 
+ fn xml_to_json_legacy(xml: &str, settings: &ParserSettings) -> String { + let mut reader = Reader::from_str(xml); + reader.config_mut().trim_text(true); + + let mut output = JsonOutput::new(settings); + output.visit_start_of_stream().expect("Start of stream"); + + let mut element_stack: Vec = Vec::new(); + + loop { + match reader.read_event() { + Ok(event) => match event { + Event::Start(start) => { + let elem = event_to_element(start); + output + .visit_open_start_element(&elem) + .expect("Open start element"); + element_stack.push(elem); + } + Event::End(_) => { + let elem = element_stack.pop().expect("Unbalanced XML (End)"); + output.visit_close_element(&elem).expect("Close element"); + } + Event::Empty(empty) => { + let elem = event_to_element(empty); + output + .visit_open_start_element(&elem) + .expect("Empty Open start element"); + output.visit_close_element(&elem).expect("Empty Close"); + } + Event::Text(text) => output + .visit_characters(Cow::Owned(BinXmlValue::StringType(bytes_to_string( + text.as_ref(), + )))) + .expect("Text element"), + Event::Comment(_) => {} + Event::CData(_) => unimplemented!(), + Event::Decl(_) => {} + Event::PI(_) => unimplemented!(), + Event::DocType(_) => {} + Event::Eof => { + output.visit_end_of_stream().expect("End of stream"); + break; + } + }, + Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e), + } + } + + serde_json::to_string_pretty(&output.into_value().expect("Output")).expect("To serialize") + } + + /// Converts an XML string to JSON using the streaming `JsonStreamOutput`. 
+    fn xml_to_json_streaming(xml: &str, settings: &ParserSettings) -> String {
+        let mut reader = Reader::from_str(xml);
+        reader.config_mut().trim_text(true);
+
+        // The visitor is given an in-memory buffer, which `finish` hands back below.
+        let mut output = JsonStreamOutput::with_writer(Vec::new(), settings);
+        output.visit_start_of_stream().expect("Start of stream");
+
+        // Elements that have been opened but not yet closed. The element type is
+        // inferred from `event_to_element`, so the vector needs no annotation.
+        let mut element_stack = Vec::new();
+
+        loop {
+            match reader.read_event() {
+                Ok(Event::Start(start)) => {
+                    let elem = event_to_element(start);
+                    output
+                        .visit_open_start_element(&elem)
+                        .expect("Open start element");
+                    element_stack.push(elem);
+                }
+                Ok(Event::End(_)) => {
+                    // The `End` payload is ignored; the element recorded when the tag
+                    // was opened is replayed instead.
+                    let elem = element_stack.pop().expect("Unbalanced XML (End)");
+                    output.visit_close_element(&elem).expect("Close element");
+                }
+                Ok(Event::Empty(empty)) => {
+                    // A self-closing tag is reported as an open immediately followed by a close.
+                    let elem = event_to_element(empty);
+                    output
+                        .visit_open_start_element(&elem)
+                        .expect("Empty Open start element");
+                    output.visit_close_element(&elem).expect("Empty Close");
+                }
+                Ok(Event::Text(text)) => output
+                    .visit_characters(Cow::Owned(BinXmlValue::StringType(bytes_to_string(
+                        text.as_ref(),
+                    ))))
+                    .expect("Text element"),
+                // Comments, declarations and doctypes are skipped.
+                Ok(Event::Comment(_) | Event::Decl(_) | Event::DocType(_)) => {}
+                // CDATA sections and processing instructions are not handled by this helper.
+                Ok(Event::CData(_) | Event::PI(_)) => unimplemented!(),
+                Ok(Event::Eof) => {
+                    output.visit_end_of_stream().expect("End of stream");
+                    break;
+                }
+                Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e),
+            }
+        }
+
+        let bytes = output.finish().expect("finish streaming JSON");
+        String::from_utf8(bytes).expect("UTF8 JSON")
+    }
+
+    /// Parity check: unnamed `<Data>` elements interleaved with a `<Binary>` element
+    /// must produce the same JSON value from both outputs.
+    #[test]
+    fn test_unnamed_data_interspersed_with_binary_matches_legacy() {
+        // NOTE(review): the element markup of this fixture was reconstructed from the
+        // test name and the assertion message; confirm against the upstream test.
+        let xml = r#"
+<Event>
+  <EventData>
+    <Data>v1</Data>
+    <Binary>00AA</Binary>
+    <Data>v2</Data>
+  </EventData>
+</Event>
+"#
+        .trim();
+
+        let settings = ParserSettings::new().num_threads(1);
+
+        let legacy_json = xml_to_json_legacy(xml, &settings);
+        let streaming_json = xml_to_json_streaming(xml, &settings);
+
+        // Compare parsed values so that formatting differences do not matter.
+        let legacy_value: serde_json::Value =
+            serde_json::from_str(&legacy_json).expect("legacy JSON should be valid");
+        let streaming_value: serde_json::Value =
+            serde_json::from_str(&streaming_json).expect("streaming JSON should be valid");
+
+        assert_eq!(
+            legacy_value, streaming_value,
+            "streaming JSON must match legacy JSON for unnamed <Data> elements interspersed with <Binary>"
+        );
+    }
+
+    /// Regression test for Issue 1: Data aggregation format in separate_json_attributes mode.
+    /// Legacy outputs `"Data": [...]` but streaming was outputting `"Data": { "#text": [...] }`.
+    #[test]
+    fn test_data_aggregation_separate_attributes_mode() {
+        // NOTE(review): the element markup of this fixture was reconstructed from the
+        // doc comment above (aggregated `Data` values); confirm against the upstream test.
+        let xml = r#"
+<Event>
+  <EventData>
+    <Data>v1</Data>
+    <Data>v2</Data>
+  </EventData>
+</Event>
+"#
+        .trim();
+
+        let settings = ParserSettings::new()
+            .num_threads(1)
+            .separate_json_attributes(true);
+
+        let legacy_json = xml_to_json_legacy(xml, &settings);
+        let streaming_json = xml_to_json_streaming(xml, &settings);
+
+        // Compare parsed values so that formatting differences do not matter.
+        let legacy_value: serde_json::Value =
+            serde_json::from_str(&legacy_json).expect("legacy JSON should be valid");
+        let streaming_value: serde_json::Value =
+            serde_json::from_str(&streaming_json).expect("streaming JSON should be valid");
+
+        assert_eq!(
+            legacy_value, streaming_value,
+            "Data aggregation in separate_json_attributes mode: streaming must match legacy.\nLegacy: {}\nStreaming: {}",
+            legacy_json, streaming_json
+        );
+    }
+
+    /// Regression test for Issue 2: Duplicate element key handling.
+    /// Legacy outputs `"LogonGuid": "...", "LogonGuid_1": "..."` but streaming was losing duplicates.
+    ///
+    /// NOTE: Legacy and streaming have different key ordering for duplicates:
+    /// - Legacy: last value gets unsuffixed key (LogonGuid: guid2, LogonGuid_1: guid1)
+    /// - Streaming: first value gets unsuffixed key (LogonGuid: guid1, LogonGuid_1: guid2)
+    ///
+    /// Both preserve all data, just with different key assignments. This is acceptable
+    /// for streaming since we can't retroactively rename already-written keys.
+    #[test]
+    fn test_duplicate_element_keys() {
+        /// Returns, sorted, every string value stored under a key starting with
+        /// `LogonGuid`; yields nothing when `event_data` is not a JSON object.
+        fn logon_guid_values(event_data: &serde_json::Value) -> Vec<&str> {
+            let mut values: Vec<&str> = event_data
+                .as_object()
+                .into_iter()
+                .flatten()
+                .filter(|(key, _)| key.starts_with("LogonGuid"))
+                .filter_map(|(_, value)| value.as_str())
+                .collect();
+            values.sort();
+            values
+        }
+
+        // NOTE(review): the element markup of this fixture was reconstructed from the
+        // keys this test reads (`Event`, `EventData`, `LogonGuid*`); confirm against
+        // the upstream test.
+        let xml = r#"
+<Event>
+  <EventData>
+    <LogonGuid>guid1</LogonGuid>
+    <LogonGuid>guid2</LogonGuid>
+  </EventData>
+</Event>
+"#
+        .trim();
+
+        let settings = ParserSettings::new().num_threads(1);
+
+        let legacy_json = xml_to_json_legacy(xml, &settings);
+        let streaming_json = xml_to_json_streaming(xml, &settings);
+
+        let legacy_value: serde_json::Value =
+            serde_json::from_str(&legacy_json).expect("legacy JSON should be valid");
+        let streaming_value: serde_json::Value =
+            serde_json::from_str(&streaming_json).expect("streaming JSON should be valid");
+
+        // Compare the sets of LogonGuid values from EventData, regardless of which
+        // key (suffixed or not) each value ended up under.
+        let legacy_values = logon_guid_values(&legacy_value["Event"]["EventData"]);
+        let streaming_values = logon_guid_values(&streaming_value["Event"]["EventData"]);
+
+        // Guard against a vacuous pass: two empty lists would compare equal.
+        assert!(
+            !legacy_values.is_empty(),
+            "Duplicate element keys: fixture produced no LogonGuid values.\nLegacy: {}",
+            legacy_json
+        );
+
+        assert_eq!(
+            legacy_values, streaming_values,
+            "Duplicate element keys: both parsers must preserve all values.\nLegacy: {}\nStreaming: {}",
+            legacy_json, streaming_json
+        );
+    }
+
+    /// Regression test for Issue 3: Multiple character nodes concatenation.
+    /// Legacy concatenates multiple text nodes, streaming was only keeping the first.
+    /// This test directly invokes visit_characters multiple times to simulate the real case.
+    #[test]
+    fn test_multiple_character_nodes_concatenation() {
+        use crate::model::xml::XmlElement;
+
+        // Both visitors are driven by hand with the same call sequence: two
+        // consecutive character nodes inside `Event` > `Message`.
+        const PARTS: [&str; 2] = ["Part1", "Part2"];
+
+        let settings = ParserSettings::new().num_threads(1);
+        let event_elem = XmlElement {
+            name: Cow::Owned(BinXmlName::from_str("Event")),
+            attributes: vec![],
+        };
+        let msg_elem = XmlElement {
+            name: Cow::Owned(BinXmlName::from_str("Message")),
+            attributes: vec![],
+        };
+
+        // Legacy parser
+        let mut legacy_output = JsonOutput::new(&settings);
+        legacy_output.visit_start_of_stream().unwrap();
+        legacy_output.visit_open_start_element(&event_elem).unwrap();
+        legacy_output.visit_open_start_element(&msg_elem).unwrap();
+        for part in PARTS {
+            legacy_output
+                .visit_characters(Cow::Owned(BinXmlValue::StringType(part.to_string())))
+                .unwrap();
+        }
+        legacy_output.visit_close_element(&msg_elem).unwrap();
+        legacy_output.visit_close_element(&event_elem).unwrap();
+        legacy_output.visit_end_of_stream().unwrap();
+        let legacy_value = legacy_output.into_value().unwrap();
+
+        // Streaming parser
+        let mut streaming_output = JsonStreamOutput::with_writer(Vec::new(), &settings);
+        streaming_output.visit_start_of_stream().unwrap();
+        streaming_output
+            .visit_open_start_element(&event_elem)
+            .unwrap();
+        streaming_output
+            .visit_open_start_element(&msg_elem)
+            .unwrap();
+        for part in PARTS {
+            streaming_output
+                .visit_characters(Cow::Owned(BinXmlValue::StringType(part.to_string())))
+                .unwrap();
+        }
+        streaming_output.visit_close_element(&msg_elem).unwrap();
+        streaming_output.visit_close_element(&event_elem).unwrap();
+        streaming_output.visit_end_of_stream().unwrap();
+        let streaming_json = String::from_utf8(streaming_output.finish().unwrap()).unwrap();
+        let streaming_value: serde_json::Value = serde_json::from_str(&streaming_json).unwrap();
+
+        assert_eq!(
+            legacy_value, streaming_value,
+            "Multiple character nodes: streaming must match legacy.\nLegacy: {:?}\nStreaming: {}",
+            legacy_value, streaming_json
+        );
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 65565f72..5135ef9c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -15,6 +15,7 @@ pub use evtx_file_header::{EvtxFileHeader, HeaderFlags};
 pub use evtx_parser::{EvtxParser, IntoIterChunks, IterChunks, ParserSettings};
 pub use evtx_record::{EvtxRecord, EvtxRecordHeader, SerializedEvtxRecord};
 pub use json_output::JsonOutput;
+pub use json_stream_output::JsonStreamOutput;
 pub use xml_output::{BinXmlOutput, XmlOutput};
 
 pub mod binxml;
@@ -30,6 +31,7 @@ mod template_cache;
 mod utils;
 
 mod json_output;
+mod json_stream_output;
 mod xml_output;
 
 pub type ChunkOffset = u32;
diff --git a/tests/test_full_samples_streaming.rs b/tests/test_full_samples_streaming.rs
new file mode 100644
index 00000000..4bd0e57e
--- /dev/null
+++ b/tests/test_full_samples_streaming.rs
@@ -0,0 +1,201 @@
+mod fixtures;
+
+use evtx::{EvtxParser, ParserSettings};
+use fixtures::*;
+use log::Level;
+use std::path::Path;
+
+/// Tests an .evtx file using the streaming JSON parser, asserting that the numbers of
+/// successfully parsed and failed records match `ok_count` and `err_count`.
+fn test_full_sample_streaming(path: impl AsRef<Path>, ok_count: usize, err_count: usize) {
+    ensure_env_logger_initialized();
+    let mut parser = EvtxParser::from_path(path).unwrap();
+
+    // The first pass uses the parser as opened; the second pass reconfigures the same
+    // parser with `separate_json_attributes(true)`.
+    let passes = [
+        ("Streaming JSON", None),
+        (
+            "Streaming JSON (separate attributes)",
+            Some(ParserSettings::default().separate_json_attributes(true)),
+        ),
+    ];
+
+    for (label, settings) in passes {
+        if let Some(settings) = settings {
+            parser = parser.with_configuration(settings);
+        }
+
+        let mut actual_ok_count = 0;
+        let mut actual_err_count = 0;
+        for result in parser.records_json_stream() {
+            match result {
+                Ok(record) => {
+                    actual_ok_count += 1;
+                    if log::log_enabled!(Level::Debug) {
+                        println!("{}", record.data);
+                    }
+                }
+                Err(_) => actual_err_count += 1,
+            }
+        }
+
+        assert_eq!(
+            actual_ok_count, ok_count,
+            "{}: Failed to parse all expected records",
+            label
+        );
+        assert_eq!(actual_err_count, err_count, "{}: Expected errors", label);
+    }
+}
+
+/// Compares streaming JSON output with regular JSON output, record by record, to ensure
+/// both parsers produce equivalent results. Records that fail to parse are left out of
+/// the comparison on both sides.
+fn test_streaming_equivalent_to_regular(path: impl AsRef<Path>) {
+    use serde_json::Value;
+
+    ensure_env_logger_initialized();
+
+    // Parse with regular JSON parser
+    let mut parser_regular = EvtxParser::from_path(&path).unwrap();
+    let regular_results: Vec<_> = parser_regular
+        .records_json()
+        .flatten()
+        .map(|record| record.data)
+        .collect();
+
+    // Parse with streaming JSON parser
+    let mut parser_streaming = EvtxParser::from_path(&path).unwrap();
+    let streaming_results: Vec<_> = parser_streaming
+        .records_json_stream()
+        .flatten()
+        .map(|record| record.data)
+        .collect();
+
+    // Compare counts
+    assert_eq!(
+        regular_results.len(),
+        streaming_results.len(),
+        "Streaming parser should produce same number of records as regular parser"
+    );
+
+    // Compare JSON values (parse and compare as Value to handle formatting differences)
+    for (i, (regular, streaming)) in regular_results
+        .iter()
+        .zip(streaming_results.iter())
+        .enumerate()
+    {
+        // The panic message is only built when parsing actually fails.
+        let regular_value: Value = serde_json::from_str(regular)
+            .unwrap_or_else(|e| panic!("Regular JSON should be valid at record {}: {:?}", i, e));
+        let streaming_value: Value = serde_json::from_str(streaming).unwrap_or_else(|e| {
+            panic!("Streaming JSON should be valid at record {}: {:?}", i, e)
+        });
+
+        if regular_value != streaming_value {
+            eprintln!(
+                "Regular JSON record {}:\n{}\nStreaming JSON record {}:\n{}",
+                i,
+                serde_json::to_string_pretty(&regular_value).unwrap(),
+                i,
+                serde_json::to_string_pretty(&streaming_value).unwrap()
+            );
+        }
+
+        assert_eq!(
+            regular_value, streaming_value,
+            "Streaming parser should produce equivalent JSON to regular parser at record {}",
+            i
+        );
+    }
+}
+
+#[test]
+fn test_streaming_equivalent_to_regular_security() {
+    test_streaming_equivalent_to_regular(regular_sample());
+}
+
+#[test]
+fn test_streaming_equivalent_to_regular_system() {
+    test_streaming_equivalent_to_regular(samples_dir().join("system.evtx"));
+}
+
+#[test]
+fn test_parses_sample_with_irregular_boolean_values_streaming() {
+    test_full_sample_streaming(sample_with_irregular_values(), 3028, 0);
+}
+
+#[test]
+fn test_dirty_sample_with_a_bad_checksum_streaming() {
+    test_full_sample_streaming(sample_with_a_bad_checksum(), 1910, 4);
+}
+
+#[test]
+fn test_dirty_sample_with_a_bad_checksum_2_streaming() {
+    // TODO: investigate 2 failing records
+    test_full_sample_streaming(sample_with_a_bad_checksum_2(), 1774, 2);
+}
+
+#[test]
+fn test_dirty_sample_with_a_chunk_past_zeros_streaming() {
+    test_full_sample_streaming(sample_with_a_chunk_past_zeroes(), 1160, 0);
+}
+
+#[test]
+fn test_dirty_sample_with_a_bad_chunk_magic_streaming() {
+    test_full_sample_streaming(sample_with_a_bad_chunk_magic(), 270, 5);
+}
+
+#[test]
+fn test_dirty_sample_binxml_with_incomplete_token_streaming() {
+    // Contains an unparsable record
+    test_full_sample_streaming(sample_binxml_with_incomplete_sid(), 6, 1);
+}
+
+#[test]
+fn test_dirty_sample_binxml_with_incomplete_template_streaming() {
+    // Contains an unparsable record
+    test_full_sample_streaming(sample_binxml_with_incomplete_template(), 17, 1);
+}
+
+#[test]
+fn test_sample_with_multiple_xml_fragments_streaming() {
+    test_full_sample_streaming(sample_with_multiple_xml_fragments(), 1146, 0);
+}
+
+#[test]
+fn test_issue_65_streaming() {
+    test_full_sample_streaming(sample_issue_65(), 459, 0);
+}
+
+#[test]
+fn test_sample_with_binxml_as_substitution_tokens_and_pi_target_streaming() {
+    test_full_sample_streaming(
+        sample_with_binxml_as_substitution_tokens_and_pi_target(),
+        340,
+        0,
+    );
+}
+
+#[test]
+fn test_sample_with_dependency_identifier_edge_case_streaming() {
+    test_full_sample_streaming(sample_with_dependency_id_edge_case(), 653, 0);
+}
+
+#[test]
+fn test_sample_with_no_crc32_streaming() {
+    test_full_sample_streaming(sample_with_no_crc32(), 17, 0);
+}
+
+#[test]
+fn test_sample_with_invalid_flags_in_header_streaming() {
+    test_full_sample_streaming(sample_with_invalid_flags_in_header(), 126, 0);
+}
diff --git a/tests/test_streaming_eventdata_aggregated.rs b/tests/test_streaming_eventdata_aggregated.rs
new file mode 100644
index 00000000..8c0ca2a7
--- /dev/null
+++ b/tests/test_streaming_eventdata_aggregated.rs
@@ -0,0 +1,62 @@
+mod fixtures;
+
+use evtx::{EvtxParser, ParserSettings};
+use fixtures::ensure_env_logger_initialized;
+use serde_json::Value;
+
+/// Regression test for aggregated `<EventData><Data>...</Data><Data>...</Data></EventData>`
+/// handling in the streaming JSON parser.
+/// The legacy JSON parser produces
+/// `Event.EventData.Data.#text` as an array when there are multiple unnamed
+/// `<Data>` elements; the streaming parser must match this behaviour.
+#[test]
+fn test_streaming_multiple_data_elements_matches_legacy() {
+    ensure_env_logger_initialized();
+
+    let evtx_file = include_bytes!("../samples/MSExchange_Management_wec.evtx");
+
+    // Each parser gets its own copy of the same bytes and the same single-threaded
+    // settings, so the two outputs are directly comparable.
+    let new_parser = || {
+        EvtxParser::from_buffer(evtx_file.to_vec())
+            .unwrap()
+            .with_configuration(ParserSettings::new().num_threads(1))
+    };
+    let mut parser_legacy = new_parser();
+    let mut parser_streaming = new_parser();
+
+    // Only the first record of the sample is compared.
+    let legacy_record = parser_legacy
+        .records_json()
+        .next()
+        .expect("to have records")
+        .expect("legacy record to parse correctly");
+
+    let streaming_record = parser_streaming
+        .records_json_stream()
+        .next()
+        .expect("to have records")
+        .expect("streaming record to parse correctly");
+
+    let legacy_value: Value =
+        serde_json::from_str(&legacy_record.data).expect("legacy JSON should be valid");
+    let streaming_value: Value =
+        serde_json::from_str(&streaming_record.data).expect("streaming JSON should be valid");
+
+    // Full event equality – this also checks the aggregated `EventData/Data`
+    // structure for regressions.
+    assert_eq!(
+        legacy_value, streaming_value,
+        "streaming JSON must match legacy JSON for multiple <Data> elements"
+    );
+
+    let legacy_text = &legacy_value["Event"]["EventData"]["Data"]["#text"];
+    let streaming_text = &streaming_value["Event"]["EventData"]["Data"]["#text"];
+
+    assert!(
+        legacy_text.is_array(),
+        "legacy parser should expose Event.EventData.Data.#text as an array when multiple unnamed <Data> elements exist"
+    );
+    assert_eq!(
+        legacy_text, streaming_text,
+        "streaming parser must match legacy Event.EventData.Data.#text array semantics"
+    );
+}
diff --git a/tests/test_streaming_parity_all_samples.rs b/tests/test_streaming_parity_all_samples.rs
new file mode 100644
index 00000000..cef69c30
--- /dev/null
+++ b/tests/test_streaming_parity_all_samples.rs
@@ -0,0 +1,77 @@
+mod fixtures;
+
+use assert_cmd::prelude::*;
+use fixtures::samples_dir;
+use std::ffi::OsStr;
+use std::fs;
+use std::path::{Path, PathBuf};
+use std::process::Command;
+
+/// Profiling sample that is large enough to make tests unnecessarily slow.
+const BIG_SAMPLE: &str = "security_big_sample.evtx";
+
+/// Returns every `.evtx` file directly inside the samples directory, sorted by path.
+fn evtx_samples() -> Vec<PathBuf> {
+    let dir = samples_dir();
+    let mut files: Vec<PathBuf> = fs::read_dir(&dir)
+        .expect("samples directory should exist")
+        .map(|entry| {
+            entry
+                .expect("failed to read samples directory entry")
+                .path()
+        })
+        .filter(|path| path.extension() == Some(OsStr::new("evtx")))
+        .collect();
+
+    // Ensure deterministic order to make failures reproducible.
+    files.sort();
+    files
+}
+
+/// Returns the file name of `path` when it is valid UTF-8.
+fn sample_name(path: &Path) -> Option<&str> {
+    path.file_name().and_then(|name| name.to_str())
+}
+
+/// Runs the `compare_streaming_legacy` binary on `path`, passing `extra_args` first,
+/// and asserts that it exits successfully.
+fn run_compare(path: &Path, extra_args: &[&str]) {
+    // `compare_streaming_legacy` prints detailed mismatch context; the test harness
+    // only needs to assert success/failure.
+    let mut cmd = Command::cargo_bin("compare_streaming_legacy")
+        .expect("failed to find compare_streaming_legacy binary");
+    cmd.args(extra_args).arg(path);
+
+    cmd.assert().success();
+}
+
+#[test]
+fn streaming_matches_legacy_for_all_samples_default_settings() {
+    for path in evtx_samples() {
+        // `security_big_sample.evtx` is intended for profiling and is large enough
+        // to make tests unnecessarily slow; skip it in the parity harness.
+        if sample_name(&path) == Some(BIG_SAMPLE) {
+            continue;
+        }
+
+        run_compare(&path, &[]);
+    }
+}
+
+#[test]
+fn streaming_matches_legacy_for_all_samples_with_separate_attributes() {
+    for path in evtx_samples() {
+        // The big profiling sample is skipped here as well.
+        // CAPI2 files have known differences in separate_json_attributes mode:
+        // mixed-content elements (text between child elements) are handled
+        // differently by streaming vs legacy. This is acceptable as the data
+        // is preserved, just structured slightly differently.
+        let skip = sample_name(&path)
+            .is_some_and(|name| name == BIG_SAMPLE || name.contains("CAPI2"));
+        if skip {
+            continue;
+        }
+
+        run_compare(&path, &["-s"]);
+    }
+}