Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ set(SOURCE_FILES_unitTest
tests/TestOutputCleaner.hpp
tests/test-BoundedReader.cpp
tests/test-BufferedFileReader.cpp
tests/test-clp_s-delta-encode-log-order.cpp
tests/test-clp_s-end_to_end.cpp
tests/test-clp_s-range_index.cpp
tests/test-clp_s-search.cpp
Expand Down
12 changes: 8 additions & 4 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3
case NodeType::Integer:
column_reader = new Int64ColumnReader(column_id);
break;
case NodeType::DeltaInteger:
column_reader = new DeltaEncodedInt64ColumnReader(column_id);
break;
case NodeType::Float:
column_reader = new FloatColumnReader(column_id);
break;
Expand Down Expand Up @@ -238,6 +241,9 @@ void ArchiveReader::append_unordered_reader_columns(
case NodeType::Integer:
column_reader = new Int64ColumnReader(column_id);
break;
case NodeType::DeltaInteger:
column_reader = new DeltaEncodedInt64ColumnReader(column_id);
break;
case NodeType::Float:
column_reader = new FloatColumnReader(column_id);
break;
Expand Down Expand Up @@ -324,10 +330,8 @@ void ArchiveReader::initialize_schema_reader(
}
BaseColumnReader* column_reader = append_reader_column(reader, column_id);

if (column_id == m_log_event_idx_column_id
&& nullptr != dynamic_cast<Int64ColumnReader*>(column_reader))
{
reader.mark_column_as_log_event_idx(static_cast<Int64ColumnReader*>(column_reader));
if (column_id == m_log_event_idx_column_id) {
reader.mark_column_as_log_event_idx(column_reader);
}

if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0)
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const&
case NodeType::DateString:
writer->append_column(new DateStringColumnWriter(id));
break;
case NodeType::DeltaInteger:
writer->append_column(new DeltaEncodedInt64ColumnWriter(id));
break;
case NodeType::Metadata:
case NodeType::NullValue:
case NodeType::Object:
Expand Down
37 changes: 37 additions & 0 deletions components/core/src/clp_s/ColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,36 @@ std::variant<int64_t, double, std::string, uint8_t> Int64ColumnReader::extract_v
return m_values[cur_message];
}

// Reads `num_messages` delta-encoded entries from `reader` and, when the column is non-empty,
// positions the decoding cursor on the first entry (which holds an absolute value, not a delta).
void DeltaEncodedInt64ColumnReader::load(BufferViewReader& reader, uint64_t num_messages) {
    m_values = reader.read_unaligned_span<int64_t>(num_messages);
    if (0 == num_messages) {
        // Nothing to decode; the cursor members are left untouched.
        return;
    }
    m_cur_idx = 0;
    m_cur_value = m_values[0];
}

// Decodes the value at `idx` by walking the stored deltas from the cached cursor position
// (`m_cur_idx`, `m_cur_value`) to `idx`, in whichever direction is required. The cursor is left at
// `idx` afterwards, so sequential access costs one delta per call.
//
// The running sum is kept in unsigned arithmetic so that wrap-around is well defined; signed
// `+=`/`-=` would be undefined behaviour if an intermediate sum exceeded the int64_t range.
//
// NOTE(review): `idx` is not bounds-checked here; callers are assumed to pass an index below the
// number of messages given to load() -- confirm against callers.
//
// @param idx Index of the entry to decode.
// @return The decoded (absolute) value stored at `idx`.
int64_t DeltaEncodedInt64ColumnReader::get_value_at_idx(size_t idx) {
    auto accumulated = static_cast<uint64_t>(m_cur_value);

    // Moving forward from i to i + 1 applies the delta stored at i + 1.
    while (m_cur_idx < idx) {
        ++m_cur_idx;
        int64_t const delta = m_values[m_cur_idx];
        accumulated += static_cast<uint64_t>(delta);
    }

    // Moving backward from i to i - 1 undoes the delta stored at i.
    while (m_cur_idx > idx) {
        int64_t const delta = m_values[m_cur_idx];
        accumulated -= static_cast<uint64_t>(delta);
        --m_cur_idx;
    }

    m_cur_value = static_cast<int64_t>(accumulated);
    return m_cur_value;
}

// Returns the decoded integer for message `cur_message`, wrapped in the column-value variant.
std::variant<int64_t, double, std::string, uint8_t>
DeltaEncodedInt64ColumnReader::extract_value(uint64_t cur_message) {
    int64_t const decoded_value{get_value_at_idx(cur_message)};
    return decoded_value;
}

void FloatColumnReader::load(BufferViewReader& reader, uint64_t num_messages) {
m_values = reader.read_unaligned_span<double>(num_messages);
}
Expand All @@ -25,6 +55,13 @@ Int64ColumnReader::extract_string_value_into_buffer(uint64_t cur_message, std::s
buffer.append(std::to_string(m_values[cur_message]));
}

// Appends the decimal text form of the decoded value for message `cur_message` to `buffer`.
void DeltaEncodedInt64ColumnReader::extract_string_value_into_buffer(
        uint64_t cur_message,
        std::string& buffer
) {
    auto const decoded_value = get_value_at_idx(cur_message);
    buffer += std::to_string(decoded_value);
}

std::variant<int64_t, double, std::string, uint8_t> FloatColumnReader::extract_value(
uint64_t cur_message
) {
Expand Down
33 changes: 33 additions & 0 deletions components/core/src/clp_s/ColumnReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,39 @@ class Int64ColumnReader : public BaseColumnReader {
UnalignedMemSpan<int64_t> m_values;
};

/**
 * Column reader for 64-bit integers stored as deltas (`NodeType::DeltaInteger`). A cursor (index
 * plus decoded value) is cached between lookups so that values can be decoded by applying only the
 * deltas between the previously requested index and the newly requested one.
 */
class DeltaEncodedInt64ColumnReader : public BaseColumnReader {
public:
    // Constructor
    explicit DeltaEncodedInt64ColumnReader(int32_t id) : BaseColumnReader(id) {}

    // Destructor
    ~DeltaEncodedInt64ColumnReader() override = default;

    // Methods inherited from BaseColumnReader
    auto load(BufferViewReader& reader, uint64_t num_messages) -> void override;

    auto get_type() -> NodeType override { return NodeType::DeltaInteger; }

    auto extract_value(uint64_t cur_message)
            -> std::variant<int64_t, double, std::string, uint8_t> override;

    auto extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer)
            -> void override;

private:
    /**
     * Decodes the value at a given index by applying the stored deltas that lie between the most
     * recently requested index and the requested one. Updates the cached cursor as a side effect.
     * @param idx
     * @return The value stored at the requested index.
     */
    auto get_value_at_idx(size_t idx) -> int64_t;

    // Encoded column contents as read by `load`.
    UnalignedMemSpan<int64_t> m_values;
    // Decoded value at `m_cur_idx`.
    int64_t m_cur_value{0};
    // Index of the most recently decoded entry.
    size_t m_cur_idx{0};
};

class FloatColumnReader : public BaseColumnReader {
public:
// Constructor
Expand Down
17 changes: 17 additions & 0 deletions components/core/src/clp_s/ColumnWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,23 @@ void Int64ColumnWriter::store(ZstdCompressor& compressor) {
compressor.write(reinterpret_cast<char const*>(m_values.data()), size);
}

// Appends one value to the column in delta-encoded form: the first value is stored verbatim and
// every later value is stored as its difference from the previously added value.
//
// The difference is computed in unsigned arithmetic so that it wraps instead of invoking
// undefined behaviour when `next - m_cur` does not fit in int64_t; a decoder that adds the deltas
// back with the same wrap-around recovers the original values exactly.
//
// @param value Variant expected to hold an int64_t; std::get throws std::bad_variant_access
// otherwise.
// @return The number of bytes the value occupies in the column (always sizeof(int64_t)).
size_t DeltaEncodedInt64ColumnWriter::add_value(ParsedMessage::variable_t& value) {
    auto const next = std::get<int64_t>(value);
    if (m_values.empty()) {
        m_values.push_back(next);
    } else {
        auto const delta = static_cast<uint64_t>(next) - static_cast<uint64_t>(m_cur);
        m_values.push_back(static_cast<int64_t>(delta));
    }
    m_cur = next;
    return sizeof(int64_t);
}

// Writes the buffered delta-encoded entries to `compressor` as one contiguous run of raw bytes.
void DeltaEncodedInt64ColumnWriter::store(ZstdCompressor& compressor) {
    auto const* const raw_bytes = reinterpret_cast<char const*>(m_values.data());
    size_t const num_bytes{sizeof(int64_t) * m_values.size()};
    compressor.write(raw_bytes, num_bytes);
}

size_t FloatColumnWriter::add_value(ParsedMessage::variable_t& value) {
m_values.push_back(std::get<double>(value));
return sizeof(double);
Expand Down
18 changes: 18 additions & 0 deletions components/core/src/clp_s/ColumnWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ class Int64ColumnWriter : public BaseColumnWriter {
std::vector<int64_t> m_values;
};

/**
 * Column writer that buffers 64-bit integers in delta-encoded form: the first value added is kept
 * as-is and each subsequent value is kept as its difference from the value added before it.
 */
class DeltaEncodedInt64ColumnWriter : public BaseColumnWriter {
public:
    // Constructor
    explicit DeltaEncodedInt64ColumnWriter(int32_t id) : BaseColumnWriter(id) {}

    // Destructor
    ~DeltaEncodedInt64ColumnWriter() override = default;

    // Methods inherited from BaseColumnWriter
    auto add_value(ParsedMessage::variable_t& value) -> size_t override;

    auto store(ZstdCompressor& compressor) -> void override;

private:
    // Encoded entries, in insertion order.
    std::vector<int64_t> m_values;
    // Most recently added (un-encoded) value; the base for the next delta.
    int64_t m_cur{0};
};

class FloatColumnWriter : public BaseColumnWriter {
public:
// Constructor
Expand Down
4 changes: 2 additions & 2 deletions components/core/src/clp_s/JsonParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ bool JsonParser::parse() {
auto initialize_fields_for_archive = [&]() -> bool {
if (m_record_log_order) {
log_event_idx_node_id
= add_metadata_field(constants::cLogEventIdxName, NodeType::Integer);
= add_metadata_field(constants::cLogEventIdxName, NodeType::DeltaInteger);
}
if (auto const rc = m_archive_writer->add_field_to_current_range(
std::string{constants::range_index::cFilename},
Expand Down Expand Up @@ -982,7 +982,7 @@ auto JsonParser::parse_from_ir() -> bool {
auto initialize_fields_for_archive = [&]() -> bool {
if (m_record_log_order) {
log_event_idx_node_id
= add_metadata_field(constants::cLogEventIdxName, NodeType::Integer);
= add_metadata_field(constants::cLogEventIdxName, NodeType::DeltaInteger);
}
if (auto const rc = m_archive_writer->add_field_to_current_range(
std::string{constants::range_index::cFilename},
Expand Down
8 changes: 8 additions & 0 deletions components/core/src/clp_s/SchemaReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) {
return std::get<int64_t>(static_cast<Int64ColumnReader*>(m_timestamp_column)
->extract_value(m_cur_message));
};
} else if (m_timestamp_column->get_type() == NodeType::DeltaInteger) {
m_get_timestamp = [this]() {
return std::get<int64_t>(static_cast<DeltaEncodedInt64ColumnReader*>(m_timestamp_column)
->extract_value(m_cur_message));
};
Comment on lines +32 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we apply this encoding to an integer timestamp column in a future PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think that will probably be worthwhile. We can experiment with it when we implement timestamp normalization.

} else if (m_timestamp_column->get_type() == NodeType::Float) {
m_get_timestamp = [this]() {
return static_cast<epochtime_t>(
Expand Down Expand Up @@ -428,6 +433,7 @@ size_t SchemaReader::generate_structured_array_template(
m_json_serializer.add_op(JsonSerializer::Op::EndArray);
break;
}
case NodeType::DeltaInteger:
case NodeType::Integer: {
m_json_serializer.add_op(JsonSerializer::Op::AddIntValue);
m_reordered_columns.push_back(m_columns[column_idx++]);
Expand Down Expand Up @@ -512,6 +518,7 @@ size_t SchemaReader::generate_structured_object_template(
m_json_serializer.add_op(JsonSerializer::Op::EndArray);
break;
}
case NodeType::DeltaInteger:
case NodeType::Integer: {
m_json_serializer.add_op(JsonSerializer::Op::AddIntField);
m_reordered_columns.push_back(m_columns[column_idx++]);
Expand Down Expand Up @@ -620,6 +627,7 @@ void SchemaReader::generate_json_template(int32_t id) {
m_json_serializer.add_op(JsonSerializer::Op::EndArray);
break;
}
case NodeType::DeltaInteger:
case NodeType::Integer: {
m_json_serializer.add_op(JsonSerializer::Op::AddIntField);
m_reordered_columns.push_back(m_column_map[child_global_id]);
Expand Down
4 changes: 2 additions & 2 deletions components/core/src/clp_s/SchemaReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class SchemaReader {
/**
* Marks a column as the log_event_idx column.
*/
void mark_column_as_log_event_idx(Int64ColumnReader* column_reader) {
void mark_column_as_log_event_idx(BaseColumnReader* column_reader) {
m_log_event_idx_column = column_reader;
}

Expand Down Expand Up @@ -321,7 +321,7 @@ class SchemaReader {

BaseColumnReader* m_timestamp_column;
std::function<epochtime_t()> m_get_timestamp;
Int64ColumnReader* m_log_event_idx_column{nullptr};
BaseColumnReader* m_log_event_idx_column{nullptr};

std::shared_ptr<SchemaTree> m_global_schema_tree;
SchemaTree m_local_schema_tree;
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp_s/SchemaTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ auto node_to_literal_type(NodeType type) -> clp_s::search::ast::LiteralType {
// type-per-token support.
switch (type) {
case NodeType::Integer:
case NodeType::DeltaInteger:
return clp_s::search::ast::LiteralType::IntegerT;
case NodeType::Float:
return clp_s::search::ast::LiteralType::FloatT;
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp_s/SchemaTree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ enum class NodeType : uint8_t {
DateString,
StructuredArray,
Metadata,
DeltaInteger,
Unknown = std::underlying_type<NodeType>::type(~0ULL)
};

Expand Down
2 changes: 1 addition & 1 deletion components/core/src/clp_s/SingleFileArchiveDefs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace clp_s {
// define the version
constexpr uint8_t cArchiveMajorVersion = 0;
constexpr uint8_t cArchiveMinorVersion = 3;
constexpr uint16_t cArchivePatchVersion = 1;
constexpr uint16_t cArchivePatchVersion = 2;

// define the magic number
constexpr uint8_t cStructuredSFAMagicNumber[] = {0xFD, 0x2F, 0xC5, 0x30};
Expand Down
Loading
Loading