@@ -48,7 +48,13 @@ void JsonConstructor::store() {
4848 m_archive_reader = std::make_unique<ArchiveReader>();
4949 m_archive_reader->open (m_option.archives_dir , m_option.archive_id );
5050 m_archive_reader->read_dictionaries_and_metadata ();
51- if (false == m_option.ordered ) {
51+
52+ if (m_option.ordered && false == m_archive_reader->has_log_order ()) {
53+ SPDLOG_WARN (" This archive is missing ordering information and can not be decompressed in "
54+ " log order. Falling back to out of order decompression." );
55+ }
56+
57+ if (false == m_option.ordered || false == m_archive_reader->has_log_order ()) {
5258 FileWriter writer;
5359 writer.open (
5460 m_option.output_dir + " /original" ,
@@ -68,15 +74,15 @@ void JsonConstructor::construct_in_order() {
6874 auto tables = m_archive_reader->read_all_tables ();
6975 using ReaderPointer = std::shared_ptr<SchemaReader>;
7076 auto cmp = [](ReaderPointer& left, ReaderPointer& right) {
71- return left->get_next_timestamp () > right->get_next_timestamp ();
77+ return left->get_next_log_event_idx () > right->get_next_log_event_idx ();
7278 };
7379 std::priority_queue record_queue (tables.begin (), tables.end (), cmp);
7480 // Clear tables vector so that memory gets deallocated after we have marshalled all records for
7581 // a given table
7682 tables.clear ();
7783
78- epochtime_t first_timestamp {0 };
79- epochtime_t last_timestamp {0 };
84+ int64_t first_idx {0 };
85+ int64_t last_idx {0 };
8086 size_t num_records_marshalled{0 };
8187 auto src_path = std::filesystem::path (m_option.output_dir ) / m_option.archive_id ;
8288 FileWriter writer;
@@ -97,9 +103,11 @@ void JsonConstructor::construct_in_order() {
97103
98104 std::vector<bsoncxx::document::value> results;
99105 auto finalize_chunk = [&](bool open_new_writer) {
106+ // Add one to last_idx to match clp's behaviour of having the end index be exclusive
107+ ++last_idx;
100108 writer.close ();
101- std::string new_file_name = src_path.string () + " _" + std::to_string (first_timestamp ) + " _"
102- + std::to_string (last_timestamp ) + " .jsonl" ;
109+ std::string new_file_name = src_path.string () + " _" + std::to_string (first_idx ) + " _"
110+ + std::to_string (last_idx ) + " .jsonl" ;
103111 auto new_file_path = std::filesystem::path (new_file_name);
104112 std::error_code ec;
105113 std::filesystem::rename (src_path, new_file_path, ec);
@@ -119,11 +127,11 @@ void JsonConstructor::construct_in_order() {
119127 ),
120128 bsoncxx::builder::basic::kvp (
121129 constants::results_cache::decompression::cBeginMsgIx,
122- static_cast < int64_t >(first_timestamp)
130+ first_idx
123131 ),
124132 bsoncxx::builder::basic::kvp (
125133 constants::results_cache::decompression::cEndMsgIx,
126- static_cast < int64_t >(last_timestamp)
134+ last_idx
127135 ),
128136 bsoncxx::builder::basic::kvp (
129137 constants::results_cache::decompression::cIsLastIrChunk,
@@ -140,9 +148,9 @@ void JsonConstructor::construct_in_order() {
140148 while (false == record_queue.empty ()) {
141149 ReaderPointer next = record_queue.top ();
142150 record_queue.pop ();
143- last_timestamp = next->get_next_timestamp ();
151+ last_idx = next->get_next_log_event_idx ();
144152 if (0 == num_records_marshalled) {
145- first_timestamp = last_timestamp ;
153+ first_idx = last_idx ;
146154 }
147155 next->get_next_message (buffer);
148156 if (false == next->done ()) {
0 commit comments