Skip to content

Commit f5005ec

Browse files
committed
Added more error handling and handled most of the remaining comments
1 parent 2a1b929 commit f5005ec

File tree

3 files changed

+93
-52
lines changed

3 files changed

+93
-52
lines changed

components/core/src/clp_s/CMakeLists.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ set(
2323
../clp/ReadOnlyMemoryMappedFile.cpp
2424
../clp/ReadOnlyMemoryMappedFile.hpp
2525
../clp/ReaderInterface.cpp
26-
../clp/ReaderInterface.cpp
27-
../clp/ReaderInterface.hpp
2826
../clp/ReaderInterface.hpp
2927
../clp/TraceableException.hpp
3028
../clp/WriterInterface.cpp
@@ -39,7 +37,6 @@ set(
3937
../clp/ffi/SchemaTree.hpp
4038
../clp/ffi/SchemaTreeNode.hpp
4139
../clp/ffi/Value.hpp
42-
../clp/ffi/Value.hpp
4340
../clp/ffi/ir_stream/Deserializer.cpp
4441
../clp/ffi/ir_stream/Deserializer.hpp
4542
../clp/ffi/ir_stream/Serializer.cpp
@@ -66,7 +63,6 @@ set(
6663
../clp/streaming_compression/zstd/Decompressor.hpp
6764
../clp/time_types.hpp
6865
../clp/type_utils.hpp
69-
../clp/type_utils.hpp
7066
../clp/utf8_utils.cpp
7167
../clp/utf8_utils.hpp
7268
)

components/core/src/clp_s/JsonParser.cpp

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ NodeType get_archive_node_type(
541541
archive_node_type = NodeType::UnstructuredArray;
542542
break;
543543
case clp::ffi::SchemaTreeNode::Type::Str:
544-
if (node_value->is<std::string>()) {
544+
if (node_value && node_value->is<std::string>()) {
545545
archive_node_type = NodeType::VarString;
546546
} else {
547547
archive_node_type = NodeType::ClpString;
@@ -592,6 +592,8 @@ int JsonParser::get_archive_node_id(
592592
std::string node_key = "";
593593
if (validated_escaped_key.has_value()) {
594594
node_key = validated_escaped_key.value();
595+
} else {
596+
throw "Key is not utf8 compliant";
595597
}
596598
int curr_node_archive_id
597599
= m_archive_writer->add_node(parent_node_id, archive_node_type, node_key);
@@ -616,12 +618,17 @@ void JsonParser::parse_kv_log_event(
616618
} else {
617619
archive_node_type = get_archive_node_type(ir_node_type, node_has_value, {});
618620
}
619-
int node_id = get_archive_node_id(
620-
ir_node_to_archive_node_map,
621-
pair.first,
622-
archive_node_type,
623-
tree
624-
);
621+
int node_id;
622+
try {
623+
node_id = get_archive_node_id(
624+
ir_node_to_archive_node_map,
625+
pair.first,
626+
archive_node_type,
627+
tree
628+
);
629+
} catch (...) {
630+
throw;
631+
}
625632

626633
switch (archive_node_type) {
627634
case NodeType::Integer: {
@@ -637,30 +644,38 @@ void JsonParser::parse_kv_log_event(
637644
m_current_parsed_message.add_value(node_id, b_value);
638645
} break;
639646
case NodeType::VarString: {
640-
std::string str = clp::ffi::validate_and_escape_utf8_string(
641-
pair.second.value().get_immutable_view<std::string>()
642-
)
643-
.value();
647+
auto validated_escaped_string = clp::ffi::validate_and_escape_utf8_string(
648+
pair.second.value().get_immutable_view<std::string>()
649+
);
650+
std::string str = "";
651+
if (validated_escaped_string.has_value()) {
652+
str = validated_escaped_string.value();
653+
} else {
654+
throw "String is not utf8 compliant";
655+
}
644656
m_current_parsed_message.add_value(node_id, str);
645657
} break;
646658
case NodeType::ClpString: {
647-
std::string encoded_str;
659+
std::string encoded_str = "";
660+
std::string decodedValue = "";
648661
if (pair.second.value().is<clp::ir::EightByteEncodedTextAst>()) {
649-
std::string decodedValue
650-
= pair.second.value()
651-
.get_immutable_view<clp::ir::EightByteEncodedTextAst>()
652-
.decode_and_unparse()
653-
.value();
654-
encoded_str = clp::ffi::validate_and_escape_utf8_string(decodedValue.c_str())
655-
.value();
662+
decodedValue = pair.second.value()
663+
.get_immutable_view<clp::ir::EightByteEncodedTextAst>()
664+
.decode_and_unparse()
665+
.value();
666+
656667
} else {
657-
std::string decodedValue
658-
= pair.second.value()
659-
.get_immutable_view<clp::ir::FourByteEncodedTextAst>()
660-
.decode_and_unparse()
661-
.value();
662-
encoded_str = clp::ffi::validate_and_escape_utf8_string(decodedValue.c_str())
663-
.value();
668+
decodedValue = pair.second.value()
669+
.get_immutable_view<clp::ir::FourByteEncodedTextAst>()
670+
.decode_and_unparse()
671+
.value();
672+
}
673+
auto validated_escaped_encoded_string
674+
= clp::ffi::validate_and_escape_utf8_string(decodedValue.c_str());
675+
if (validated_escaped_encoded_string.has_value()) {
676+
encoded_str = validated_escaped_encoded_string.value();
677+
} else {
678+
throw "Encoded string is not utf8 compliant";
664679
}
665680
m_current_parsed_message.add_value(node_id, encoded_str);
666681
} break;
@@ -695,7 +710,6 @@ void JsonParser::parse_kv_log_event(
695710

696711
bool JsonParser::parse_from_IR() {
697712
std::map<std::tuple<int32_t, NodeType>, int32_t> ir_node_to_archive_node_map;
698-
//m_archive_writer->add_node(-1, NodeType::Unknown, "root");
699713

700714
for (auto& file_path : m_file_paths) {
701715
int fsize = std::filesystem::file_size(file_path);
@@ -727,13 +741,20 @@ bool JsonParser::parse_from_IR() {
727741

728742
m_current_schema.clear();
729743
auto const& kv_log_event = kv_log_event_result.value();
730-
731-
parse_kv_log_event(kv_log_event, ir_node_to_archive_node_map);
732-
744+
try {
745+
parse_kv_log_event(kv_log_event, ir_node_to_archive_node_map);
746+
} catch (std::string msg) {
747+
SPDLOG_ERROR("ERROR: {}" + msg);
748+
zd.close();
749+
return false;
750+
} catch (...) {
751+
SPDLOG_ERROR("ERROR: Encountered error while parsing a kv log event");
752+
zd.close();
753+
return false;
754+
}
733755
m_num_messages++;
734756
if (m_archive_writer->get_data_size() >= m_target_encoded_size) {
735757
ir_node_to_archive_node_map.clear();
736-
//m_archive_writer->add_node(-1, NodeType::Unknown, "root");
737758
split_archive();
738759
}
739760

components/core/src/clp_s/clp-s.cpp

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -143,15 +143,20 @@ auto unpack_and_serialize_msgpack_bytes(
143143
std::vector<uint8_t> const& msgpack_bytes,
144144
Serializer<encoded_variable_t>& serializer
145145
) -> bool {
146-
auto const msgpack_obj_handle{msgpack::unpack(
147-
clp::size_checked_pointer_cast<char const>(msgpack_bytes.data()),
148-
msgpack_bytes.size()
149-
)};
150-
auto const msgpack_obj{msgpack_obj_handle.get()};
151-
if (msgpack::type::MAP != msgpack_obj.type) {
146+
try {
147+
auto const msgpack_obj_handle{msgpack::unpack(
148+
clp::size_checked_pointer_cast<char const>(msgpack_bytes.data()),
149+
msgpack_bytes.size()
150+
)};
151+
auto const msgpack_obj{msgpack_obj_handle.get()};
152+
if (msgpack::type::MAP != msgpack_obj.type) {
153+
return false;
154+
}
155+
return serializer.serialize_msgpack_map(msgpack_obj.via.map);
156+
} catch (std::exception const& e) {
157+
SPDLOG_ERROR("Failed to unpack msgpack bytes: {}", e.what());
152158
return false;
153159
}
154-
return serializer.serialize_msgpack_map(msgpack_obj.via.map);
155160
}
156161

157162
template <typename T>
@@ -168,13 +173,17 @@ auto run_serializer(clp_s::JsonToIRParserOption option, std::string path) {
168173
std::ifstream in_file;
169174
in_file.open(path, std::ifstream::in);
170175

171-
std::string out_path = "";
176+
/* std::string out_path = "";
172177
int index = path.find_last_of('/');
173178
if (std::string::npos == index) {
174179
out_path = option.irs_dir + "/" + path + ".ir";
175180
} else {
176181
out_path = option.irs_dir + "/" + path.substr(index, path.length() - index) + ".ir";
177-
}
182+
} */
183+
std::filesystem::path input_path{path};
184+
std::string filename = input_path.filename().string();
185+
std::string out_path = option.irs_dir + "/" + filename + ".ir";
186+
178187
clp_s::FileWriter out_file;
179188
out_file.open(out_path, clp_s::FileWriter::OpenMode::CreateForWriting);
180189
clp_s::ZstdCompressor zc;
@@ -185,14 +194,29 @@ auto run_serializer(clp_s::JsonToIRParserOption option, std::string path) {
185194

186195
if (in_file.is_open()) {
187196
while (getline(in_file, line)) {
188-
auto j_obj = nlohmann::json::parse(line);
189-
unpack_and_serialize_msgpack_bytes(nlohmann::json::to_msgpack(j_obj), serializer);
190-
flush_and_clear_serializer_buffer(serializer, ir_buf);
191-
if (ir_buf.size() >= 1'000'000'000) {
192-
total_size = total_size + ir_buf.size();
193-
zc.write(reinterpret_cast<char*>(ir_buf.data()), ir_buf.size());
194-
zc.flush();
195-
ir_buf.clear();
197+
try {
198+
auto j_obj = nlohmann::json::parse(line);
199+
if (!unpack_and_serialize_msgpack_bytes(
200+
nlohmann::json::to_msgpack(j_obj),
201+
serializer
202+
))
203+
{
204+
SPDLOG_ERROR("Failed to serialize msgpack bytes for line: {}", line);
205+
return false;
206+
}
207+
flush_and_clear_serializer_buffer(serializer, ir_buf);
208+
if (ir_buf.size() >= 1'000'000'000) {
209+
total_size = total_size + ir_buf.size();
210+
zc.write(reinterpret_cast<char*>(ir_buf.data()), ir_buf.size());
211+
zc.flush();
212+
ir_buf.clear();
213+
}
214+
} catch (nlohmann::json::parse_error const& e) {
215+
SPDLOG_ERROR("JSON parsing error: {}", e.what());
216+
return false;
217+
} catch (std::exception const& e) {
218+
SPDLOG_ERROR("Error during serialization: {}", e.what());
219+
return false;
196220
}
197221
}
198222
total_size = total_size + ir_buf.size();

0 commit comments

Comments
 (0)