diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index 1be2fb3363..31da939b59 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -343,8 +343,11 @@ set(SOURCE_FILES_clp_s_unitTest src/clp_s/ArchiveReader.hpp src/clp_s/ArchiveReaderAdaptor.cpp src/clp_s/ArchiveReaderAdaptor.hpp + src/clp_s/ArchiveStats.cpp + src/clp_s/ArchiveStats.hpp src/clp_s/ArchiveWriter.cpp src/clp_s/ArchiveWriter.hpp + src/clp_s/Array.hpp src/clp_s/ColumnReader.cpp src/clp_s/ColumnReader.hpp src/clp_s/ColumnWriter.cpp diff --git a/components/core/src/clp/EncodedVariableInterpreter.hpp b/components/core/src/clp/EncodedVariableInterpreter.hpp index 7834a9b968..d33ca46dcc 100644 --- a/components/core/src/clp/EncodedVariableInterpreter.hpp +++ b/components/core/src/clp/EncodedVariableInterpreter.hpp @@ -83,6 +83,14 @@ class EncodedVariableInterpreter { logtype.push_back(enum_to_underlying_type(ir::VariablePlaceholder::Float)); } + /** + * Adds a clp-s schema node variable placeholder to the given logtype. 
+ * @param logtype + */ + static void add_schema_var(std::string& logtype) { + logtype.push_back(enum_to_underlying_type(ir::VariablePlaceholder::Schema)); + } + /** * Adds an escape character to the given logtype * @param logtype diff --git a/components/core/src/clp/GrepCore.cpp b/components/core/src/clp/GrepCore.cpp index 1a4bf499e2..b8abf5a980 100644 --- a/components/core/src/clp/GrepCore.cpp +++ b/components/core/src/clp/GrepCore.cpp @@ -257,9 +257,9 @@ bool GrepCore::get_bounds_of_next_potential_var( return false; } search_token = SearchToken{token.value()}; - search_token.m_type_ids_set.insert(search_token.m_type_ids_ptr->at(0)); + search_token.m_type_ids_set.insert(search_token.get_type_ids()->at(0)); } - auto const& type = search_token.m_type_ids_ptr->at(0); + auto const& type = search_token.get_type_ids()->at(0); if (type != static_cast(log_surgeon::SymbolId::TokenUncaughtString) && type != static_cast(log_surgeon::SymbolId::TokenEnd)) { diff --git a/components/core/src/clp/ir/types.hpp b/components/core/src/clp/ir/types.hpp index 2529ddc330..c50dea65c2 100644 --- a/components/core/src/clp/ir/types.hpp +++ b/components/core/src/clp/ir/types.hpp @@ -18,6 +18,7 @@ enum class VariablePlaceholder : char { Integer = 0x11, Dictionary = 0x12, Float = 0x13, + Schema = 0x14, Escape = '\\', }; } // namespace clp::ir diff --git a/components/core/src/clp/streaming_archive/writer/Archive.cpp b/components/core/src/clp/streaming_archive/writer/Archive.cpp index 3b8df0d73a..dadcf1b546 100644 --- a/components/core/src/clp/streaming_archive/writer/Archive.cpp +++ b/components/core/src/clp/streaming_archive/writer/Archive.cpp @@ -356,30 +356,26 @@ void Archive::write_msg_using_schema(LogEventView const& log_view) { m_logtype_dict_entry.clear(); size_t num_uncompressed_bytes = 0; // Timestamp is included in the uncompressed message size - uint32_t start_pos = log_output_buffer->get_token(0).m_start_pos; + uint32_t start_pos = log_output_buffer->get_token(0).get_start_pos(); 
if (timestamp_pattern == nullptr) { - start_pos = log_output_buffer->get_token(1).m_start_pos; + start_pos = log_output_buffer->get_token(1).get_start_pos(); } - uint32_t end_pos = log_output_buffer->get_token(log_output_buffer->pos() - 1).m_end_pos; + uint32_t end_pos = log_output_buffer->get_token(log_output_buffer->pos() - 1).get_end_pos(); if (start_pos <= end_pos) { num_uncompressed_bytes = end_pos - start_pos; } else { num_uncompressed_bytes - = log_output_buffer->get_token(0).m_buffer_size - start_pos + end_pos; + = log_output_buffer->get_token(0).get_buffer_size() - start_pos + end_pos; } for (uint32_t i = 1; i < log_output_buffer->pos(); i++) { log_surgeon::Token& token = log_output_buffer->get_mutable_token(i); - int token_type = token.m_type_ids_ptr->at(0); + int token_type = token.get_type_ids()->at(0); if (log_output_buffer->has_delimiters() && (timestamp_pattern != nullptr || i > 1) && token_type != static_cast(log_surgeon::SymbolId::TokenUncaughtString) && token_type != static_cast(log_surgeon::SymbolId::TokenNewline)) { m_logtype_dict_entry.add_constant(token.get_delimiter(), 0, 1); - if (token.m_start_pos == token.m_buffer_size - 1) { - token.m_start_pos = 0; - } else { - token.m_start_pos++; - } + token.increment_start_pos(); } switch (token_type) { case static_cast(log_surgeon::SymbolId::TokenNewline): diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index 712ef45e39..1c6b49f360 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -2,6 +2,14 @@ #include #include +#include + +#include +#include +#include + +#include +#include #include "archive_constants.hpp" #include "ArchiveReaderAdaptor.hpp" @@ -11,7 +19,7 @@ using std::string_view; namespace clp_s { -void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& network_auth) { +void ArchiveReader::open(Path const& archive_path, Options const& options) { if (m_is_open) { 
throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__); } @@ -21,7 +29,8 @@ void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& netw throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__); } - m_archive_reader_adaptor = std::make_shared(archive_path, network_auth); + m_archive_reader_adaptor + = std::make_shared(archive_path, options.m_network_auth); if (auto const rc = m_archive_reader_adaptor->load_archive_metadata(); ErrorCodeSuccess != rc) { throw OperationFailed(rc, __FILENAME__, __LINE__); @@ -35,6 +44,10 @@ void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& netw m_var_dict = ReaderUtils::get_variable_dictionary_reader(*m_archive_reader_adaptor); m_log_dict = ReaderUtils::get_log_type_dictionary_reader(*m_archive_reader_adaptor); m_array_dict = ReaderUtils::get_array_dictionary_reader(*m_archive_reader_adaptor); + + if (options.m_experimental) { + m_experimental_stats = ExperimentalStats(); + } } void ArchiveReader::read_metadata() { @@ -134,6 +147,7 @@ void ArchiveReader::read_dictionaries_and_metadata() { m_var_dict->read_entries(); m_log_dict->read_entries(); m_array_dict->read_entries(); + std::ignore = read_experimental_stats(); } void ArchiveReader::open_packed_streams() { @@ -203,8 +217,11 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3 case NodeType::ClpString: column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_log_dict); break; + case NodeType::LogType: + column_reader = new LogTypeColumnReader(column_id, m_log_dict); + break; case NodeType::VarString: - column_reader = new VariableStringColumnReader(column_id, m_var_dict); + column_reader = new VariableStringColumnReader(column_id, m_var_dict, node.get_type()); break; case NodeType::Boolean: column_reader = new BooleanColumnReader(column_id); @@ -220,6 +237,8 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3 case NodeType::NullValue: case 
NodeType::Object: case NodeType::StructuredArray: + case NodeType::LogMessage: + case NodeType::CaptureVar: case NodeType::Unknown: break; } @@ -237,8 +256,23 @@ void ArchiveReader::append_unordered_reader_columns( bool should_marshal_records ) { size_t object_begin_pos = reader.get_column_size(); - for (int32_t column_id : schema_ids) { + for (size_t i = 0; i < schema_ids.size(); ++i) { + auto const column_id{schema_ids[i]}; if (Schema::schema_entry_is_unordered_object(column_id)) { + auto length{Schema::get_unordered_object_length(column_id)}; + auto sub_schema{schema_ids.subspan(i + 1, length)}; + auto subtree_root_node_id{m_schema_tree->find_matching_subtree_root_in_subtree( + mst_subtree_root_node_id, + SchemaReader::get_first_column_in_span(sub_schema), + Schema::get_unordered_object_type(column_id) + )}; + append_unordered_reader_columns( + reader, + subtree_root_node_id, + sub_schema, + should_marshal_records + ); + i += length; continue; } BaseColumnReader* column_reader = nullptr; @@ -262,8 +296,12 @@ void ArchiveReader::append_unordered_reader_columns( case NodeType::ClpString: column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_log_dict); break; + case NodeType::LogType: + column_reader = new LogTypeColumnReader(column_id, m_log_dict); + break; case NodeType::VarString: - column_reader = new VariableStringColumnReader(column_id, m_var_dict); + column_reader + = new VariableStringColumnReader(column_id, m_var_dict, node.get_type()); break; case NodeType::Boolean: column_reader = new BooleanColumnReader(column_id); @@ -277,6 +315,8 @@ void ArchiveReader::append_unordered_reader_columns( case NodeType::Object: case NodeType::Metadata: case NodeType::NullValue: + case NodeType::LogMessage: + case NodeType::CaptureVar: case NodeType::Unknown: break; } @@ -372,6 +412,10 @@ void ArchiveReader::close() { m_var_dict->close(); m_log_dict->close(); m_array_dict->close(); + if (m_experimental_stats) { + m_experimental_stats->m_logtype_stats.clear(); 
+ m_experimental_stats->m_var_stats.clear(); + } m_stream_reader.close(); m_archive_reader_adaptor.reset(); @@ -398,4 +442,23 @@ std::shared_ptr ArchiveReader::read_stream(size_t stream_id, bool reuse_ m_cur_stream_id = stream_id; return m_stream_buffer; } + +auto ArchiveReader::read_experimental_stats() -> ystdlib::error_handling::Result { + if (false == m_experimental_stats.has_value()) { + return ClpsErrorCode{ClpsErrorCodeEnum::BadParam}; + } + constexpr size_t cDecompressorFileReadBufferCapacity{64UL * 1024}; + auto reader{ + m_archive_reader_adaptor->checkout_reader_for_section(constants::cArchiveStatsFile) + }; + ZstdDecompressor decompressor{}; + decompressor.open(*reader, cDecompressorFileReadBufferCapacity); + + YSTDLIB_ERROR_HANDLING_TRYX(m_experimental_stats->m_logtype_stats.decompress(decompressor)); + YSTDLIB_ERROR_HANDLING_TRYX(m_experimental_stats->m_var_stats.decompress(decompressor)); + + decompressor.close(); + m_archive_reader_adaptor->checkin_reader_for_section(constants::cArchiveStatsFile); + return ystdlib::error_handling::success(); +} } // namespace clp_s diff --git a/components/core/src/clp_s/ArchiveReader.hpp b/components/core/src/clp_s/ArchiveReader.hpp index 40cee82bb5..23e821535b 100644 --- a/components/core/src/clp_s/ArchiveReader.hpp +++ b/components/core/src/clp_s/ArchiveReader.hpp @@ -2,10 +2,16 @@ #define CLP_S_ARCHIVEREADER_HPP #include +#include #include #include #include #include +#include + +#include + +#include #include "ArchiveReaderAdaptor.hpp" #include "DictionaryReader.hpp" @@ -26,15 +32,26 @@ class ArchiveReader { : TraceableException(error_code, filename, line_number) {} }; + struct Options { + Options() = default; + + Options(NetworkAuthOption network_auth, bool experimental) + : m_network_auth{network_auth}, + m_experimental{experimental} {} + + NetworkAuthOption m_network_auth{}; + bool m_experimental{false}; + }; + // Constructor ArchiveReader() : m_is_open(false) {} /** * Opens an archive for reading. 
* @param archive_path - * @param network_auth + * @param options */ - void open(Path const& archive_path, NetworkAuthOption const& network_auth); + void open(Path const& archive_path, Options const& options); /** * Reads the dictionaries and metadata. @@ -146,6 +163,10 @@ class ArchiveReader { */ bool has_log_order() { return m_log_event_idx_column_id >= 0; } + auto get_experimental_stats() -> ExperimentalStats const& { + return m_experimental_stats.value(); + } + private: /** * Initializes a schema reader passed by reference to become a reader for a given schema. @@ -196,6 +217,11 @@ class ArchiveReader { */ std::shared_ptr read_stream(size_t stream_id, bool reuse_buffer); + /** + * Reads the experimental statistics from the archive. + */ + auto read_experimental_stats() -> ystdlib::error_handling::Result; + bool m_is_open; std::string m_archive_id; std::shared_ptr m_var_dict; @@ -218,6 +244,8 @@ class ArchiveReader { size_t m_stream_buffer_size{0ULL}; size_t m_cur_stream_id{0ULL}; int32_t m_log_event_idx_column_id{-1}; + + std::optional m_experimental_stats; }; } // namespace clp_s diff --git a/components/core/src/clp_s/ArchiveStats.cpp b/components/core/src/clp_s/ArchiveStats.cpp new file mode 100644 index 0000000000..e67cbe73ee --- /dev/null +++ b/components/core/src/clp_s/ArchiveStats.cpp @@ -0,0 +1,51 @@ +#include "ArchiveStats.hpp" + +#include + +#include + +#include +#include +#include + +namespace clp_s { +auto LogTypeStat::compress(ZstdCompressor& compressor) const + -> ystdlib::error_handling::Result { + compressor.write_numeric_value(m_count); + return ystdlib::error_handling::success(); +} + +auto LogTypeStat::decompress(ZstdDecompressor& decompressor) + -> ystdlib::error_handling::Result { + LogTypeStat stat{}; + if (ErrorCodeSuccess != decompressor.try_read_numeric_value(stat.m_count)) { + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + return stat; +} + +auto VariableStat::compress(ZstdCompressor& compressor) const + -> 
ystdlib::error_handling::Result { + compressor.write_numeric_value(m_count); + compressor.write_numeric_value(m_type.size()); + compressor.write_string(m_type); + return ystdlib::error_handling::success(); +} + +auto VariableStat::decompress(ZstdDecompressor& decompressor) + -> ystdlib::error_handling::Result { + VariableStat stat{}; + if (ErrorCodeSuccess != decompressor.try_read_numeric_value(stat.m_count)) { + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + + size_t type_size{}; + if (ErrorCodeSuccess != decompressor.try_read_numeric_value(type_size)) { + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + if (ErrorCodeSuccess != decompressor.try_read_string(type_size, stat.m_type)) { + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + return stat; +} +} // namespace clp_s diff --git a/components/core/src/clp_s/ArchiveStats.hpp b/components/core/src/clp_s/ArchiveStats.hpp new file mode 100644 index 0000000000..374cc5a718 --- /dev/null +++ b/components/core/src/clp_s/ArchiveStats.hpp @@ -0,0 +1,142 @@ +#ifndef CLP_S_ARCHIVESTATS_HPP +#define CLP_S_ARCHIVESTATS_HPP + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace clp_s { +class ArchiveStats { +public: + // Constructors + explicit ArchiveStats( + std::string id, + epochtime_t begin_timestamp, + epochtime_t end_timestamp, + size_t uncompressed_size, + size_t compressed_size, + nlohmann::json range_index, + bool is_split + ) + : m_id{std::move(id)}, + m_begin_timestamp{begin_timestamp}, + m_end_timestamp{end_timestamp}, + m_uncompressed_size{uncompressed_size}, + m_compressed_size{compressed_size}, + m_range_index(std::move(range_index)), // Avoid {} to prevent wrapping in JSON array. + m_is_split{is_split} {} + + // Methods + /** + * @return The contents of `ArchiveStats` as a JSON object in a string. 
+ */ + [[nodiscard]] auto as_string() const -> std::string { + namespace archive = clp::streaming_archive::cMetadataDB::Archive; + namespace file = clp::streaming_archive::cMetadataDB::File; + constexpr std::string_view cRangeIndex{"range_index"}; + + nlohmann::json const json_msg + = {{archive::Id, m_id}, + {archive::BeginTimestamp, m_begin_timestamp}, + {archive::EndTimestamp, m_end_timestamp}, + {archive::UncompressedSize, m_uncompressed_size}, + {archive::Size, m_compressed_size}, + {file::IsSplit, m_is_split}, + {cRangeIndex, m_range_index}}; + return json_msg.dump(-1, ' ', false, nlohmann::json::error_handler_t::ignore); + } + + [[nodiscard]] auto get_id() const -> std::string const& { return m_id; } + + [[nodiscard]] auto get_begin_timestamp() const -> epochtime_t { return m_begin_timestamp; } + + [[nodiscard]] auto get_end_timestamp() const -> epochtime_t { return m_end_timestamp; } + + [[nodiscard]] auto get_uncompressed_size() const -> size_t { return m_uncompressed_size; } + + [[nodiscard]] auto get_compressed_size() const -> size_t { return m_compressed_size; } + + [[nodiscard]] auto get_range_index() const -> nlohmann::json const& { return m_range_index; } + + [[nodiscard]] auto get_is_split() const -> bool { return m_is_split; } + +private: + std::string m_id; + epochtime_t m_begin_timestamp{}; + epochtime_t m_end_timestamp{}; + size_t m_uncompressed_size{}; + size_t m_compressed_size{}; + nlohmann::json m_range_index; + bool m_is_split{}; +}; + +/* + * Tracks the stats for a log type in an archive. 
+ */ +class LogTypeStat { +public: + [[nodiscard]] auto compress(ZstdCompressor& compressor) const + -> ystdlib::error_handling::Result; + + [[nodiscard]] static auto decompress(ZstdDecompressor& decompressor) + -> ystdlib::error_handling::Result; + + [[nodiscard]] auto get_count() const -> size_t { return m_count; } + + auto increment_count() -> void { ++m_count; } + +private: + size_t m_count{}; +}; + +using LogTypeStats = Array; + +/* + * Tracks the stats for a dictionary variable in an archive. + */ +class VariableStat { +public: + VariableStat() = default; + + VariableStat(std::string_view type) : m_type(type) {} + + [[nodiscard]] auto compress(ZstdCompressor& compressor) const + -> ystdlib::error_handling::Result; + + [[nodiscard]] static auto decompress(ZstdDecompressor& decompressor) + -> ystdlib::error_handling::Result; + + [[nodiscard]] auto get_count() const -> size_t { return m_count; } + + [[nodiscard]] auto get_type() const -> std::string_view { return m_type; } + + auto increment_count() -> void { ++m_count; } + + auto set_type(std::string_view type) -> void { m_type = type; } + +private: + std::string m_type; + size_t m_count{}; +}; + +using VariableStats = Array; + +struct ExperimentalStats { + LogTypeStats m_logtype_stats; + VariableStats m_var_stats; +}; +} // namespace clp_s + +#endif // CLP_S_ARCHIVESTATS_HPP diff --git a/components/core/src/clp_s/ArchiveWriter.cpp b/components/core/src/clp_s/ArchiveWriter.cpp index 4446148d9e..92fc728853 100644 --- a/components/core/src/clp_s/ArchiveWriter.cpp +++ b/components/core/src/clp_s/ArchiveWriter.cpp @@ -2,14 +2,22 @@ #include #include +#include #include +#include #include #include +#include -#include "archive_constants.hpp" -#include "Defs.hpp" -#include "SchemaTree.hpp" +#include +#include +#include +#include +#include +#include +#include +#include namespace clp_s { void ArchiveWriter::open(ArchiveWriterOption const& option) { @@ -55,6 +63,10 @@ void ArchiveWriter::open(ArchiveWriterOption const& 
option) { std::string array_dict_path = m_archive_path + constants::cArchiveArrayDictFile; m_array_dict = std::make_shared(); m_array_dict->open(array_dict_path, m_compression_level, UINT64_MAX); + + if (option.experimental) { + m_experimental_stats = ExperimentalStats(); + } } auto ArchiveWriter::close(bool is_split) -> ArchiveStats { @@ -79,6 +91,18 @@ auto ArchiveWriter::close(bool is_split) -> ArchiveStats { {constants::cArchiveArrayDictFile, array_dict_compressed_size}, {constants::cArchiveTablesFile, table_compressed_size} }; + + if (m_experimental_stats) { + auto stats_compressed_size{close_experimenal_stats()}; + if (stats_compressed_size.has_error()) { + throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__); + } + files.emplace_back( + std::string(constants::cArchiveStatsFile), + stats_compressed_size.value() + ); + } + uint64_t offset = 0; for (auto& file : files) { uint64_t original_size = file.o; @@ -325,6 +349,9 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const& case NodeType::ClpString: writer->append_column(new ClpStringColumnWriter(id, m_var_dict, m_log_dict)); break; + case NodeType::LogType: + writer->append_column(new LogTypeColumnWriter(id, m_log_dict)); + break; case NodeType::VarString: writer->append_column(new VariableStringColumnWriter(id, m_var_dict)); break; @@ -344,6 +371,8 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const& case NodeType::NullValue: case NodeType::Object: case NodeType::StructuredArray: + case NodeType::LogMessage: + case NodeType::CaptureVar: case NodeType::Unknown: break; } @@ -468,4 +497,57 @@ std::pair ArchiveWriter::store_tables() { return {table_metadata_compressed_size, table_compressed_size}; } + +auto ArchiveWriter::update_logtype_stats(clp_s::LogTypeDictionaryEntry logtype) + -> ystdlib::error_handling::Result { + if (false == m_experimental_stats.has_value()) { + return ClpsErrorCode{ClpsErrorCodeEnum::Unsupported}; + } + + 
clp::logtype_dictionary_id_t id{}; + auto new_entry{m_log_dict->add_entry(logtype, id)}; + auto& logtype_stats{m_experimental_stats.value().m_logtype_stats}; + if (new_entry) { + logtype_stats.at_or_create(id); + } + logtype_stats.at(id).increment_count(); + return id; +} + +auto ArchiveWriter::update_var_stats(std::string_view value, std::string_view type) + -> ystdlib::error_handling::Result { + if (false == m_experimental_stats.has_value()) { + return ClpsErrorCode{ClpsErrorCodeEnum::Unsupported}; + } + + clp::variable_dictionary_id_t id{}; + auto new_entry{m_var_dict->add_entry(value, id)}; + auto& var_stats{m_experimental_stats.value().m_var_stats}; + if (new_entry) { + var_stats.at_or_create(id).set_type(type); + } + var_stats.at(id).increment_count(); + return id; +} + +auto ArchiveWriter::close_experimenal_stats() -> ystdlib::error_handling::Result { + FileWriter writer{}; + writer.open( + m_archive_path + std::string{constants::cArchiveStatsFile}, + FileWriter::OpenMode::CreateForWriting + ); + + ZstdCompressor compressor{}; + compressor.open(writer, m_compression_level); + YSTDLIB_ERROR_HANDLING_TRYX(m_experimental_stats->m_logtype_stats.compress(compressor)); + YSTDLIB_ERROR_HANDLING_TRYX(m_experimental_stats->m_var_stats.compress(compressor)); + + compressor.close(); + auto compressed_size{writer.get_pos()}; + writer.close(); + + m_experimental_stats->m_logtype_stats.clear(); + m_experimental_stats->m_var_stats.clear(); + return compressed_size; +} } // namespace clp_s diff --git a/components/core/src/clp_s/ArchiveWriter.hpp b/components/core/src/clp_s/ArchiveWriter.hpp index 6b55fd6f2f..31d8ebe936 100644 --- a/components/core/src/clp_s/ArchiveWriter.hpp +++ b/components/core/src/clp_s/ArchiveWriter.hpp @@ -5,10 +5,16 @@ #include #include #include +#include #include #include #include +#include + +#include +#include +#include #include "../clp/streaming_archive/Constants.hpp" #include "archive_constants.hpp" @@ -31,70 +37,8 @@ struct ArchiveWriterOption { 
size_t min_table_size; std::vector authoritative_timestamp; std::string authoritative_timestamp_namespace; -}; - -class ArchiveStats { -public: - // Constructors - explicit ArchiveStats( - std::string id, - epochtime_t begin_timestamp, - epochtime_t end_timestamp, - size_t uncompressed_size, - size_t compressed_size, - nlohmann::json range_index, - bool is_split - ) - : m_id{id}, - m_begin_timestamp{begin_timestamp}, - m_end_timestamp{end_timestamp}, - m_uncompressed_size{uncompressed_size}, - m_compressed_size{compressed_size}, - m_range_index(std::move(range_index)), // Avoid {} to prevent wrapping in JSON array. - m_is_split{is_split} {} - - // Methods - /** - * @return The contents of `ArchiveStats` as a JSON object in a string. - */ - [[nodiscard]] auto as_string() const -> std::string { - namespace Archive = clp::streaming_archive::cMetadataDB::Archive; - namespace File = clp::streaming_archive::cMetadataDB::File; - constexpr std::string_view cRangeIndex{"range_index"}; - - nlohmann::json json_msg - = {{Archive::Id, m_id}, - {Archive::BeginTimestamp, m_begin_timestamp}, - {Archive::EndTimestamp, m_end_timestamp}, - {Archive::UncompressedSize, m_uncompressed_size}, - {Archive::Size, m_compressed_size}, - {File::IsSplit, m_is_split}, - {cRangeIndex, m_range_index}}; - return json_msg.dump(-1, ' ', false, nlohmann::json::error_handler_t::ignore); - } - - auto get_id() const -> std::string const& { return m_id; } - - auto get_begin_timestamp() const -> epochtime_t { return m_begin_timestamp; } - - auto get_end_timestamp() const -> epochtime_t { return m_end_timestamp; } - - auto get_uncompressed_size() const -> size_t { return m_uncompressed_size; } - - auto get_compressed_size() const -> size_t { return m_compressed_size; } - - auto get_range_index() const -> nlohmann::json const& { return m_range_index; } - - auto get_is_split() const -> bool { return m_is_split; } -private: - std::string m_id; - epochtime_t m_begin_timestamp{}; - epochtime_t m_end_timestamp{}; 
- size_t m_uncompressed_size{}; - size_t m_compressed_size{}; - nlohmann::json m_range_index; - bool m_is_split{}; + bool experimental{false}; }; class ArchiveWriter { @@ -269,6 +213,25 @@ class ArchiveWriter { return rc; } + /** + * Updates the stats for the given log type, adding it to the log type dictionary if necessary. + * @param logtype + * @return The log type ID on success. + * @return ClpsErrorCodeEnum::Unsupported if experimental stats are not enabled. + */ + auto update_logtype_stats(clp_s::LogTypeDictionaryEntry logtype) + -> ystdlib::error_handling::Result; + + /** + * Updates the stats for the given variable, adding it to the variable dictionary if necessary. + * @param value The variable value. + * @param type The variable type. + * @return The variable ID on success. + * @return ClpsErrorCodeEnum::Unsupported if experimental stats are not enabled. + */ + auto update_var_stats(std::string_view value, std::string_view type) + -> ystdlib::error_handling::Result; + private: /** * Initializes the schema writer @@ -285,6 +248,13 @@ class ArchiveWriter { */ [[nodiscard]] std::pair store_tables(); + /** + * Compresses, stores, and clears the experimental statistics. The stats vectors are not cleared + * if the result is an error. + * @return The size of the compressed statistics metadata in bytes. 
+ */ + [[nodiscard]] auto close_experimenal_stats() -> ystdlib::error_handling::Result; + /** * Writes the archive to a single file * @param files @@ -356,6 +326,8 @@ class ArchiveWriter { RangeIndexWriter m_range_index_writer; bool m_range_open{false}; + + std::optional m_experimental_stats; }; } // namespace clp_s diff --git a/components/core/src/clp_s/Array.hpp b/components/core/src/clp_s/Array.hpp new file mode 100644 index 0000000000..17a5a61ddb --- /dev/null +++ b/components/core/src/clp_s/Array.hpp @@ -0,0 +1,70 @@ +#ifndef CLP_S_ARRAY_HPP +#define CLP_S_ARRAY_HPP + +#include +#include + +#include + +#include +#include +#include + +namespace clp_s { +template +class Array { +public: + [[nodiscard]] auto at(Index i) -> Element& { return m_array.at(i); } + + auto at_or_create(Index id) -> Element&; + + auto clear() -> void { return m_array.clear(); } + + template + auto emplace_back(Args&&... args) -> Element& { + return m_array.emplace_back(std::forward(args)...); + } + + auto compress(ZstdCompressor& compressor) -> ystdlib::error_handling::Result; + + auto decompress(ZstdDecompressor& decompressor) -> ystdlib::error_handling::Result; + + [[nodiscard]] auto size() const -> size_t { return m_array.size(); } + +private: + std::vector m_array; +}; + +template +auto Array::compress(ZstdCompressor& compressor) + -> ystdlib::error_handling::Result { + compressor.write_numeric_value(m_array.size()); + for (auto const& stat : m_array) { + YSTDLIB_ERROR_HANDLING_TRYV(stat.compress(compressor)); + } + return ystdlib::error_handling::success(); +} + +template +auto Array::decompress(ZstdDecompressor& decompressor) + -> ystdlib::error_handling::Result { + size_t size{}; + if (ErrorCodeSuccess != decompressor.try_read_numeric_value(size)) { + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + m_array.reserve(size); + for (size_t i{0}; i < size; ++i) { + m_array.emplace_back(YSTDLIB_ERROR_HANDLING_TRYX(Element::decompress(decompressor))); + } + return 
ystdlib::error_handling::success(); +} + +template +auto Array::at_or_create(Index id) -> Element& { + if (m_array.size() <= id) { + m_array.resize(id + 1); + } + return m_array.at(id); +} +} // namespace clp_s +#endif // CLP_S_ARRAY_HPP diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index d38c28ade5..1617b375d2 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -130,6 +130,7 @@ if(CLP_BUILD_CLP_S_CLP_DEPENDENCIES) clp::string_utils log_surgeon::log_surgeon ystdlib::containers + ystdlib::error_handling ${zstd_TARGET} PRIVATE Boost::regex @@ -139,7 +140,6 @@ if(CLP_BUILD_CLP_S_CLP_DEPENDENCIES) nlohmann_json::nlohmann_json OpenSSL::Crypto spdlog::spdlog - ystdlib::error_handling ) endif() @@ -188,6 +188,7 @@ set( CLP_S_IO_SOURCES Compressor.hpp Decompressor.hpp + ErrorCode.cpp ErrorCode.hpp FileReader.cpp FileReader.hpp @@ -227,8 +228,11 @@ endif() set( CLP_S_ARCHIVE_WRITER_SOURCES archive_constants.hpp + ArchiveStats.cpp + ArchiveStats.hpp ArchiveWriter.cpp ArchiveWriter.hpp + Array.hpp ColumnWriter.cpp ColumnWriter.hpp Defs.hpp @@ -236,6 +240,7 @@ set( DictionaryEntry.hpp DictionaryWriter.cpp DictionaryWriter.hpp + ErrorCode.cpp ErrorCode.hpp FloatFormatEncoding.cpp FloatFormatEncoding.hpp @@ -306,6 +311,7 @@ set( DictionaryEntry.cpp DictionaryEntry.hpp DictionaryReader.hpp + ErrorCode.cpp ErrorCode.hpp FloatFormatEncoding.cpp FloatFormatEncoding.hpp @@ -358,6 +364,7 @@ endif() set( CLP_S_JSON_CONSTRUCTOR_SOURCES + ErrorCode.cpp ErrorCode.hpp JsonConstructor.cpp JsonConstructor.hpp @@ -376,6 +383,7 @@ if(CLP_BUILD_CLP_S_JSONCONSTRUCTOR) clp_s_json_constructor PUBLIC clp_s::archive_reader + ystdlib::error_handling ${zstd_TARGET} PRIVATE fmt::fmt @@ -387,6 +395,7 @@ endif() set( CLP_S_TIMESTAMP_PATTERN_SOURCES Defs.hpp + ErrorCode.cpp ErrorCode.hpp TimestampPattern.cpp TimestampPattern.hpp @@ -403,6 +412,8 @@ if(CLP_BUILD_CLP_S_TIMESTAMPPATTERN) 
target_include_directories(clp_s_timestamp_pattern PUBLIC ../) target_link_libraries( clp_s_timestamp_pattern + PUBLIC + ystdlib::error_handling PRIVATE clp::string_utils date::date @@ -414,6 +425,7 @@ set( CLP_S_EXE_SOURCES CommandLineArguments.cpp CommandLineArguments.hpp + ErrorCode.cpp ErrorCode.hpp kv_ir_search.cpp kv_ir_search.hpp @@ -432,6 +444,7 @@ if(CLP_BUILD_EXECUTABLES) target_link_libraries( clp-s PRIVATE + Boost::filesystem Boost::program_options clp_s::archive_reader clp_s::archive_writer diff --git a/components/core/src/clp_s/ColumnReader.cpp b/components/core/src/clp_s/ColumnReader.cpp index 2d2a2d7ee6..33d6e9f176 100644 --- a/components/core/src/clp_s/ColumnReader.cpp +++ b/components/core/src/clp_s/ColumnReader.cpp @@ -198,6 +198,42 @@ UnalignedMemSpan ClpStringColumnReader::get_encoded_vars(uint64_t cur_m return m_encoded_vars.sub_span(encoded_vars_offset, entry.get_num_variables()); } +auto LogTypeColumnReader::load(BufferViewReader& reader, uint64_t num_messages) -> void { + m_logtypes = reader.read_unaligned_span(num_messages); +} + +auto LogTypeColumnReader::extract_value(uint64_t cur_message) + -> std::variant { + std::string message; + extract_string_value_into_buffer(cur_message, message); + return message; +} + +auto +LogTypeColumnReader::extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) + -> void { + auto const value{m_logtypes[cur_message]}; + // auto const logtype_id{LogTypeColumnWriter::get_encoded_log_dict_id(value)}; + // auto& entry{m_log_dict->get_entry(logtype_id)}; + auto& entry{m_log_dict->get_entry(static_cast(value))}; + + if (false == entry.initialized()) { + entry.decode_log_type(); + } + + buffer.append(entry.get_value()); +} + +auto LogTypeColumnReader::extract_escaped_string_value_into_buffer( + uint64_t cur_message, + std::string& buffer +) -> void { + // TODO: escape while decoding instead of after. 
+ std::string tmp; + extract_string_value_into_buffer(cur_message, tmp); + StringUtils::escape_json_string(buffer, tmp); +} + void VariableStringColumnReader::load(BufferViewReader& reader, uint64_t num_messages) { m_variables = reader.read_unaligned_span(num_messages); } diff --git a/components/core/src/clp_s/ColumnReader.hpp b/components/core/src/clp_s/ColumnReader.hpp index 6170a3f161..175701ad29 100644 --- a/components/core/src/clp_s/ColumnReader.hpp +++ b/components/core/src/clp_s/ColumnReader.hpp @@ -290,12 +290,41 @@ class ClpStringColumnReader : public BaseColumnReader { bool m_is_array; }; +class LogTypeColumnReader : public BaseColumnReader { +public: + LogTypeColumnReader(int32_t id, std::shared_ptr log_dict) + : BaseColumnReader(id), + m_log_dict(std::move(log_dict)) {} + + auto load(BufferViewReader& reader, uint64_t num_messages) -> void override; + + auto get_type() -> NodeType override { return NodeType::LogType; } + + auto extract_value(uint64_t cur_message) + -> std::variant override; + + auto extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) + -> void override; + + auto extract_escaped_string_value_into_buffer(uint64_t cur_message, std::string& buffer) + -> void override; + +private: + std::shared_ptr m_log_dict; + UnalignedMemSpan m_logtypes; +}; + class VariableStringColumnReader : public BaseColumnReader { public: // Constructor - VariableStringColumnReader(int32_t id, std::shared_ptr var_dict) + VariableStringColumnReader( + int32_t id, + std::shared_ptr var_dict, + NodeType type + ) : BaseColumnReader(id), - m_var_dict(std::move(var_dict)) {} + m_var_dict(std::move(var_dict)), + m_type(type) {} // Destructor ~VariableStringColumnReader() override = default; @@ -303,7 +332,7 @@ class VariableStringColumnReader : public BaseColumnReader { // Methods inherited from BaseColumnReader void load(BufferViewReader& reader, uint64_t num_messages) override; - NodeType get_type() override { return NodeType::VarString; } + 
NodeType get_type() override { return m_type; } std::variant extract_value( uint64_t cur_message @@ -325,6 +354,8 @@ class VariableStringColumnReader : public BaseColumnReader { std::shared_ptr m_var_dict; UnalignedMemSpan m_variables; + + NodeType m_type; }; class DateStringColumnReader : public BaseColumnReader { diff --git a/components/core/src/clp_s/ColumnWriter.cpp b/components/core/src/clp_s/ColumnWriter.cpp index 1f57f5c454..47dab53bdc 100644 --- a/components/core/src/clp_s/ColumnWriter.cpp +++ b/components/core/src/clp_s/ColumnWriter.cpp @@ -155,6 +155,22 @@ void ClpStringColumnWriter::store(ZstdCompressor& compressor) { compressor.write(reinterpret_cast(m_encoded_vars.data()), encoded_vars_size); } +auto LogTypeColumnWriter::add_value(ParsedMessage::variable_t& value) -> size_t { + auto logtype_dict_entry{std::get(value)}; + clp::logtype_dictionary_id_t id{}; + m_log_dict->add_entry(logtype_dict_entry, id); + m_logtypes.push_back(id); + return sizeof(clp::logtype_dictionary_id_t); +} + +auto LogTypeColumnWriter::store(ZstdCompressor& compressor) -> void { + compressor.write( + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + reinterpret_cast(m_logtypes.data()), + m_logtypes.size() * sizeof(clp::logtype_dictionary_id_t) + ); +} + size_t VariableStringColumnWriter::add_value(ParsedMessage::variable_t& value) { clp::variable_dictionary_id_t id{}; m_var_dict->add_entry(std::get(value), id); diff --git a/components/core/src/clp_s/ColumnWriter.hpp b/components/core/src/clp_s/ColumnWriter.hpp index 78b7665493..04e6212a8e 100644 --- a/components/core/src/clp_s/ColumnWriter.hpp +++ b/components/core/src/clp_s/ColumnWriter.hpp @@ -3,6 +3,9 @@ #include #include +#include + +#include #include "../clp/Defs.h" #include "DictionaryWriter.hpp" @@ -156,10 +159,8 @@ class BooleanColumnWriter : public BaseColumnWriter { class ClpStringColumnWriter : public BaseColumnWriter { public: - // Types using encoded_log_dict_id_t = uint64_t; - // Constructor 
ClpStringColumnWriter( int32_t id, std::shared_ptr var_dict, @@ -169,21 +170,18 @@ class ClpStringColumnWriter : public BaseColumnWriter { m_var_dict(std::move(var_dict)), m_log_dict(std::move(log_dict)) {} - // Destructor - ~ClpStringColumnWriter() override = default; - - // Methods inherited from BaseColumnWriter - size_t add_value(ParsedMessage::variable_t& value) override; + auto add_value(ParsedMessage::variable_t& value) -> size_t override; - void store(ZstdCompressor& compressor) override; + auto store(ZstdCompressor& compressor) -> void override; - size_t get_total_header_size() const override { return sizeof(size_t); } + auto get_total_header_size() const -> size_t override { return sizeof(size_t); } /** * @param encoded_id * @return the encoded log dict id */ - static clp::logtype_dictionary_id_t get_encoded_log_dict_id(encoded_log_dict_id_t encoded_id) { + static auto get_encoded_log_dict_id(encoded_log_dict_id_t encoded_id) + -> clp::logtype_dictionary_id_t { return static_cast(encoded_id & cLogDictIdMask); } @@ -191,7 +189,7 @@ class ClpStringColumnWriter : public BaseColumnWriter { * @param encoded_id * @return The encoded offset */ - static uint64_t get_encoded_offset(encoded_log_dict_id_t encoded_id) { + static auto get_encoded_offset(encoded_log_dict_id_t encoded_id) -> uint64_t { return (encoded_id & cOffsetMask) >> cOffsetBitPosition; } @@ -202,8 +200,8 @@ class ClpStringColumnWriter : public BaseColumnWriter { * @param offset * @return The encoded log dict id */ - static encoded_log_dict_id_t - encode_log_dict_id(clp::logtype_dictionary_id_t id, uint64_t offset) { + static auto encode_log_dict_id(clp::logtype_dictionary_id_t id, uint64_t offset) + -> encoded_log_dict_id_t { return static_cast(id) | (offset << cOffsetBitPosition); } @@ -219,6 +217,21 @@ class ClpStringColumnWriter : public BaseColumnWriter { std::vector m_encoded_vars; }; +class LogTypeColumnWriter : public BaseColumnWriter { +public: + LogTypeColumnWriter(int32_t id, 
std::shared_ptr log_dict) + : BaseColumnWriter(id), + m_log_dict{std::move(log_dict)} {} + + auto add_value(ParsedMessage::variable_t& value) -> size_t override; + + auto store(ZstdCompressor& compressor) -> void override; + +private: + std::shared_ptr m_log_dict; + std::vector m_logtypes; +}; + class VariableStringColumnWriter : public BaseColumnWriter { public: // Constructor diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 72c0d50d80..ebb1b2363e 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -2,11 +2,20 @@ #include #include +#include #include +#include #include #include #include +#include + +#include +#include +#include +#include +#include #include "../clp/cli_utils.hpp" #include "../clp/type_utils.hpp" @@ -123,7 +132,11 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { } po::options_description general_options("General options"); - general_options.add_options()("help,h", "Print help"); + general_options.add_options()("help,h", "Print help")( + "experimental", + po::bool_switch(&m_experimental), + "Enable experimental features to be used." + ); char command_input; po::options_description general_positional_options("General positional options"); @@ -272,12 +285,21 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); // clang-format on + po::options_description experimental_options("Experimental Options"); + std::string log_surgeon_schema_path{}; + experimental_options.add_options()( + "schema-path", + po::value(&log_surgeon_schema_path)->value_name("PATH"), + "Path to a log surgeon schema. See README-Schema.md for details." 
+ ); + po::positional_options_description positional_options; positional_options.add("archives-dir", 1); positional_options.add("input-paths", -1); po::options_description all_compression_options; all_compression_options.add(compression_options); + all_compression_options.add(experimental_options); all_compression_options.add(compression_positional_options); std::vector unrecognized_options @@ -329,6 +351,12 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { } validate_network_auth(auth, m_network_auth); + + if (false == log_surgeon_schema_path.empty()) { + m_log_surgeon_schema_path = get_path_object_for_raw_path(log_surgeon_schema_path); + } + + validate_experimental(); } else if ((char)Command::Extract == command_input) { po::options_description extraction_options; std::string archive_path; @@ -707,6 +735,8 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { validate_network_auth(auth, m_network_auth); + validate_experimental(); + if (m_query.empty()) { throw std::invalid_argument("No query specified"); } @@ -940,4 +970,87 @@ void CommandLineArguments::print_search_usage() const { " [OUTPUT_HANDLER [OUTPUT_HANDLER_OPTIONS]]" << std::endl; } + +auto CommandLineArguments::validate_experimental() const -> void { + if (m_experimental) { + return; + } + if (m_log_surgeon_schema_path.has_value()) { + throw std::invalid_argument("Set --experimental to parse text with log-surgeon."); + } + if (ExperimentalQueries::cLogTypeStatsQuery == m_query) { + throw std::invalid_argument("Set --experimental to access the logtype stats."); + } + if (ExperimentalQueries::cVariableStatsQuery == m_query) { + throw std::invalid_argument("Set --experimental to access the variable stats."); + } +} + +auto CommandLineArguments::create_output_handler() const + -> ystdlib::error_handling::Result> { + std::unique_ptr output_handler; + try { + switch (get_output_handler_type()) { + case CommandLineArguments::OutputHandlerType::File: + output_handler + = 
std::make_unique(get_file_output_path(), true); + break; + case CommandLineArguments::OutputHandlerType::Network: { + output_handler = std::make_unique( + get_network_dest_host(), + get_network_dest_port() + ); + break; + } + case CommandLineArguments::OutputHandlerType::Reducer: { + int reducer_socket_fd{-1}; + if (get_output_handler_type() == CommandLineArguments::OutputHandlerType::Reducer) { + reducer_socket_fd = reducer::connect_to_reducer( + get_reducer_host(), + get_reducer_port(), + get_job_id() + ); + if (-1 == reducer_socket_fd) { + SPDLOG_ERROR("Failed to connect to reducer."); + return ClpsErrorCode{ClpsErrorCodeEnum::BadParam}; + } + } + + if (do_count_results_aggregation()) { + output_handler = std::make_unique(reducer_socket_fd); + } else if (do_count_by_time_aggregation()) { + output_handler = std::make_unique( + reducer_socket_fd, + get_count_by_time_bucket_size() + ); + } else { + SPDLOG_ERROR("Unhandled aggregation type."); + return ClpsErrorCode{ClpsErrorCodeEnum::BadParam}; + } + break; + } + case CommandLineArguments::OutputHandlerType::ResultsCache: { + output_handler = std::make_unique( + get_mongodb_uri(), + get_mongodb_collection(), + get_batch_size(), + get_max_num_results() + ); + break; + } + case CommandLineArguments::OutputHandlerType::Stdout: { + output_handler = std::make_unique(); + break; + } + default: { + SPDLOG_ERROR("Unhandled OutputHandlerType."); + return ClpsErrorCode{ClpsErrorCodeEnum::BadParam}; + } + } + } catch (std::exception const& e) { + SPDLOG_ERROR("Failed to create output handler - {}", e.what()); + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + return output_handler; +} } // namespace clp_s diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index 3c287a2c41..9de5d213e8 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -1,13 +1,18 @@ #ifndef 
CLP_S_COMMANDLINEARGUMENTS_HPP #define CLP_S_COMMANDLINEARGUMENTS_HPP +#include #include #include +#include #include #include #include #include +#include + +#include #include "../reducer/types.hpp" #include "Defs.hpp" @@ -37,6 +42,11 @@ class CommandLineArguments { Stdout, }; + struct ExperimentalQueries { + static constexpr std::string_view cLogTypeStatsQuery{"stats.logtypes"}; + static constexpr std::string_view cVariableStatsQuery{"stats.variables"}; + }; + // Constructors explicit CommandLineArguments(std::string const& program_name) : m_program_name(program_name) {} @@ -123,6 +133,18 @@ class CommandLineArguments { bool get_record_log_order() const { return false == m_disable_log_order; } + [[nodiscard]] auto experimental() const -> bool { return m_experimental; } + + [[nodiscard]] auto get_log_surgeon_schema_path() const -> std::optional { + return m_log_surgeon_schema_path; + } + + /** + * Create the appropriate OutputHandler based on the cli arguments supplied. + */ + [[nodiscard]] auto create_output_handler() const + -> ystdlib::error_handling::Result>; + private: // Methods /** @@ -185,6 +207,13 @@ class CommandLineArguments { void print_search_usage() const; + /** + * Validate the use of experimental features. Requires the program options to have been parsed. + * @throws std::invalid_argument if any experimental feature is used without setting the + * experimetnal flag. 
+ */ + auto validate_experimental() const -> void; + // Variables std::string m_program_name; Command m_command; @@ -237,6 +266,10 @@ class CommandLineArguments { int64_t m_count_by_time_bucket_size{0}; // Milliseconds OutputHandlerType m_output_handler_type{OutputHandlerType::Stdout}; + + // clpsls Prototype + bool m_experimental{false}; + std::optional m_log_surgeon_schema_path; }; } // namespace clp_s diff --git a/components/core/src/clp_s/DictionaryEntry.cpp b/components/core/src/clp_s/DictionaryEntry.cpp index 02cec0a4da..8e6296487b 100644 --- a/components/core/src/clp_s/DictionaryEntry.cpp +++ b/components/core/src/clp_s/DictionaryEntry.cpp @@ -66,6 +66,11 @@ void LogTypeDictionaryEntry::add_float_var() { EncodedVariableInterpreter::add_float_var(m_value); } +auto LogTypeDictionaryEntry::add_schema_var() -> void { + m_placeholder_positions.push_back(m_value.length()); + EncodedVariableInterpreter::add_schema_var(m_value); +} + void LogTypeDictionaryEntry::add_escape() { m_placeholder_positions.push_back(m_value.length()); EncodedVariableInterpreter::add_escape(m_value); @@ -79,17 +84,9 @@ bool LogTypeDictionaryEntry::parse_next_var( string_view& var ) { auto last_var_end_pos = var_end_pos; - // clang-format off - auto escape_handler = [&]( - [[maybe_unused]] string_view constant, - [[maybe_unused]] size_t char_to_escape_pos, - string& logtype - ) -> void { - m_placeholder_positions.push_back(logtype.size()); - ++m_num_escaped_placeholders; - logtype += enum_to_underlying_type(VariablePlaceholder::Escape); - }; - // clang-format on + auto escape_handler = [&]([[maybe_unused]] string_view constant, + [[maybe_unused]] size_t char_to_escape_pos, + [[maybe_unused]] string& logtype) -> void { add_escape(); }; if (get_bounds_of_next_var(msg, var_begin_pos, var_end_pos)) { // Append to log type: from end of last variable to start of current variable auto constant = msg.substr(last_var_end_pos, var_begin_pos - last_var_end_pos); @@ -163,7 +160,7 @@ void 
LogTypeDictionaryEntry::read_from_file( void LogTypeDictionaryEntry::decode_log_type(string& escaped_value) { bool is_escaped = false; string constant; - for (char c : escaped_value) { + for (auto const c : escaped_value) { if (is_escaped) { constant += c; is_escaped = false; @@ -185,6 +182,10 @@ void LogTypeDictionaryEntry::decode_log_type(string& escaped_value) { add_constant(constant, 0, constant.length()); constant.clear(); add_dictionary_var(); + } else if (enum_to_underlying_type(VariablePlaceholder::Schema) == c) { + add_constant(constant, 0, constant.length()); + constant.clear(); + add_schema_var(); } else { constant += c; } @@ -202,6 +203,13 @@ void LogTypeDictionaryEntry::decode_log_type() { decode_log_type(escaped_value); } +auto LogTypeDictionaryEntry::encode_constant(std::string_view constant) -> void { + auto escape_handler = [&]([[maybe_unused]] string_view constant, + [[maybe_unused]] size_t char_to_escape_pos, + [[maybe_unused]] string& logtype) -> void { add_escape(); }; + append_constant_to_logtype(constant, escape_handler, m_value); +} + size_t VariableDictionaryEntry::get_data_size() const { return sizeof(m_id) + m_value.length(); } diff --git a/components/core/src/clp_s/DictionaryEntry.hpp b/components/core/src/clp_s/DictionaryEntry.hpp index 5f3b5ade0e..f130a3292e 100644 --- a/components/core/src/clp_s/DictionaryEntry.hpp +++ b/components/core/src/clp_s/DictionaryEntry.hpp @@ -98,6 +98,10 @@ class LogTypeDictionaryEntry : public DictionaryEntry void; /** * Adds a dictionary variable placeholder */ @@ -171,6 +175,11 @@ class LogTypeDictionaryEntry : public DictionaryEntry void; + /** * Checks if the entry has been initialized * @return true if the entry has been initialized, false otherwise diff --git a/components/core/src/clp_s/ErrorCode.cpp b/components/core/src/clp_s/ErrorCode.cpp new file mode 100644 index 0000000000..f18b520f55 --- /dev/null +++ b/components/core/src/clp_s/ErrorCode.cpp @@ -0,0 +1,64 @@ +#include "ErrorCode.hpp" + 
+#include + +#include + +using clp_s::ClpsErrorCodeEnum; + +using ClpsErrorCategory = ystdlib::error_handling::ErrorCategory; + +template <> +auto ClpsErrorCategory::name() const noexcept -> char const* { + return "Clp-s"; +} + +template <> +auto ClpsErrorCategory::message(ClpsErrorCodeEnum error_enum) const -> std::string { + switch (error_enum) { + case ClpsErrorCodeEnum::Success: + return "Clps Success"; + case ClpsErrorCodeEnum::BadParam: + return "Clps BadParam"; + case ClpsErrorCodeEnum::BadParamDbUri: + return "Clps BadParamDbUri"; + case ClpsErrorCodeEnum::Corrupt: + return "Clps Corrupt"; + case ClpsErrorCodeEnum::Errno: + return "Clps Errno"; + case ClpsErrorCodeEnum::EndOfFile: + return "Clps EndOfFile"; + case ClpsErrorCodeEnum::FileExists: + return "Clps FileExists"; + case ClpsErrorCodeEnum::FileNotFound: + return "Clps FileNotFound"; + case ClpsErrorCodeEnum::NoMem: + return "Clps NoMem"; + case ClpsErrorCodeEnum::NotInit: + return "Clps NotInit"; + case ClpsErrorCodeEnum::NotReady: + return "Clps NotReady"; + case ClpsErrorCodeEnum::OutOfBounds: + return "Clps OutOfBounds"; + case ClpsErrorCodeEnum::TooLong: + return "Clps TooLong"; + case ClpsErrorCodeEnum::Truncated: + return "Clps Truncated"; + case ClpsErrorCodeEnum::Unsupported: + return "Clps Unsupported"; + case ClpsErrorCodeEnum::NoAccess: + return "Clps NoAccess"; + case ClpsErrorCodeEnum::Failure: + return "Clps Failure"; + case ClpsErrorCodeEnum::FailureMetadataCorrupted: + return "Clps FailureMetadataCorrupted"; + case ClpsErrorCodeEnum::MetadataCorrupted: + return "Clps MetadataCorrupted"; + case ClpsErrorCodeEnum::FailureDbBulkWrite: + return "Clps FailureDbBulkWrite"; + case ClpsErrorCodeEnum::FailureNetwork: + return "Clps FailureNetwork"; + default: + return "Unrecognized ClpsErrorCode"; + } +} diff --git a/components/core/src/clp_s/ErrorCode.hpp b/components/core/src/clp_s/ErrorCode.hpp index 68c4892dbd..507d0df3e3 100644 --- a/components/core/src/clp_s/ErrorCode.hpp +++ 
b/components/core/src/clp_s/ErrorCode.hpp @@ -3,6 +3,10 @@ #ifndef CLP_S_ERRORCODE_HPP #define CLP_S_ERRORCODE_HPP +#include + +#include + namespace clp_s { typedef enum { ErrorCodeSuccess = 0, @@ -27,6 +31,34 @@ typedef enum { ErrorCodeFailureDbBulkWrite, ErrorCodeFailureNetwork, } ErrorCode; + +enum class ClpsErrorCodeEnum : uint8_t { + Success = 0, + BadParam, + BadParamDbUri, + Corrupt, + Errno, + EndOfFile, + FileExists, + FileNotFound, + NoMem, + NotInit, + NotReady, + OutOfBounds, + TooLong, + Truncated, + Unsupported, + NoAccess, + Failure, + FailureMetadataCorrupted, + MetadataCorrupted, + FailureDbBulkWrite, + FailureNetwork, +}; + +using ClpsErrorCode = ystdlib::error_handling::ErrorCode; } // namespace clp_s +YSTDLIB_ERROR_HANDLING_MARK_AS_ERROR_CODE_ENUM(clp_s::ClpsErrorCodeEnum); + #endif // CLP_S_ERRORCODE_HPP diff --git a/components/core/src/clp_s/JsonConstructor.cpp b/components/core/src/clp_s/JsonConstructor.cpp index 67f66c0c4c..9a615addbd 100644 --- a/components/core/src/clp_s/JsonConstructor.cpp +++ b/components/core/src/clp_s/JsonConstructor.cpp @@ -35,7 +35,10 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option) : m_option void JsonConstructor::store() { m_archive_reader = std::make_unique(); - m_archive_reader->open(m_option.archive_path, m_option.network_auth); + m_archive_reader->open( + m_option.archive_path, + ArchiveReader::Options{m_option.network_auth, false} + ); m_archive_reader->read_dictionaries_and_metadata(); if (m_option.ordered && false == m_archive_reader->has_log_order()) { diff --git a/components/core/src/clp_s/JsonParser.cpp b/components/core/src/clp_s/JsonParser.cpp index c0e2123e7f..4ca318f299 100644 --- a/components/core/src/clp_s/JsonParser.cpp +++ b/components/core/src/clp_s/JsonParser.cpp @@ -1,5 +1,6 @@ #include "JsonParser.hpp" +#include #include #include #include @@ -15,9 +16,16 @@ #include #include #include +#include +#include +#include +#include +#include #include #include +#include +#include 
#include #include #include @@ -31,10 +39,15 @@ #include #include #include +#include +#include +#include #include #include #include #include +#include +#include #include #include @@ -72,6 +85,12 @@ auto trim_trailing_whitespace(std::string_view str) -> std::string_view; auto round_trip_is_identical(std::string_view float_str, double value, float_format_t format) -> bool; +/** + * @return A log surgeon parser created from the provided schema. + */ +auto create_log_surgeon_parser(Path const& schema_path, NetworkAuthOption const& network_auth) + -> ystdlib::error_handling::Result>; + /** * Class that implements `clp::ffi::ir_stream::IrUnitHandlerReq` for Key-Value IR compression. */ @@ -135,6 +154,36 @@ auto round_trip_is_identical(std::string_view float_str, double value, float_for auto const restore_result{restore_encoded_float(value, format)}; return false == restore_result.has_error() && float_str == restore_result.value(); } + +/* + * Log surgeon currently does not expose a way to create a parser using a Reader. To support any + * Reader and not directly take the file path we read the entire schema file into a string and + * create the parser from it. 
+ */ +auto create_log_surgeon_parser(Path const& schema_path, NetworkAuthOption const& network_auth) + -> ystdlib::error_handling::Result> { + auto schema_reader{try_create_reader(schema_path, network_auth)}; + if (nullptr == schema_reader) { + return ClpsErrorCode{ClpsErrorCodeEnum::BadParam}; + } + std::string schema_contents{}; + constexpr size_t cBufSize{4096}; + std::array buf{}; + size_t bytes_read{}; + while (true) { + auto code{schema_reader->try_read(buf.data(), buf.size(), bytes_read)}; + if (clp::ErrorCode_EndOfFile == code) { + break; + } + if (clp::ErrorCode_Success != code) { + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + schema_contents.append(buf.data(), bytes_read); + } + return std::make_unique( + log_surgeon::SchemaParser::try_schema_string(schema_contents) + ); +} } // namespace JsonParser::JsonParser(JsonParserOption const& option) @@ -146,6 +195,19 @@ JsonParser::JsonParser(JsonParserOption const& option) m_retain_float_format(option.retain_float_format), m_input_paths(option.input_paths), m_network_auth(option.network_auth) { + if (option.log_surgeon_schema_path.has_value()) { + auto const schema_path{option.log_surgeon_schema_path.value()}; + auto result{create_log_surgeon_parser(schema_path, m_network_auth)}; + if (result.has_error()) { + SPDLOG_ERROR( + "Failed to create log surgeon parser from: \"{}\" due to: \"{}\"", + schema_path.path, + result.error().message() + ); + throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__); + } + m_log_surgeon_parser = std::move(result.value()); + } if (false == m_timestamp_key.empty()) { if (false == clp_s::search::ast::tokenize_column_descriptor( @@ -182,6 +244,7 @@ JsonParser::JsonParser(JsonParserOption const& option) m_archive_options.id = m_generator(); m_archive_options.authoritative_timestamp = m_timestamp_column; m_archive_options.authoritative_timestamp_namespace = m_timestamp_namespace; + m_archive_options.experimental = option.experimental; m_archive_writer = 
std::make_unique(); m_archive_writer->open(m_archive_options); @@ -304,13 +367,38 @@ void JsonParser::parse_obj_in_array(simdjson::ondemand::object line, int32_t par case simdjson::ondemand::json_type::string: { std::string_view value = cur_value.get_string(true); if (value.find(' ') != std::string::npos) { - node_id = m_archive_writer - ->add_node(node_id_stack.top(), NodeType::ClpString, cur_key); + if (m_archive_options.experimental) { + // TODO clpsls: check if any delim from schema exists + // Doesn't seem possible to get through the parser atm (could store + // separately... after parsing schema file). + + node_id = m_archive_writer->add_node( + node_id_stack.top(), + NodeType::LogMessage, + cur_key + ); + if (auto const result{parse_log_message(node_id, value)}; + result.has_error()) + { + // TODO clpsls: if parsing fails we could try to treat the string as a + // VarString + throw(std::runtime_error( + "parse_log_message failed with: " + result.error().message() + )); + } + } else { + node_id = m_archive_writer->add_node( + node_id_stack.top(), + NodeType::ClpString, + cur_key + ); + m_current_parsed_message.add_unordered_value(value); + } } else { node_id = m_archive_writer ->add_node(node_id_stack.top(), NodeType::VarString, cur_key); + m_current_parsed_message.add_unordered_value(value); } - m_current_parsed_message.add_unordered_value(value); m_current_schema.insert_unordered(node_id); break; } @@ -409,11 +497,31 @@ void JsonParser::parse_array(simdjson::ondemand::array array, int32_t parent_nod case simdjson::ondemand::json_type::string: { std::string_view value = cur_value.get_string(true); if (value.find(' ') != std::string::npos) { - node_id = m_archive_writer->add_node(parent_node_id, NodeType::ClpString, ""); + if (m_archive_options.experimental) { + // TODO clpsls: check if any delim from schema exists + // Doesn't seem possible to get through the parser atm (could store + // separately... after parsing schema file). 
+ + node_id = m_archive_writer + ->add_node(parent_node_id, NodeType::LogMessage, ""); + if (auto const result{parse_log_message(node_id, value)}; + result.has_error()) + { + // TODO clpsls: if parsing fails we could try to treat the string as a + // VarString + throw(std::runtime_error( + "parse_log_message failed with: " + result.error().message() + )); + } + } else { + node_id = m_archive_writer + ->add_node(parent_node_id, NodeType::ClpString, ""); + m_current_parsed_message.add_unordered_value(value); + } } else { node_id = m_archive_writer->add_node(parent_node_id, NodeType::VarString, ""); + m_current_parsed_message.add_unordered_value(value); } - m_current_parsed_message.add_unordered_value(value); m_current_schema.insert_unordered(node_id); break; } @@ -572,17 +680,43 @@ void JsonParser::parse_line( encoding_id ); m_current_parsed_message.add_value(node_id, encoding_id, timestamp); + m_current_schema.insert_ordered(node_id); } else if (value.find(' ') != std::string::npos) { - node_id = m_archive_writer - ->add_node(node_id_stack.top(), NodeType::ClpString, cur_key); - m_current_parsed_message.add_value(node_id, value); + if (m_archive_options.experimental) { + // TODO clpsls: check if any delim from schema exists + // Doesn't seem possible to get through the parser atm (could store + // separately... after parsing schema file). 
+ + node_id = m_archive_writer->add_node( + node_id_stack.top(), + NodeType::LogMessage, + cur_key + ); + if (auto const result{parse_log_message(node_id, value)}; + result.has_error()) + { + // TODO clpsls: if parsing fails we could try to treat the string as a + // VarString + throw(std::runtime_error( + "parse_log_message failed with: " + result.error().message() + )); + } + m_current_schema.insert_unordered(node_id); + } else { + node_id = m_archive_writer->add_node( + node_id_stack.top(), + NodeType::ClpString, + cur_key + ); + m_current_parsed_message.add_value(node_id, value); + m_current_schema.insert_ordered(node_id); + } } else { node_id = m_archive_writer ->add_node(node_id_stack.top(), NodeType::VarString, cur_key); m_current_parsed_message.add_value(node_id, value); + m_current_schema.insert_ordered(node_id); } - - m_current_schema.insert_ordered(node_id); break; } case simdjson::ondemand::json_type::boolean: { @@ -1337,4 +1471,239 @@ bool JsonParser::check_and_log_curl_error( } return false; } + +// TODO clpsls: variable repetition inside a log message +// It is possible to find multiple variables of the same type (therefore having the same name) +// inside the a single log event. This would result in the same node id in the clps schema tree. It +// is invalid JSON to have the same key in an object meaning we either need to create an array of +// values for the key or append to the key (variable type name / token name) to make it unique. +// Storing as an array complicates (named) search as the type of a variable's node would now be +// T|Array[T] (rather than just T). +// +// If it is possible to efficiently search all keys starting with a prefix, we could uniquely store +// each variable instance by appending to the key name (e.g. a node in a log message with no +// repetition would be named "var" while nodes in a message with repetition would be named var.0 +// var.1, ..., var.n). 
+//
+//
+// TODO clpsls: capture group repetition
+// Similarly to variable repetition, storing the capture group variable node as an array in cases
+// with repetition creates a node type of T|Array[T].
+//
+// TODO clpsls:
+// Storing both the full match and capture groups creates potential duplication of the capture
+// group's substring.
+// One option is to re-write the full match similar to a logtype where the capture groups are
+// replaced with encoded placeholders. The full match would need to be rebuilt using its capture
+// groups to display.
+// Additionally wildcard search must know to avoid searching both the full match and capture groups.
+// Using placeholders allows for rebuilding the variable (and log message) through node
+// position/ordering as the placeholders in the full match enable you to know how many subsequent
+// nodes are captures. Otherwise we either need to specially type the node (e.g. VarString and
+// CaptureString) or do some other indexing/checking.
+//
+// ... Storing as a separate type seems the most sensible.
+// There is a possible trade-off where de-duplicating the capture group string using placeholders in
+// the full match node improves compression ratio, but slows down search in certain cases as you
+// maybe need to rebuild the full match's value.
+
+// Because the variable dictionary is shared between unstructured and structured logs, but the
+// variable stats are only for unstructured logs, it is possible to have "holes" in the variable
+// stats array.
+ +// TODO clpsls: probably want string view API to ls +auto JsonParser::parse_log_message(int32_t parent_node_id, std::string_view view) + -> ystdlib::error_handling::Result { + static constexpr std::string_view cFullMatch{"FullMatch"}; + static constexpr std::string_view cLogType{"LogType"}; + + m_log_surgeon_parser->reset(); + size_t offset{}; + // TODO clpsls: add string_view api to log surgeon + std::string str{view}; + if (log_surgeon::ErrorCode::Success + != m_log_surgeon_parser->parse_next_event(str.data(), str.size(), offset, true)) + { + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + + auto msg_start{m_current_schema.start_unordered_object(NodeType::LogMessage)}; + + auto const& log_parser{m_log_surgeon_parser->get_log_parser()}; + auto const& event{log_parser.get_log_event_view()}; + auto const& log_buf = event.get_log_output_buffer(); + + clp_s::LogTypeDictionaryEntry logtype_dict_entry{}; + logtype_dict_entry.reserve_constant_length(view.size()); + auto starting_token_idx{log_buf->has_timestamp() ? 0 : 1}; + for (auto token_idx{starting_token_idx}; token_idx < log_buf->pos(); token_idx++) { + auto token_view{log_buf->get_token(token_idx)}; + auto const token_type{token_view.get_type_ids()->at(0)}; + // TODO clpsls: if we've seen the token_name already we either need to append to it or store + // as an array... 
+ auto const token_name{log_parser.get_id_symbol(token_type)}; + + // handle delim before token (TODO clpsls: fix/hide this in log surgeon) + if (log_buf->has_delimiters() && (log_buf->has_timestamp() || token_idx > 1) + && token_type != static_cast(log_surgeon::SymbolId::TokenUncaughtString) + && token_type != static_cast(log_surgeon::SymbolId::TokenNewline)) + { + logtype_dict_entry.encode_constant(token_view.get_delimiter()); + token_view.increment_start_pos(); + } + + SPDLOG_INFO( + "[clpsls] token name: {} ({}) value: {}", + token_name, + token_type, + token_view.to_string() + ); + switch (token_type) { + case static_cast(log_surgeon::SymbolId::TokenNewline): + case static_cast(log_surgeon::SymbolId::TokenUncaughtString): { + logtype_dict_entry.encode_constant(token_view.to_string()); + break; + } + case static_cast(log_surgeon::SymbolId::TokenInt): { + int32_t node_id{}; + clp_s::encoded_variable_t encoded_var{}; + if (clp::EncodedVariableInterpreter::convert_string_to_representable_integer_var( + token_view.to_string(), + encoded_var + )) + { + node_id = m_archive_writer + ->add_node(parent_node_id, NodeType::Integer, token_name); + m_current_parsed_message.add_unordered_value(encoded_var); + } else { + node_id = m_archive_writer + ->add_node(parent_node_id, NodeType::VarString, token_name); + m_current_parsed_message.add_unordered_value(token_view.to_string()); + } + m_current_schema.insert_unordered(node_id); + logtype_dict_entry.add_schema_var(); + break; + } + case static_cast(log_surgeon::SymbolId::TokenFloat): { + int32_t node_id{}; + auto token_str{token_view.to_string()}; + auto const float_format_result{get_float_encoding(token_str)}; + if (false == float_format_result.has_error() + && round_trip_is_identical( + token_str, + std::stod(token_str), + float_format_result.value() + )) + { + m_current_parsed_message.add_unordered_value( + std::stod(token_str), + float_format_result.value() + ); + node_id = m_archive_writer->add_node( + parent_node_id, + 
NodeType::FormattedFloat, + token_name + ); + } else { + m_current_parsed_message.add_unordered_value(token_str); + node_id = m_archive_writer->add_node( + parent_node_id, + NodeType::DictionaryFloat, + token_name + ); + } + m_current_schema.insert_unordered(node_id); + logtype_dict_entry.add_schema_var(); + break; + } + default: { + auto const& lexer{event.get_log_parser().m_lexer}; + auto capture_ids{lexer.get_capture_ids_from_rule_id(token_type)}; + if (false == capture_ids.has_value()) { + logtype_dict_entry.add_schema_var(); + m_current_parsed_message.add_unordered_value(token_view.to_string()); + m_current_schema.insert_unordered(m_archive_writer->add_node( + parent_node_id, + NodeType::VarString, + token_name + )); + YSTDLIB_ERROR_HANDLING_TRYV(m_archive_writer->update_var_stats( + token_view.to_string_view(), + token_name + )); + break; + } + + auto capture_node_id{ + m_archive_writer->add_node(parent_node_id, NodeType::CaptureVar, token_name) + }; + auto capture_start{m_current_schema.start_unordered_object(NodeType::CaptureVar)}; + + logtype_dict_entry.add_schema_var(); + // clpsls TODO: remove full match completely? rebuild on demand? + m_current_parsed_message.add_unordered_value(token_view.to_string()); + m_current_schema.insert_unordered( + m_archive_writer->add_node(capture_node_id, NodeType::VarString, cFullMatch) + ); + YSTDLIB_ERROR_HANDLING_TRYV( + m_archive_writer->update_var_stats(token_view.to_string_view(), cFullMatch) + ); + + for (auto const capture_id : capture_ids.value()) { + auto const register_ids{lexer.get_reg_ids_from_capture_id(capture_id)}; + if (false == register_ids.has_value()) { + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + + auto const [start_reg_id, end_reg_id]{register_ids.value()}; + auto const start_positions{token_view.get_reversed_reg_positions(start_reg_id)}; + auto const end_positions{token_view.get_reversed_reg_positions(end_reg_id)}; + + // TODO clpsls: change to store as an array? 
+ auto capture_name{lexer.m_id_symbol.at(capture_id)}; + if (false == start_positions.empty() && -1 < start_positions[0] + && false == end_positions.empty() && -1 < end_positions[0]) + { + auto capture_view{token_view}; + capture_view.set_start_pos(start_positions[0]); + capture_view.set_end_pos(end_positions[0]); + + logtype_dict_entry.add_schema_var(); + m_current_parsed_message.add_unordered_value(capture_view.to_string()); + m_current_schema.insert_unordered(m_archive_writer->add_node( + capture_node_id, + NodeType::VarString, + capture_name + )); + YSTDLIB_ERROR_HANDLING_TRYV(m_archive_writer->update_var_stats( + token_view.to_string_view(), + capture_name + )); + SPDLOG_INFO( + "[clpsls]\tcapture name: {} value: {}", + capture_name, + capture_view.to_string() + ); + } + } + + m_current_schema.end_unordered_object(capture_start); + m_current_schema.insert_unordered(capture_node_id); + break; + } + } + } + + if (logtype_dict_entry.get_value().empty()) { + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + + m_current_parsed_message.add_unordered_value(logtype_dict_entry); + m_current_schema.insert_unordered( + m_archive_writer->add_node(parent_node_id, NodeType::LogType, cLogType) + ); + YSTDLIB_ERROR_HANDLING_TRYV(m_archive_writer->update_logtype_stats(logtype_dict_entry)); + m_current_schema.end_unordered_object(msg_start); + return ystdlib::error_handling::success(); +} } // namespace clp_s diff --git a/components/core/src/clp_s/JsonParser.hpp b/components/core/src/clp_s/JsonParser.hpp index 82f8d04786..e0cdca9237 100644 --- a/components/core/src/clp_s/JsonParser.hpp +++ b/components/core/src/clp_s/JsonParser.hpp @@ -12,7 +12,9 @@ #include #include +#include #include +#include #include #include @@ -41,6 +43,8 @@ struct JsonParserOption { bool retain_float_format{false}; bool single_file_archive{false}; NetworkAuthOption network_auth{}; + bool experimental{false}; + std::optional log_surgeon_schema_path; }; class JsonParser { @@ -221,6 +225,14 @@ class 
JsonParser { static bool check_and_log_curl_error(Path const& path, std::shared_ptr reader); + /** + * Parse an unstructured log message using log surgeon and structure it in JSON. + * @param str + * @return + */ + auto parse_log_message(int32_t parent_node_id, std::string_view view) + -> ystdlib::error_handling::Result; + std::vector m_input_paths; NetworkAuthOption m_network_auth{}; @@ -246,6 +258,9 @@ class JsonParser { m_autogen_ir_node_to_archive_node_id_mapping; std::vector m_archive_stats; + + // clpsls + std::unique_ptr m_log_surgeon_parser; }; } // namespace clp_s diff --git a/components/core/src/clp_s/ParsedMessage.hpp b/components/core/src/clp_s/ParsedMessage.hpp index 45ac3e0304..4f1aca5ec4 100644 --- a/components/core/src/clp_s/ParsedMessage.hpp +++ b/components/core/src/clp_s/ParsedMessage.hpp @@ -7,6 +7,9 @@ #include #include #include +#include + +#include #include "../clp/ffi/EncodedTextAst.hpp" #include "Defs.hpp" @@ -24,7 +27,8 @@ class ParsedMessage { clp::ffi::FourByteEncodedTextAst, bool, std::pair, - std::pair>; + std::pair, + clp_s::LogTypeDictionaryEntry>; // Constructor ParsedMessage() : m_schema_id(-1) {} diff --git a/components/core/src/clp_s/SchemaReader.cpp b/components/core/src/clp_s/SchemaReader.cpp index a2a4ab6dcb..72aea0de7e 100644 --- a/components/core/src/clp_s/SchemaReader.cpp +++ b/components/core/src/clp_s/SchemaReader.cpp @@ -3,6 +3,12 @@ #include #include +#include +#include +#include + +#include + #include "archive_constants.hpp" #include "BufferViewReader.hpp" #include "Schema.hpp" @@ -465,6 +471,7 @@ size_t SchemaReader::generate_structured_array_template( break; } case NodeType::ClpString: + case NodeType::LogType: case NodeType::VarString: { m_json_serializer.add_op(JsonSerializer::Op::AddStringValue); m_reordered_columns.push_back(m_columns[column_idx++]); @@ -556,6 +563,7 @@ size_t SchemaReader::generate_structured_object_template( break; } case NodeType::ClpString: + case NodeType::LogType: case NodeType::VarString: 
{ m_json_serializer.add_op(JsonSerializer::Op::AddStringField); m_reordered_columns.push_back(m_columns[column_idx++]); @@ -648,6 +656,16 @@ void SchemaReader::generate_json_template(int32_t id) { m_json_serializer.add_op(JsonSerializer::Op::EndArray); break; } + case NodeType::LogMessage: { + if (auto const result{generate_log_message_template(child_global_id)}; + result.has_error()) + { + throw(std::runtime_error( + "generate_log_message_template failed with: " + result.error().message() + )); + } + break; + } case NodeType::DeltaInteger: case NodeType::Integer: { m_json_serializer.add_op(JsonSerializer::Op::AddIntField); @@ -671,8 +689,9 @@ void SchemaReader::generate_json_template(int32_t id) { break; } case NodeType::ClpString: - case NodeType::VarString: - case NodeType::DateString: { + case NodeType::DateString: + case NodeType::LogType: + case NodeType::VarString: { m_json_serializer.add_op(JsonSerializer::Op::AddStringField); m_reordered_columns.push_back(m_column_map[child_global_id]); break; @@ -682,10 +701,174 @@ void SchemaReader::generate_json_template(int32_t id) { m_json_serializer.add_special_key(key); break; } + case NodeType::CaptureVar: { + throw(std::runtime_error( + "generate_json_template found CaptureVar outside of LogMessage." 
+ )); + } + case NodeType::Metadata: + case NodeType::Unknown: { + break; + } + } + } +} + +auto SchemaReader::generate_log_message_template(int32_t log_msg_id) + -> ystdlib::error_handling::Result { + auto log_msg_it{m_global_id_to_unordered_object.find(log_msg_id)}; + if (m_global_id_to_unordered_object.end() == log_msg_it) { + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + + m_json_serializer.add_op(JsonSerializer::Op::BeginObject); + m_json_serializer.add_special_key(m_global_schema_tree->get_node(log_msg_id).get_key_name()); + + auto column_idx{log_msg_it->second.first}; + auto const schema{log_msg_it->second.second}; + for (size_t schema_idx{0}; schema_idx < schema.size(); schema_idx++) { + auto const global_column_id{schema[schema_idx]}; + if (Schema::schema_entry_is_unordered_object(global_column_id)) { + if (Schema::get_unordered_object_type(global_column_id) != NodeType::CaptureVar) { + return ClpsErrorCode{ClpsErrorCodeEnum::Unsupported}; + } + + auto length{Schema::get_unordered_object_length(global_column_id)}; + auto const sub_object_schema{schema.subspan(schema_idx + 1, length)}; + auto const capture_id{m_global_schema_tree->find_matching_subtree_root_in_subtree( + log_msg_id, + get_first_column_in_span(sub_object_schema), + NodeType::CaptureVar + )}; + column_idx = YSTDLIB_ERROR_HANDLING_TRYX(generate_capture_var_template(capture_id)); + schema_idx += length; + } else { + auto const& node{m_global_schema_tree->get_node(global_column_id)}; + switch (node.get_type()) { + case NodeType::DeltaInteger: + case NodeType::Integer: { + m_json_serializer.add_op(JsonSerializer::Op::AddIntField); + m_reordered_columns.push_back(m_columns[column_idx]); + column_idx++; + break; + } + case NodeType::Float: { + m_json_serializer.add_op(JsonSerializer::Op::AddFloatField); + m_reordered_columns.push_back(m_columns[column_idx]); + column_idx++; + break; + } + case NodeType::FormattedFloat: + case NodeType::DictionaryFloat: { + 
m_json_serializer.add_op(JsonSerializer::Op::AddFormattedFloatField); + m_reordered_columns.push_back(m_columns[column_idx]); + column_idx++; + break; + } + case NodeType::Boolean: { + m_json_serializer.add_op(JsonSerializer::Op::AddBoolField); + m_reordered_columns.push_back(m_columns[column_idx]); + column_idx++; + break; + } + case NodeType::LogType: + case NodeType::VarString: { + m_json_serializer.add_op(JsonSerializer::Op::AddStringField); + m_reordered_columns.push_back(m_columns[column_idx]); + column_idx++; + break; + } + case NodeType::CaptureVar: + case NodeType::LogMessage: + case NodeType::Object: + case NodeType::StructuredArray: + case NodeType::ClpString: + case NodeType::NullValue: + case NodeType::DateString: + case NodeType::UnstructuredArray: + case NodeType::Metadata: + case NodeType::Unknown: { + break; + } + } + } + } + m_json_serializer.add_op(JsonSerializer::Op::EndObject); + return column_idx; +} + +auto SchemaReader::generate_capture_var_template(int32_t capture_id) + -> ystdlib::error_handling::Result { + auto capture_it{m_global_id_to_unordered_object.find(capture_id)}; + if (m_global_id_to_unordered_object.end() == capture_it) { + return ClpsErrorCode{ClpsErrorCodeEnum::Failure}; + } + + m_json_serializer.add_op(JsonSerializer::Op::BeginObject); + m_json_serializer.add_special_key(m_global_schema_tree->get_node(capture_id).get_key_name()); + + auto column_idx{capture_it->second.first}; + auto const schema{capture_it->second.second}; + for (size_t schema_idx{0}; schema_idx < schema.size(); schema_idx++) { + auto const global_column_id{schema[schema_idx]}; + if (Schema::schema_entry_is_unordered_object(global_column_id)) { + return ClpsErrorCode{ClpsErrorCodeEnum::Unsupported}; + // TODO clpsls support nested capture groups + // if (Schema::get_unordered_object_type(global_column_id) != NodeType::CaptureVar) { + // return ClpsErrorCode{ClpsErrorCodeEnum::Unsupported}; + // } + } + auto const& 
node{m_global_schema_tree->get_node(global_column_id)}; + switch (node.get_type()) { + case NodeType::DeltaInteger: + case NodeType::Integer: { + m_json_serializer.add_op(JsonSerializer::Op::AddIntField); + m_reordered_columns.push_back(m_columns[column_idx]); + column_idx++; + break; + } + case NodeType::Float: { + m_json_serializer.add_op(JsonSerializer::Op::AddFloatField); + m_reordered_columns.push_back(m_columns[column_idx]); + column_idx++; + break; + } + case NodeType::FormattedFloat: + case NodeType::DictionaryFloat: { + m_json_serializer.add_op(JsonSerializer::Op::AddFormattedFloatField); + m_reordered_columns.push_back(m_columns[column_idx]); + column_idx++; + break; + } + case NodeType::Boolean: { + m_json_serializer.add_op(JsonSerializer::Op::AddBoolField); + m_reordered_columns.push_back(m_columns[column_idx]); + column_idx++; + break; + } + case NodeType::LogType: + case NodeType::VarString: { + m_json_serializer.add_op(JsonSerializer::Op::AddStringField); + m_reordered_columns.push_back(m_columns[column_idx]); + column_idx++; + break; + } + case NodeType::CaptureVar: + case NodeType::LogMessage: + case NodeType::Object: + case NodeType::StructuredArray: + case NodeType::ClpString: + case NodeType::NullValue: + case NodeType::DateString: + case NodeType::UnstructuredArray: case NodeType::Metadata: - case NodeType::Unknown: + case NodeType::Unknown: { + return ClpsErrorCode{ClpsErrorCodeEnum::Unsupported}; break; + } } } + m_json_serializer.add_op(JsonSerializer::Op::EndObject); + return column_idx; } } // namespace clp_s diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index 33cb75b285..d919e2f3ce 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -282,6 +282,24 @@ class SchemaReader { size_t generate_structured_object_template(int32_t id, size_t column_start, std::span schema); + /** + * Generates a json template for a LogMessage. 
+ * @param log_msg_id Node ID for the root of the LogMessage object. + * @return A Result containing the index of the next reader in m_columns after those consumed by + * this object. + */ + auto generate_log_message_template(int32_t log_msg_id) + -> ystdlib::error_handling::Result; + + /** + * Generates a json template for a CaptureVar. + * @param capture_id Node ID for the root of the CaptureVar object. + * @return A Result containing the index of the next reader in m_columns after those consumed by + * this object. + */ + auto generate_capture_var_template(int32_t capture_id) + -> ystdlib::error_handling::Result; + + /** * Finds the common root of the subtree containing cur_root and next_root, and adds brackets * and keys to m_json_serializer as necessary so that the json object is correct between the diff --git a/components/core/src/clp_s/SchemaTree.cpp b/components/core/src/clp_s/SchemaTree.cpp index bfb5508ec2..b1d52e0403 100644 --- a/components/core/src/clp_s/SchemaTree.cpp +++ b/components/core/src/clp_s/SchemaTree.cpp @@ -1,5 +1,7 @@ #include "SchemaTree.hpp" +#include + #include "archive_constants.hpp" #include "FileWriter.hpp" #include "search/ast/Literal.hpp" @@ -18,6 +20,7 @@ auto node_to_literal_type(NodeType type) -> clp_s::search::ast::LiteralType { case NodeType::DictionaryFloat: return clp_s::search::ast::LiteralType::FloatT; case NodeType::ClpString: + case NodeType::LogType: return clp_s::search::ast::LiteralType::ClpStringT; case NodeType::VarString: return clp_s::search::ast::LiteralType::VarStringT; diff --git a/components/core/src/clp_s/SchemaTree.hpp b/components/core/src/clp_s/SchemaTree.hpp index deafcdafef..0addfe8a04 100644 --- a/components/core/src/clp_s/SchemaTree.hpp +++ b/components/core/src/clp_s/SchemaTree.hpp @@ -28,6 +28,24 @@ namespace clp_s { * demarcate data needed by the implementation that is not part of the log record. 
In particular, * the implementation may create a special subtree of the MPT which contains fields used to record * things like original log order. + * + * TODO clpsls + * + * LogMessage: Stores a structured/parsed version of a log message (currently detected as a string + * containing delimiters). It is made up of a single LogType node and as many CaptureVar and + * primitive type nodes as appear in the message. + * + * LogType: Functionally similar to a ClpString, but has no variable dictionary component as the + * variables are stored in their own nodes unlike a ClpString. The logtype dictionary component is + * identical. + * + * CaptureVar: Contains a VarString node representing the entire log surgeon schema variable (a + * "full match" of the regex pattern) and any number of primitive type nodes for each named capture + * group in the variable. + * + * The actual captures are stored in nodes of their appropriate type (currently just VarString). It + * may improve compression to store the "full match" of a variable similar to a log type using + * placeholders for the capture groups. 
*/ enum class NodeType : uint8_t { Integer, @@ -44,6 +62,9 @@ enum class NodeType : uint8_t { DeltaInteger, FormattedFloat, DictionaryFloat, + LogMessage = 100, + LogType, + CaptureVar, Unknown = std::underlying_type::type(~0ULL) }; diff --git a/components/core/src/clp_s/archive_constants.hpp b/components/core/src/clp_s/archive_constants.hpp index 1bba06ecf5..b94407d9e5 100644 --- a/components/core/src/clp_s/archive_constants.hpp +++ b/components/core/src/clp_s/archive_constants.hpp @@ -24,6 +24,9 @@ constexpr char cArchiveArrayDictFile[] = "/array.dict"; constexpr char cArchiveLogDictFile[] = "/log.dict"; constexpr char cArchiveVarDictFile[] = "/var.dict"; +// Experimental stats +constexpr std::string_view cArchiveStatsFile{"/stats"}; + // Schema tree constants constexpr char cRootNodeName[] = ""; constexpr int32_t cRootNodeId = -1; diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index cdb16f9c9b..c5cc321621 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -1,23 +1,27 @@ #include #include #include -#include #include #include #include #include #include +#include #include #include #include #include +#include + +#include #include "../clp/CurlGlobalInstance.hpp" #include "../clp/ir/constants.hpp" #include "../clp/streaming_archive/ArchiveMetadata.hpp" #include "../reducer/network_utils.hpp" #include "CommandLineArguments.hpp" +#include "Defs.h" #include "Defs.hpp" #include "JsonConstructor.hpp" #include "JsonParser.hpp" @@ -66,16 +70,19 @@ void decompress_archive(clp_s::JsonConstructorOption const& json_constructor_opt * @param command_line_arguments * @param archive_reader * @param expr A copy of the search AST which may be modified - * @param reducer_socket_fd * @return Whether the search succeeded */ bool search_archive( CommandLineArguments const& command_line_arguments, std::shared_ptr const& archive_reader, - std::shared_ptr expr, - int reducer_socket_fd + std::shared_ptr expr ); 
+/** + * @return -1 if no experimental query found, 0 on success, >0 on failure + */ +auto handle_experimental_queries(CommandLineArguments const& cli_args) -> int; + bool compress(CommandLineArguments const& command_line_arguments) { auto archives_dir = std::filesystem::path(command_line_arguments.get_archives_dir()); @@ -105,6 +112,8 @@ bool compress(CommandLineArguments const& command_line_arguments) { option.single_file_archive = command_line_arguments.get_single_file_archive(); option.structurize_arrays = command_line_arguments.get_structurize_arrays(); option.record_log_order = command_line_arguments.get_record_log_order(); + option.experimental = command_line_arguments.experimental(); + option.log_surgeon_schema_path = command_line_arguments.get_log_surgeon_schema_path(); clp_s::JsonParser parser(option); if (false == parser.ingest()) { @@ -123,8 +132,7 @@ void decompress_archive(clp_s::JsonConstructorOption const& json_constructor_opt bool search_archive( CommandLineArguments const& command_line_arguments, std::shared_ptr const& archive_reader, - std::shared_ptr expr, - int reducer_socket_fd + std::shared_ptr expr ) { auto const& query = command_line_arguments.get_query(); @@ -225,51 +233,8 @@ bool search_archive( projection->resolve_columns(archive_reader->get_schema_tree()); archive_reader->set_projection(projection); - std::unique_ptr output_handler; - try { - switch (command_line_arguments.get_output_handler_type()) { - case CommandLineArguments::OutputHandlerType::File: - output_handler = std::make_unique( - command_line_arguments.get_file_output_path(), - true - ); - break; - case CommandLineArguments::OutputHandlerType::Network: - output_handler = std::make_unique( - command_line_arguments.get_network_dest_host(), - command_line_arguments.get_network_dest_port() - ); - break; - case CommandLineArguments::OutputHandlerType::Reducer: - if (command_line_arguments.do_count_results_aggregation()) { - output_handler = std::make_unique(reducer_socket_fd); - 
} else if (command_line_arguments.do_count_by_time_aggregation()) { - output_handler = std::make_unique( - reducer_socket_fd, - command_line_arguments.get_count_by_time_bucket_size() - ); - } else { - SPDLOG_ERROR("Unhandled aggregation type."); - return false; - } - break; - case CommandLineArguments::OutputHandlerType::ResultsCache: - output_handler = std::make_unique( - command_line_arguments.get_mongodb_uri(), - command_line_arguments.get_mongodb_collection(), - command_line_arguments.get_batch_size(), - command_line_arguments.get_max_num_results() - ); - break; - case CommandLineArguments::OutputHandlerType::Stdout: - output_handler = std::make_unique(); - break; - default: - SPDLOG_ERROR("Unhandled OutputHandlerType."); - return false; - } - } catch (std::exception const& e) { - SPDLOG_ERROR("Failed to create output handler - {}", e.what()); + auto output_handler{command_line_arguments.create_output_handler()}; + if (output_handler.has_error()) { return false; } @@ -278,11 +243,75 @@ bool search_archive( match_pass, expr, archive_reader, - std::move(output_handler), + std::move(output_handler.value()), command_line_arguments.get_ignore_case() ); return output.filter(); } + +auto handle_experimental_queries(CommandLineArguments const& cli_args) -> int { + auto const& query = cli_args.get_query(); + if (CommandLineArguments::ExperimentalQueries::cLogTypeStatsQuery != query + && CommandLineArguments::ExperimentalQueries::cVariableStatsQuery != query) + { + return -1; + } + auto output_handler{cli_args.create_output_handler()}; + if (output_handler.has_error()) { + SPDLOG_ERROR("Failed to create output handler - {}", output_handler.error().message()); + return 1; + } + auto archive_reader = std::make_shared(); + for (auto const& input_path : cli_args.get_input_paths()) { + try { + archive_reader->open( + input_path, + clp_s::ArchiveReader::Options{ + cli_args.get_network_auth(), + cli_args.experimental() + } + ); + } catch (std::exception const& e) { + 
SPDLOG_ERROR("Failed to open archive - {}", e.what()); + return 2; + } + archive_reader->read_dictionaries_and_metadata(); + if (CommandLineArguments::ExperimentalQueries::cLogTypeStatsQuery == query) { + auto logtype_dict{archive_reader->get_log_type_dictionary()}; + auto logtype_stats{archive_reader->get_experimental_stats().m_logtype_stats}; + for (clp::logtype_dictionary_id_t i{0}; i < logtype_stats.size(); ++i) { + auto stat{logtype_stats.at(i)}; + auto message{fmt::format( + "{{\"id\":{},\"count\":{},\"logtype\":\"{}\"}}\n", + i, + stat.get_count(), + logtype_dict->get_value(i) + )}; + output_handler.value()->write(message); + } + } else if (CommandLineArguments::ExperimentalQueries::cVariableStatsQuery == query) { + auto var_dict{archive_reader->get_variable_dictionary()}; + auto var_stats{archive_reader->get_experimental_stats().m_var_stats}; + for (clp::variable_dictionary_id_t i{0}; i < var_stats.size(); ++i) { + auto stat{var_stats.at(i)}; + auto message{fmt::format( + "{{\"id\":{},\"count\":{},\"type\":\"{}\",\"variable\":\"{}\"}}\n", + i, + stat.get_count(), + stat.get_type(), + var_dict->get_value(i) + )}; + output_handler.value()->write(message); + } + } + if (auto ec{output_handler.value()->flush()}; clp_s::ErrorCode::ErrorCodeSuccess != ec) { + SPDLOG_ERROR("Failed to flush output handler. 
Error code: {}", std::to_string(ec)); + return 3; + } + archive_reader->close(); + } + return 0; +} } // namespace int main(int argc, char const* argv[]) { @@ -344,6 +373,10 @@ int main(int argc, char const* argv[]) { } } else { auto const& query = command_line_arguments.get_query(); + if (auto const result{handle_experimental_queries(command_line_arguments)}; -1 < result) { + return result; + } + auto query_stream = std::istringstream(query); auto expr = kql::parse_kql_expression(query_stream); if (nullptr == expr) { @@ -355,30 +388,12 @@ int main(int argc, char const* argv[]) { return 1; } - int reducer_socket_fd{-1}; - if (command_line_arguments.get_output_handler_type() - == CommandLineArguments::OutputHandlerType::Reducer) - { - reducer_socket_fd = reducer::connect_to_reducer( - command_line_arguments.get_reducer_host(), - command_line_arguments.get_reducer_port(), - command_line_arguments.get_job_id() - ); - if (-1 == reducer_socket_fd) { - SPDLOG_ERROR("Failed to connect to reducer"); - return 1; - } - } - auto archive_reader = std::make_shared(); for (auto const& input_path : command_line_arguments.get_input_paths()) { if (std::string::npos != input_path.path.find(clp::ir::cIrFileExtension)) { - auto const result{clp_s::search_kv_ir_stream( - input_path, - command_line_arguments, - expr->copy(), - reducer_socket_fd - )}; + auto const result{ + clp_s::search_kv_ir_stream(input_path, command_line_arguments, expr->copy()) + }; if (false == result.has_error()) { continue; } @@ -421,19 +436,18 @@ int main(int argc, char const* argv[]) { } try { - archive_reader->open(input_path, command_line_arguments.get_network_auth()); + archive_reader->open( + input_path, + clp_s::ArchiveReader::Options{ + command_line_arguments.get_network_auth(), + command_line_arguments.experimental() + } + ); } catch (std::exception const& e) { SPDLOG_ERROR("Failed to open archive - {}", e.what()); return 1; } - if (false - == search_archive( - command_line_arguments, - archive_reader, - 
expr->copy(), - reducer_socket_fd - )) - { + if (false == search_archive(command_line_arguments, archive_reader, expr->copy())) { return 1; } archive_reader->close(); diff --git a/components/core/src/clp_s/indexer/CMakeLists.txt b/components/core/src/clp_s/indexer/CMakeLists.txt index ba21f50a26..bed547527e 100644 --- a/components/core/src/clp_s/indexer/CMakeLists.txt +++ b/components/core/src/clp_s/indexer/CMakeLists.txt @@ -86,6 +86,8 @@ set( ../DictionaryReader.hpp ../DictionaryEntry.cpp ../DictionaryEntry.hpp + ../ErrorCode.cpp + ../ErrorCode.hpp ../FileReader.cpp ../FileReader.hpp ../FileWriter.cpp @@ -126,6 +128,7 @@ set( if(CLP_BUILD_EXECUTABLES) add_executable(indexer ${INDEXER_SOURCES}) target_compile_features(indexer PRIVATE cxx_std_20) + target_include_directories(indexer PUBLIC ../..) target_link_libraries(indexer PRIVATE absl::flat_hash_map diff --git a/components/core/src/clp_s/indexer/IndexManager.cpp b/components/core/src/clp_s/indexer/IndexManager.cpp index 4b2882e29e..929063809d 100644 --- a/components/core/src/clp_s/indexer/IndexManager.cpp +++ b/components/core/src/clp_s/indexer/IndexManager.cpp @@ -46,7 +46,7 @@ void IndexManager::update_metadata(std::string const& dataset_name, Path const& m_mysql_index_storage->init(dataset_name, m_should_create_table); ArchiveReader archive_reader; - archive_reader.open(archive_path, NetworkAuthOption{}); + archive_reader.open(archive_path, ArchiveReader::Options{}); traverse_schema_tree_and_update_metadata( archive_reader.get_schema_tree(), diff --git a/components/core/src/clp_s/kv_ir_search.cpp b/components/core/src/clp_s/kv_ir_search.cpp index 4041231961..8abdb9aad9 100644 --- a/components/core/src/clp_s/kv_ir_search.cpp +++ b/components/core/src/clp_s/kv_ir_search.cpp @@ -52,14 +52,12 @@ class IrUnitHandler { // Factory function /** * @param command_line_arguments - * @param reducer_socket_fd * @return A result containing the created IrUnitHandler on success, or an error code indicating * the failure: * 
- KvIrSearchErrorEnum::UnsupportedOutputHandlerType if the output handler type is not * supported. */ - [[nodiscard]] static auto - create(CommandLineArguments const& command_line_arguments, int reducer_socket_fd) + [[nodiscard]] static auto create(CommandLineArguments const& command_line_arguments) -> ystdlib::error_handling::Result; // Delete copy constructor and assignment operator @@ -107,7 +105,6 @@ class IrUnitHandler { * @param stream_reader The stream reader to read the kv-pair IR stream from. * @param command_line_arguments * @param query - * @param reducer_socket_fd * @return A void result on success, or an error code indicating the failure: * - KvIrSearchErrorEnum::DeserializerCreationFailure if `clp::ffi::ir_stream::Deserializer::create` * failed. This specific error code is returned instead of propagating the return values of @@ -120,14 +117,11 @@ class IrUnitHandler { [[nodiscard]] auto deserialize_and_search_kv_ir_stream( clp::ReaderInterface& stream_reader, CommandLineArguments const& command_line_arguments, - std::shared_ptr query, - int reducer_socket_fd + std::shared_ptr query ) -> ystdlib::error_handling::Result; -auto IrUnitHandler::create( - CommandLineArguments const& command_line_arguments, - [[maybe_unused]] int reducer_socket_fd -) -> ystdlib::error_handling::Result { +auto IrUnitHandler::create(CommandLineArguments const& command_line_arguments) + -> ystdlib::error_handling::Result { switch (command_line_arguments.get_output_handler_type()) { case CommandLineArguments::OutputHandlerType::Stdout: break; @@ -194,8 +188,7 @@ auto IrUnitHandler::handle_log_event( auto deserialize_and_search_kv_ir_stream( clp::ReaderInterface& stream_reader, CommandLineArguments const& command_line_arguments, - std::shared_ptr query, - int reducer_socket_fd + std::shared_ptr query ) -> ystdlib::error_handling::Result { auto trivial_new_projected_schema_tree_node_callback = []([[maybe_unused]] bool is_auto_generated, @@ -205,9 +198,9 @@ auto 
deserialize_and_search_kv_ir_stream( using QueryHandlerType = clp::ffi::ir_stream::search:: QueryHandler; - auto ir_unit_handler{YSTDLIB_ERROR_HANDLING_TRYX( - IrUnitHandler::create(command_line_arguments, reducer_socket_fd) - )}; + auto ir_unit_handler{ + YSTDLIB_ERROR_HANDLING_TRYX(IrUnitHandler::create(command_line_arguments)) + }; auto query_handler{YSTDLIB_ERROR_HANDLING_TRYX( QueryHandlerType::create( trivial_new_projected_schema_tree_node_callback, @@ -237,8 +230,7 @@ auto deserialize_and_search_kv_ir_stream( auto search_kv_ir_stream( Path const& stream_path, CommandLineArguments const& command_line_arguments, - std::shared_ptr query, - int reducer_socket_fd + std::shared_ptr query ) -> ystdlib::error_handling::Result { if (false == command_line_arguments.get_projection_columns().empty()) { SPDLOG_ERROR("kv-ir search: Projection support is not implemented."); @@ -275,8 +267,7 @@ auto search_kv_ir_stream( YSTDLIB_ERROR_HANDLING_TRYV(deserialize_and_search_kv_ir_stream( decompressor, command_line_arguments, - std::move(query), - reducer_socket_fd + std::move(query) )); decompressor.close(); } catch (clp::TraceableException const& ex) { diff --git a/components/core/src/clp_s/kv_ir_search.hpp b/components/core/src/clp_s/kv_ir_search.hpp index 4d5d9e98a7..9c7488c9c9 100644 --- a/components/core/src/clp_s/kv_ir_search.hpp +++ b/components/core/src/clp_s/kv_ir_search.hpp @@ -28,7 +28,6 @@ using KvIrSearchError = ystdlib::error_handling::ErrorCode; * @param stream_path The path to the kv-pair IR stream. * @param command_line_arguments * @param query - * @param reducer_socket_fd * @return A void result on success, or an error code indicating the failure: * - KvIrSearchErrorEnum::ClpLegacyError if a `clp::TraceableException` is caught. * - KvIrSearchErrorEnum::CountSupportNotImplemented if count-related features are enabled. 
@@ -40,8 +39,7 @@ using KvIrSearchError = ystdlib::error_handling::ErrorCode; [[nodiscard]] auto search_kv_ir_stream( Path const& stream_path, CommandLineArguments const& command_line_arguments, - std::shared_ptr query, - int reducer_socket_fd + std::shared_ptr query ) -> ystdlib::error_handling::Result; } // namespace clp_s diff --git a/components/core/src/clp_s/search/QueryRunner.cpp b/components/core/src/clp_s/search/QueryRunner.cpp index 70e0774f06..1193322570 100644 --- a/components/core/src/clp_s/search/QueryRunner.cpp +++ b/components/core/src/clp_s/search/QueryRunner.cpp @@ -1027,6 +1027,7 @@ EvaluatedValue QueryRunner::constant_propagate(std::shared_ptr const } return EvaluatedValue::Unknown; } else if (auto filter = std::dynamic_pointer_cast(expr)) { + // TODO clpsls: skip if expression points to log type column if ((filter->get_operation() == FilterOperation::EXISTS || filter->get_operation() == FilterOperation::NEXISTS) && (!filter->get_column()->has_unresolved_tokens() diff --git a/components/core/src/clp_s/search/ast/CMakeLists.txt b/components/core/src/clp_s/search/ast/CMakeLists.txt index eda1c26a3f..ab1b00d5c1 100644 --- a/components/core/src/clp_s/search/ast/CMakeLists.txt +++ b/components/core/src/clp_s/search/ast/CMakeLists.txt @@ -45,6 +45,7 @@ set( CLP_S_CORE_SOURCES ../../archive_constants.hpp ../../Defs.hpp + ../../ErrorCode.cpp ../../ErrorCode.hpp ../../TraceableException.hpp ) diff --git a/components/core/tests/test-ParserWithUserSchema.cpp b/components/core/tests/test-ParserWithUserSchema.cpp index 3ab9bfad72..9ca64f0166 100644 --- a/components/core/tests/test-ParserWithUserSchema.cpp +++ b/components/core/tests/test-ParserWithUserSchema.cpp @@ -192,10 +192,11 @@ TEST_CASE("Test lexer", "[Search]") { auto [error_code, opt_token] = lexer.scan(parser_input_buffer); REQUIRE(error_code == log_surgeon::ErrorCode::Success); Token token{opt_token.value()}; - while (token.m_type_ids_ptr->at(0) != static_cast(log_surgeon::SymbolId::TokenEnd)) { + 
while (token.get_type_ids()->at(0) != static_cast(log_surgeon::SymbolId::TokenEnd)) { SPDLOG_INFO("token:" + token.to_string() + "\n"); SPDLOG_INFO( - "token.m_type_ids->back():" + lexer.m_id_symbol[token.m_type_ids_ptr->back()] + "\n" + "token.get_type_ids()->back():" + lexer.m_id_symbol[token.get_type_ids()->back()] + + "\n" ); auto [error_code, opt_token] = lexer.scan(parser_input_buffer); REQUIRE(error_code == log_surgeon::ErrorCode::Success); diff --git a/components/core/tests/test-clp_s-delta-encode-log-order.cpp b/components/core/tests/test-clp_s-delta-encode-log-order.cpp index 4cb129841a..eb2475c9f7 100644 --- a/components/core/tests/test-clp_s-delta-encode-log-order.cpp +++ b/components/core/tests/test-clp_s-delta-encode-log-order.cpp @@ -83,7 +83,7 @@ TEST_CASE("clp-s-delta-encode-log-order", "[clp-s][delta-encode-log-order]") { REQUIRE(1 == archive_paths.size()); clp_s::ArchiveReader archive_reader; - REQUIRE_NOTHROW(archive_reader.open(archive_paths.back(), clp_s::NetworkAuthOption{})); + REQUIRE_NOTHROW(archive_reader.open(archive_paths.back(), clp_s::ArchiveReader::Options{})); REQUIRE_NOTHROW(archive_reader.read_dictionaries_and_metadata()); REQUIRE_NOTHROW(archive_reader.open_packed_streams()); auto mpt = archive_reader.get_schema_tree(); diff --git a/components/core/tests/test-clp_s-end_to_end.cpp b/components/core/tests/test-clp_s-end_to_end.cpp index c31984f793..969b3761c8 100644 --- a/components/core/tests/test-clp_s-end_to_end.cpp +++ b/components/core/tests/test-clp_s-end_to_end.cpp @@ -68,7 +68,7 @@ void check_all_leaf_nodes_match_types(std::set const& types) { .source = clp_s::InputSource::Filesystem, .path = entry.path().string() }, - clp_s::NetworkAuthOption{} + clp_s::ArchiveReader::Options{} )); auto const schema_tree{archive_reader.get_schema_tree()}; REQUIRE(nullptr != schema_tree); diff --git a/components/core/tests/test-clp_s-range_index.cpp b/components/core/tests/test-clp_s-range_index.cpp index 024410aed1..c10bb06000 100644 --- 
a/components/core/tests/test-clp_s-range_index.cpp +++ b/components/core/tests/test-clp_s-range_index.cpp @@ -133,7 +133,7 @@ void read_and_check_archive_metadata(bool from_ir) { .source{clp_s::InputSource::Filesystem}, .path{entry.path().string()} }; - REQUIRE_NOTHROW(archive_reader.open(archive_path, clp_s::NetworkAuthOption{})); + REQUIRE_NOTHROW(archive_reader.open(archive_path, clp_s::ArchiveReader::Options{})); auto const& range_index = archive_reader.get_range_index(); check_archive_range_index(range_index, from_ir); REQUIRE_NOTHROW(archive_reader.close()); diff --git a/components/core/tests/test-clp_s-search.cpp b/components/core/tests/test-clp_s-search.cpp index 526d2d45a8..ebc12a4ffc 100644 --- a/components/core/tests/test-clp_s-search.cpp +++ b/components/core/tests/test-clp_s-search.cpp @@ -163,7 +163,7 @@ void search( .source{clp_s::InputSource::Filesystem}, .path{entry.path().string()} }; - archive_reader->open(archive_path, clp_s::NetworkAuthOption{}); + archive_reader->open(archive_path, clp_s::ArchiveReader::Options{}); auto archive_expr = expr->copy(); diff --git a/components/core/tests/test_schema_files/search_schema.txt b/components/core/tests/test_schema_files/search_schema.txt index 60d0c12f00..f49a6dbfa4 100644 --- a/components/core/tests/test_schema_files/search_schema.txt +++ b/components/core/tests/test_schema_files/search_schema.txt @@ -1,5 +1,5 @@ // Delimiters -delimiters: \r\n:,=!;%\? +delimiters: \r\n:,=!;%? 
// First set of variables timestamp:[0-9]{4}\-[0-9]{2}\-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]{3}){0,1} diff --git a/taskfiles/deps/main.yaml b/taskfiles/deps/main.yaml index 927cedb2a1..7331a27c3b 100644 --- a/taskfiles/deps/main.yaml +++ b/taskfiles/deps/main.yaml @@ -367,8 +367,8 @@ tasks: - "-DCMAKE_INSTALL_MESSAGE=LAZY" - "-Dlog_surgeon_BUILD_TESTING=OFF" LIB_NAME: "log_surgeon" - TARBALL_SHA256: "4551ea50cd22e8423770fd66a167e1c86053b1f4957f72c582a2da93e7820210" - TARBALL_URL: "https://github.com/y-scope/log-surgeon/archive/840f262.tar.gz" + TARBALL_SHA256: "3a6a0dfdab40c0bed261b2afb45b915dae859ae410fd52769df4d53059453aa5" + TARBALL_URL: "https://github.com/y-scope/log-surgeon/archive/b1c4fea.tar.gz" lz4: internal: true