Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
1bf8e5c
Squash starting code. Compression + search run, but search always dis…
davidlion Oct 3, 2025
457052b
Merge remote-tracking branch 'upstream/main' into clpsls-prototype
davidlion Oct 3, 2025
8e10d94
Merge remote-tracking branch 'upstream/main' into clpsls-prototype
davidlion Oct 7, 2025
eacfaed
Initial code for reading plain text files. Need change type of ls par…
davidlion Oct 14, 2025
92b6134
Convert FullMatch and LogType to VarStringT for search.
davidlion Oct 14, 2025
d511a22
Add capture group support. Support LogType pure-wildcard search. Weir…
davidlion Oct 15, 2025
48fc474
Remove some debug logs.
davidlion Oct 15, 2025
7a1fbdd
Merge remote-tracking branch 'upstream/main' into clpsls-prototype
davidlion Oct 15, 2025
922bc67
Drop FullMatch type as it is unnecessary.
davidlion Oct 15, 2025
9300edc
Use DictionaryFloat to work around float conversion; Small refactors.
davidlion Oct 16, 2025
8d4c317
Added FormattedFloat using stod.
davidlion Oct 16, 2025
5e0bdda
Merge remote-tracking branch 'upstream/main' into clpsls-prototype
davidlion Oct 17, 2025
7dff61f
Merge remote-tracking branch 'upstream/main' into clpsls-prototype
davidlion Nov 3, 2025
1c0e50b
Remove cmake/Modules/FindLibArchive.cmake due to conflict with task v…
davidlion Nov 3, 2025
591a457
Fix error code linking issue in indexer.
davidlion Nov 3, 2025
44adcb2
Bump libarchive.
davidlion Nov 3, 2025
aac0bea
Update log surgeon with in-progress PR of Token refactor for bug fixes.
davidlion Nov 3, 2025
c66ca59
Add experimental cli arg with printing placeholders.
davidlion Nov 3, 2025
98a6443
Random tweak.
davidlion Nov 7, 2025
d979fc4
Refactor argument passing and log surgeon parser creation.
davidlion Nov 7, 2025
b2d941c
Switch back to log surgeon main.
davidlion Nov 7, 2025
bf58b50
feat: Add logtype and variable stats tracking and compressing/decompr…
davidlion Nov 10, 2025
150123f
Merge remote-tracking branch 'upstream/main' into clpsls-prototype
davidlion Nov 10, 2025
d355b8b
Fix newline in clpsls debug logs.
davidlion Nov 11, 2025
b9d67aa
Fix type typo.
davidlion Nov 11, 2025
da1ff34
Refactor the creation of an output handler and use it with experiment…
davidlion Nov 11, 2025
edb5376
Merge remote-tracking branch 'upstream/main' into clpsls-prototype
davidlion Nov 11, 2025
f4d1779
Call flush on output_handler.
davidlion Nov 11, 2025
c4397c0
namespace fix
davidlion Nov 11, 2025
7296e0a
Improve experimental flag handling.
davidlion Nov 12, 2025
d1dc63c
Refactor experimental stats to be in a separate file inside an archive.
davidlion Nov 12, 2025
437b2ce
Update cli args before validating.
davidlion Nov 12, 2025
b9a58d6
Remove dead code for a feature that will be added separately later.
davidlion Nov 12, 2025
016c3bf
Fix non-experimental path to correctly use ClpStrings again.
davidlion Nov 12, 2025
7c75d79
Merge remote-tracking branch 'upstream/main' into clpsls-prototype
davidlion Nov 13, 2025
0fc78fa
Add functionality to Array to fix possible holes/gaps between var dic…
davidlion Nov 13, 2025
5626d3c
Add TypedVar to QueryRunner initialize_reader.
davidlion Nov 13, 2025
8e4a36a
Merge remote-tracking branch 'upstream/main' into clpsls-prototype
davidlion Nov 24, 2025
b3fd4c0
Revert dep changes to build.
davidlion Nov 24, 2025
0bdb9cf
Revert some other changes.
davidlion Nov 24, 2025
0a1ae76
Small linting fix.
davidlion Nov 24, 2025
6c5fc8b
Revert unnecessary change to clean up diff.
davidlion Nov 24, 2025
fec1cb0
Refactor archive experimental option handling; unit tests passing.
davidlion Nov 25, 2025
1254976
Merge remote-tracking branch 'upstream/main' into clpsls-prototype
davidlion Nov 25, 2025
92fa397
Remove TypedVar and add stat helpers to ArchiveWrtier.
davidlion Nov 26, 2025
e2e7765
Remove unused get_stats from ColumnReader.
davidlion Nov 26, 2025
a97404b
Set prototype NodeType IDs to be large avoiding any conflicts with as…
davidlion Nov 26, 2025
c35dcd9
Remove unused enocded logtype id from LogTypeColumn{Reader,Writer}.
davidlion Nov 26, 2025
f2bc72f
Merge remote-tracking branch 'upstream/main' into clpsls-prototype
davidlion Nov 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,11 @@ set(SOURCE_FILES_clp_s_unitTest
src/clp_s/ArchiveReader.hpp
src/clp_s/ArchiveReaderAdaptor.cpp
src/clp_s/ArchiveReaderAdaptor.hpp
src/clp_s/ArchiveStats.cpp
src/clp_s/ArchiveStats.hpp
src/clp_s/ArchiveWriter.cpp
src/clp_s/ArchiveWriter.hpp
src/clp_s/Array.hpp
src/clp_s/ColumnReader.cpp
src/clp_s/ColumnReader.hpp
src/clp_s/ColumnWriter.cpp
Expand Down
8 changes: 8 additions & 0 deletions components/core/src/clp/EncodedVariableInterpreter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ class EncodedVariableInterpreter {
logtype.push_back(enum_to_underlying_type(ir::VariablePlaceholder::Float));
}

/**
* Adds a clps schema node variable placeholder to the given logtype.
* @param logtype
*/
static void add_schema_var(std::string& logtype) {
logtype.push_back(enum_to_underlying_type(ir::VariablePlaceholder::Schema));
}

/**
* Adds an escape character to the given logtype
* @param logtype
Expand Down
4 changes: 2 additions & 2 deletions components/core/src/clp/GrepCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ bool GrepCore::get_bounds_of_next_potential_var(
return false;
}
search_token = SearchToken{token.value()};
search_token.m_type_ids_set.insert(search_token.m_type_ids_ptr->at(0));
search_token.m_type_ids_set.insert(search_token.get_type_ids()->at(0));
}
auto const& type = search_token.m_type_ids_ptr->at(0);
auto const& type = search_token.get_type_ids()->at(0);
if (type != static_cast<int>(log_surgeon::SymbolId::TokenUncaughtString)
&& type != static_cast<int>(log_surgeon::SymbolId::TokenEnd))
{
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp/ir/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ enum class VariablePlaceholder : char {
Integer = 0x11,
Dictionary = 0x12,
Float = 0x13,
Schema = 0x14,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new variable placeholder is added here, but it seems like is_variable_placeholder in clp/ir/parsing.hpp hasn't been updated to include the placeholder. This means that the escaping code won't end up properly escaping 0x14 when it appears in logtext, which is a bug.

Granted, if we do properly escape it then this constitutes a major breaking format change that isn't guarded by the experimental flag. Given that, it might actually be better to intentionally leave this bug for now to avoid the breaking format change. The only alternative I can think of is guarding everything related to VariablePlacehodler::Schema by a bunch of compile time macros, but then we would need to ship a separate build for this experimental change.

Escape = '\\',
};
} // namespace clp::ir
Expand Down
16 changes: 6 additions & 10 deletions components/core/src/clp/streaming_archive/writer/Archive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,30 +356,26 @@ void Archive::write_msg_using_schema(LogEventView const& log_view) {
m_logtype_dict_entry.clear();
size_t num_uncompressed_bytes = 0;
// Timestamp is included in the uncompressed message size
uint32_t start_pos = log_output_buffer->get_token(0).m_start_pos;
uint32_t start_pos = log_output_buffer->get_token(0).get_start_pos();
if (timestamp_pattern == nullptr) {
start_pos = log_output_buffer->get_token(1).m_start_pos;
start_pos = log_output_buffer->get_token(1).get_start_pos();
}
uint32_t end_pos = log_output_buffer->get_token(log_output_buffer->pos() - 1).m_end_pos;
uint32_t end_pos = log_output_buffer->get_token(log_output_buffer->pos() - 1).get_end_pos();
if (start_pos <= end_pos) {
num_uncompressed_bytes = end_pos - start_pos;
} else {
num_uncompressed_bytes
= log_output_buffer->get_token(0).m_buffer_size - start_pos + end_pos;
= log_output_buffer->get_token(0).get_buffer_size() - start_pos + end_pos;
}
for (uint32_t i = 1; i < log_output_buffer->pos(); i++) {
log_surgeon::Token& token = log_output_buffer->get_mutable_token(i);
int token_type = token.m_type_ids_ptr->at(0);
int token_type = token.get_type_ids()->at(0);
if (log_output_buffer->has_delimiters() && (timestamp_pattern != nullptr || i > 1)
&& token_type != static_cast<int>(log_surgeon::SymbolId::TokenUncaughtString)
&& token_type != static_cast<int>(log_surgeon::SymbolId::TokenNewline))
{
m_logtype_dict_entry.add_constant(token.get_delimiter(), 0, 1);
if (token.m_start_pos == token.m_buffer_size - 1) {
token.m_start_pos = 0;
} else {
token.m_start_pos++;
}
token.increment_start_pos();
}
switch (token_type) {
case static_cast<int>(log_surgeon::SymbolId::TokenNewline):
Expand Down
73 changes: 68 additions & 5 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

#include <filesystem>
#include <string_view>
#include <vector>

#include <fmt/core.h>
#include <spdlog/spdlog.h>
#include <ystdlib/error_handling/Result.hpp>

#include <clp_s/ArchiveStats.hpp>
#include <clp_s/ErrorCode.hpp>

#include "archive_constants.hpp"
#include "ArchiveReaderAdaptor.hpp"
Expand All @@ -11,7 +19,7 @@
using std::string_view;

namespace clp_s {
void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& network_auth) {
void ArchiveReader::open(Path const& archive_path, Options const& options) {
if (m_is_open) {
throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__);
}
Expand All @@ -21,7 +29,8 @@ void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& netw
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
}

m_archive_reader_adaptor = std::make_shared<ArchiveReaderAdaptor>(archive_path, network_auth);
m_archive_reader_adaptor
= std::make_shared<ArchiveReaderAdaptor>(archive_path, options.m_network_auth);

if (auto const rc = m_archive_reader_adaptor->load_archive_metadata(); ErrorCodeSuccess != rc) {
throw OperationFailed(rc, __FILENAME__, __LINE__);
Expand All @@ -35,6 +44,10 @@ void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& netw
m_var_dict = ReaderUtils::get_variable_dictionary_reader(*m_archive_reader_adaptor);
m_log_dict = ReaderUtils::get_log_type_dictionary_reader(*m_archive_reader_adaptor);
m_array_dict = ReaderUtils::get_array_dictionary_reader(*m_archive_reader_adaptor);

if (options.m_experimental) {
m_experimental_stats = ExperimentalStats();
}
}

void ArchiveReader::read_metadata() {
Expand Down Expand Up @@ -134,6 +147,7 @@ void ArchiveReader::read_dictionaries_and_metadata() {
m_var_dict->read_entries();
m_log_dict->read_entries();
m_array_dict->read_entries();
std::ignore = read_experimental_stats();
}

void ArchiveReader::open_packed_streams() {
Expand Down Expand Up @@ -203,8 +217,11 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3
case NodeType::ClpString:
column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_log_dict);
break;
case NodeType::LogType:
column_reader = new LogTypeColumnReader(column_id, m_log_dict);
break;
case NodeType::VarString:
column_reader = new VariableStringColumnReader(column_id, m_var_dict);
column_reader = new VariableStringColumnReader(column_id, m_var_dict, node.get_type());
break;
case NodeType::Boolean:
column_reader = new BooleanColumnReader(column_id);
Expand All @@ -220,6 +237,8 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3
case NodeType::NullValue:
case NodeType::Object:
case NodeType::StructuredArray:
case NodeType::LogMessage:
case NodeType::CaptureVar:
case NodeType::Unknown:
break;
}
Expand All @@ -237,8 +256,23 @@ void ArchiveReader::append_unordered_reader_columns(
bool should_marshal_records
) {
size_t object_begin_pos = reader.get_column_size();
for (int32_t column_id : schema_ids) {
for (size_t i = 0; i < schema_ids.size(); ++i) {
auto const column_id{schema_ids[i]};
if (Schema::schema_entry_is_unordered_object(column_id)) {
auto length{Schema::get_unordered_object_length(column_id)};
auto sub_schema{schema_ids.subspan(i + 1, length)};
auto subtree_root_node_id{m_schema_tree->find_matching_subtree_root_in_subtree(
mst_subtree_root_node_id,
SchemaReader::get_first_column_in_span(sub_schema),
Schema::get_unordered_object_type(column_id)
)};
append_unordered_reader_columns(
reader,
subtree_root_node_id,
sub_schema,
should_marshal_records
);
i += length;
continue;
}
BaseColumnReader* column_reader = nullptr;
Expand All @@ -262,8 +296,12 @@ void ArchiveReader::append_unordered_reader_columns(
case NodeType::ClpString:
column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_log_dict);
break;
case NodeType::LogType:
column_reader = new LogTypeColumnReader(column_id, m_log_dict);
break;
case NodeType::VarString:
column_reader = new VariableStringColumnReader(column_id, m_var_dict);
column_reader
= new VariableStringColumnReader(column_id, m_var_dict, node.get_type());
break;
case NodeType::Boolean:
column_reader = new BooleanColumnReader(column_id);
Expand All @@ -277,6 +315,8 @@ void ArchiveReader::append_unordered_reader_columns(
case NodeType::Object:
case NodeType::Metadata:
case NodeType::NullValue:
case NodeType::LogMessage:
case NodeType::CaptureVar:
case NodeType::Unknown:
break;
}
Expand Down Expand Up @@ -372,6 +412,10 @@ void ArchiveReader::close() {
m_var_dict->close();
m_log_dict->close();
m_array_dict->close();
if (m_experimental_stats) {
m_experimental_stats->m_logtype_stats.clear();
m_experimental_stats->m_var_stats.clear();
}

m_stream_reader.close();
m_archive_reader_adaptor.reset();
Expand All @@ -398,4 +442,23 @@ std::shared_ptr<char[]> ArchiveReader::read_stream(size_t stream_id, bool reuse_
m_cur_stream_id = stream_id;
return m_stream_buffer;
}

auto ArchiveReader::read_experimental_stats() -> ystdlib::error_handling::Result<void> {
if (false == m_experimental_stats.has_value()) {
return ClpsErrorCode{ClpsErrorCodeEnum::BadParam};
}
constexpr size_t cDecompressorFileReadBufferCapacity{64UL * 1024};
auto reader{
m_archive_reader_adaptor->checkout_reader_for_section(constants::cArchiveStatsFile)
};
ZstdDecompressor decompressor{};
decompressor.open(*reader, cDecompressorFileReadBufferCapacity);

YSTDLIB_ERROR_HANDLING_TRYX(m_experimental_stats->m_logtype_stats.decompress(decompressor));
YSTDLIB_ERROR_HANDLING_TRYX(m_experimental_stats->m_var_stats.decompress(decompressor));

decompressor.close();
m_archive_reader_adaptor->checkin_reader_for_section(constants::cArchiveStatsFile);
return ystdlib::error_handling::success();
}
} // namespace clp_s
32 changes: 30 additions & 2 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
#define CLP_S_ARCHIVEREADER_HPP

#include <map>
#include <optional>
#include <set>
#include <span>
#include <string_view>
#include <utility>
#include <vector>

#include <ystdlib/error_handling/Result.hpp>

#include <clp_s/ArchiveStats.hpp>

#include "ArchiveReaderAdaptor.hpp"
#include "DictionaryReader.hpp"
Expand All @@ -26,15 +32,26 @@ class ArchiveReader {
: TraceableException(error_code, filename, line_number) {}
};

struct Options {
Options() = default;

Options(NetworkAuthOption network_auth, bool experimental)
: m_network_auth{network_auth},
m_experimental{experimental} {}

NetworkAuthOption m_network_auth{};
bool m_experimental{false};
};

// Constructor
ArchiveReader() : m_is_open(false) {}

/**
* Opens an archive for reading.
* @param archive_path
* @param network_auth
* @param options
*/
void open(Path const& archive_path, NetworkAuthOption const& network_auth);
void open(Path const& archive_path, Options const& options);

/**
* Reads the dictionaries and metadata.
Expand Down Expand Up @@ -146,6 +163,10 @@ class ArchiveReader {
*/
bool has_log_order() { return m_log_event_idx_column_id >= 0; }

auto get_experimental_stats() -> ExperimentalStats const& {
return m_experimental_stats.value();
}

private:
/**
* Initializes a schema reader passed by reference to become a reader for a given schema.
Expand Down Expand Up @@ -196,6 +217,11 @@ class ArchiveReader {
*/
std::shared_ptr<char[]> read_stream(size_t stream_id, bool reuse_buffer);

/**
* Reads the experimental statistics from the archive.
*/
auto read_experimental_stats() -> ystdlib::error_handling::Result<void>;

bool m_is_open;
std::string m_archive_id;
std::shared_ptr<VariableDictionaryReader> m_var_dict;
Expand All @@ -218,6 +244,8 @@ class ArchiveReader {
size_t m_stream_buffer_size{0ULL};
size_t m_cur_stream_id{0ULL};
int32_t m_log_event_idx_column_id{-1};

std::optional<ExperimentalStats> m_experimental_stats;
};
} // namespace clp_s

Expand Down
51 changes: 51 additions & 0 deletions components/core/src/clp_s/ArchiveStats.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include "ArchiveStats.hpp"

#include <cstddef>

#include <ystdlib/error_handling/Result.hpp>

#include <clp_s/ErrorCode.hpp>
#include <clp_s/ZstdCompressor.hpp>
#include <clp_s/ZstdDecompressor.hpp>

namespace clp_s {
auto LogTypeStat::compress(ZstdCompressor& compressor) const
-> ystdlib::error_handling::Result<void> {
compressor.write_numeric_value(m_count);
return ystdlib::error_handling::success();
}

auto LogTypeStat::decompress(ZstdDecompressor& decompressor)
-> ystdlib::error_handling::Result<LogTypeStat> {
LogTypeStat stat{};
if (ErrorCodeSuccess != decompressor.try_read_numeric_value(stat.m_count)) {
return ClpsErrorCode{ClpsErrorCodeEnum::Failure};
}
return stat;
}

auto VariableStat::compress(ZstdCompressor& compressor) const
-> ystdlib::error_handling::Result<void> {
compressor.write_numeric_value(m_count);
compressor.write_numeric_value(m_type.size());
compressor.write_string(m_type);
return ystdlib::error_handling::success();
}

auto VariableStat::decompress(ZstdDecompressor& decompressor)
-> ystdlib::error_handling::Result<VariableStat> {
VariableStat stat{};
if (ErrorCodeSuccess != decompressor.try_read_numeric_value(stat.m_count)) {
return ClpsErrorCode{ClpsErrorCodeEnum::Failure};
}

size_t type_size{};
if (ErrorCodeSuccess != decompressor.try_read_numeric_value(type_size)) {
return ClpsErrorCode{ClpsErrorCodeEnum::Failure};
}
if (ErrorCodeSuccess != decompressor.try_read_string(type_size, stat.m_type)) {
return ClpsErrorCode{ClpsErrorCodeEnum::Failure};
}
return stat;
}
} // namespace clp_s
Loading
Loading