11#include " JsonParser.hpp"
22
3- #include < iostream>
3+ #include < cstdint>
4+ #include < cstdlib>
5+ #include < optional>
46#include < stack>
7+ #include < unordered_map>
58
69#include < simdjson.h>
710#include < spdlog/spdlog.h>
811
9- #include " archive_constants.hpp"
12+ #include " ../clp/ffi/SchemaTree.hpp"
13+ #include " ../clp/ffi/SchemaTreeNode.hpp"
14+ #include " ../clp/ffi/utils.hpp"
15+ #include " ../clp/ffi/Value.hpp"
16+ #include " ../clp/ir/types.hpp"
17+ #include " ../clp/streaming_compression/zstd/Decompressor.hpp"
18+ #include " DictionaryWriter.hpp"
1019#include " JsonFileIterator.hpp"
20+ #include " ParsedMessage.hpp"
21+
22+ using namespace simdjson ;
1123
1224namespace clp_s {
25+
1326JsonParser::JsonParser (JsonParserOption const & option)
1427 : m_num_messages(0 ),
1528 m_target_encoded_size (option.target_encoded_size),
@@ -520,13 +533,13 @@ bool JsonParser::parse() {
520533 return true ;
521534}
522535
523- NodeType get_archive_node_type (
536+ auto JsonParser:: get_archive_node_type (
524537 clp::ffi::SchemaTreeNode::Type ir_node_type,
525538 bool node_has_value,
526539 std::optional<clp::ffi::Value> const & node_value
527- ) {
540+ ) -> NodeType {
528541 // Map the IR schema-tree node type to the corresponding archive node type
529- NodeType archive_node_type;
542+ NodeType archive_node_type = NodeType::Unknown ;
530543 switch (ir_node_type) {
531544 case clp::ffi::SchemaTreeNode::Type::Int:
532545 archive_node_type = NodeType::Integer;
@@ -559,29 +572,33 @@ NodeType get_archive_node_type(
559572 }
560573 break ;
561574 default :
562- archive_node_type = NodeType::Unknown;
563575 break ;
564576 }
565577 return archive_node_type;
566578}
567579
568- //
569- int JsonParser::get_archive_node_id (
570- std::map<std::tuple< int32_t , NodeType>, int32_t >& ir_node_to_archive_node_map ,
571- int ir_node_id,
580+ auto JsonParser::get_archive_node_id (
581+ std::unordered_map< int32_t , std::vector<std::pair<NodeType, int32_t >>>&
582+ ir_node_to_archive_node_unordered_map ,
583+ int32_t ir_node_id,
572584 NodeType archive_node_type,
573585 clp::ffi::SchemaTree const & ir_tree
574- ) {
575- auto key = std::make_tuple (ir_node_id, archive_node_type);
576- auto map_location = ir_node_to_archive_node_map.find (key);
577- if (ir_node_to_archive_node_map.end () != map_location) {
578- return map_location->second ;
586+ ) -> int {
587+ auto unordered_map_location = ir_node_to_archive_node_unordered_map.find (ir_node_id);
588+ if (ir_node_to_archive_node_unordered_map.end () != unordered_map_location) {
589+ auto translation_vector = unordered_map_location->second ;
590+ for (int i = 0 ; i < translation_vector.size (); i++) {
591+ if (translation_vector[i].first == archive_node_type) {
592+ return translation_vector[i].second ;
593+ }
594+ }
579595 }
580- auto & curr_node = ir_tree.get_node (ir_node_id);
596+
597+ auto const & curr_node = ir_tree.get_node (ir_node_id);
581598 int32_t parent_node_id{-1 };
582599 if (ir_node_id != curr_node.get_parent_id ()) {
583600 parent_node_id = get_archive_node_id (
584- ir_node_to_archive_node_map ,
601+ ir_node_to_archive_node_unordered_map ,
585602 curr_node.get_parent_id (),
586603 NodeType::Object,
587604 ir_tree
@@ -597,16 +614,23 @@ int JsonParser::get_archive_node_id(
597614 }
598615 int curr_node_archive_id
599616 = m_archive_writer->add_node (parent_node_id, archive_node_type, node_key);
600- ir_node_to_archive_node_map.emplace (std::move (key), curr_node_archive_id);
617+ auto p = std::make_pair (archive_node_type, curr_node_archive_id);
618+ if (ir_node_to_archive_node_unordered_map.end () != unordered_map_location) {
619+ unordered_map_location->second .push_back (p);
620+ } else {
621+ std::vector<std::pair<NodeType, int32_t >> v;
622+ v.push_back (p);
623+ ir_node_to_archive_node_unordered_map.emplace (ir_node_id, v);
624+ }
601625 return curr_node_archive_id;
602626}
603627
604628void JsonParser::parse_kv_log_event (
605629 KeyValuePairLogEvent const & kv,
606- std::map<std::tuple<int32_t , NodeType>, int32_t >& ir_node_to_archive_node_map
630+ std::unordered_map<int32_t , std::vector<std::pair<NodeType, int32_t >>>&
631+ ir_node_to_archive_node_unordered_map
607632) {
608633 clp::ffi::SchemaTree const & tree = kv.get_schema_tree ();
609-
610634 for (auto const & pair : kv.get_node_id_value_pairs ()) {
611635 clp::ffi::SchemaTreeNode const & tree_node = tree.get_node (pair.first );
612636 clp::ffi::SchemaTreeNode::Type ir_node_type = tree_node.get_type ();
@@ -621,7 +645,7 @@ void JsonParser::parse_kv_log_event(
621645 int node_id;
622646 try {
623647 node_id = get_archive_node_id (
624- ir_node_to_archive_node_map ,
648+ ir_node_to_archive_node_unordered_map ,
625649 pair.first ,
626650 archive_node_type,
627651 tree
@@ -705,11 +729,11 @@ void JsonParser::parse_kv_log_event(
705729 int32_t current_schema_id = m_archive_writer->add_schema (m_current_schema);
706730 m_current_parsed_message.set_id (current_schema_id);
707731 m_archive_writer->append_message (current_schema_id, m_current_schema, m_current_parsed_message);
708- return ;
709732}
710733
711- bool JsonParser::parse_from_IR () {
712- std::map<std::tuple<int32_t , NodeType>, int32_t > ir_node_to_archive_node_map;
734+ auto JsonParser::parse_from_ir () -> bool {
735+ std::unordered_map<int32_t , std::vector<std::pair<NodeType, int32_t >>>
736+ ir_node_to_archive_node_unordered_map;
713737
714738 for (auto & file_path : m_file_paths) {
715739 int fsize = std::filesystem::file_size (file_path);
@@ -742,7 +766,7 @@ bool JsonParser::parse_from_IR() {
742766 m_current_schema.clear ();
743767 auto const & kv_log_event = kv_log_event_result.value ();
744768 try {
745- parse_kv_log_event (kv_log_event, ir_node_to_archive_node_map );
769+ parse_kv_log_event (kv_log_event, ir_node_to_archive_node_unordered_map );
746770 } catch (std::string msg) {
747771 SPDLOG_ERROR (" ERROR: {}" + msg);
748772 zd.close ();
@@ -754,14 +778,14 @@ bool JsonParser::parse_from_IR() {
754778 }
755779 m_num_messages++;
756780 if (m_archive_writer->get_data_size () >= m_target_encoded_size) {
757- ir_node_to_archive_node_map .clear ();
781+ ir_node_to_archive_node_unordered_map .clear ();
758782 split_archive ();
759783 }
760784
761785 m_current_parsed_message.clear ();
762786
763787 } while (true );
764- ir_node_to_archive_node_map .clear ();
788+ ir_node_to_archive_node_unordered_map .clear ();
765789 zd.close ();
766790 }
767791 return true ;
0 commit comments