Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ext_libs/vowpal_wabbit
Submodule vowpal_wabbit updated 227 files
34 changes: 19 additions & 15 deletions external_parser/joiners/example_joiner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,9 @@ bool example_joiner::process_interaction(const v2::Event &event,
metadata.payload_type() == v2::PayloadType_Slates) {
const v2::MultiSlotEvent *multislot = nullptr;
if (!typed_event::process_compression<v2::MultiSlotEvent>(
event.payload()->data(), event.payload()->size(), metadata, multislot,
_detached_buffer) || multislot == nullptr) {
event.payload()->data(), event.payload()->size(), metadata,
multislot, _detached_buffer) ||
multislot == nullptr) {
return false;
}

Expand All @@ -253,7 +254,8 @@ bool example_joiner::process_interaction(const v2::Event &event,

je = typed_event::event_processor<v2::MultiSlotEvent>::fill_in_joined_event(
*multislot, metadata, enqueued_time_utc,
typed_event::event_processor<v2::MultiSlotEvent>::get_context(*multislot));
typed_event::event_processor<v2::MultiSlotEvent>::get_context(
*multislot));
} else if (metadata.payload_type() == v2::PayloadType_CA) {
const v2::CaEvent *ca = nullptr;
if (!typed_event::process_compression<v2::CaEvent>(
Expand Down Expand Up @@ -286,13 +288,13 @@ bool example_joiner::process_interaction(const v2::Event &event,
std::string context(je.context);
try {
if (_vw->audit || _vw->hash_inv) {
VW::template read_line_json<true>(
*_vw, examples, const_cast<char *>(context.c_str()),
VW::template read_line_json_s<true>(
*_vw, examples, const_cast<char *>(context.c_str()), context.size(),
reinterpret_cast<VW::example_factory_t>(&VW::get_unused_example),
_vw, &_dedup_cache.dedup_examples);
} else {
VW::template read_line_json<false>(
*_vw, examples, const_cast<char *>(context.c_str()),
VW::template read_line_json_s<false>(
*_vw, examples, const_cast<char *>(context.c_str()), context.size(),
reinterpret_cast<VW::example_factory_t>(&VW::get_unused_example),
_vw, &_dedup_cache.dedup_examples);
}
Expand Down Expand Up @@ -386,15 +388,15 @@ bool example_joiner::process_dedup(const v2::Event &event,

try {
if (_vw->audit || _vw->hash_inv) {
VW::template read_line_json<true>(
VW::template read_line_json_s<true>(
*_vw, examples,
const_cast<char *>(dedup->values()->Get(i)->c_str()),
get_or_create_example_f, this);
dedup->values()->Get(i)->size(), get_or_create_example_f, this);
} else {
VW::template read_line_json<false>(
VW::template read_line_json_s<false>(
*_vw, examples,
const_cast<char *>(dedup->values()->Get(i)->c_str()),
get_or_create_example_f, this);
dedup->values()->Get(i)->size(), get_or_create_example_f, this);
}
} catch (VW::vw_exception &e) {
VW::io::logger::log_error("JSON parsing during dedup processing failed "
Expand Down Expand Up @@ -460,7 +462,8 @@ bool example_joiner::process_joined(v_array<example *> &examples) {
if (!je->is_joined_event_learnable()) {
_joiner_metrics.number_of_skipped_events++;
} else {
_joiner_metrics.sum_cost_original += -1. * je->get_sum_original_reward();
_joiner_metrics.sum_cost_original +=
-1. * je->get_sum_original_reward();
if (_joiner_metrics.first_event_id.empty()) {
_joiner_metrics.first_event_id =
std::move(je->interaction_metadata.event_id);
Expand Down Expand Up @@ -539,7 +542,8 @@ void example_joiner::persist_metrics() {
_vw->example_parser->metrics->NumberOfSkippedEvents =
_joiner_metrics.number_of_skipped_events;

_vw->example_parser->metrics->DsjsonSumCostOriginal = _joiner_metrics.sum_cost_original;
_vw->example_parser->metrics->DsjsonSumCostOriginal =
_joiner_metrics.sum_cost_original;

if (!_joiner_metrics.first_event_id.empty()) {
_vw->example_parser->metrics->FirstEventId =
Expand Down Expand Up @@ -571,5 +575,5 @@ metrics::joiner_metrics example_joiner::get_metrics() {
return _joiner_metrics;
}

void example_joiner::apply_cli_overrides(vw *all, const input_options &parsed_options) {
}
void example_joiner::apply_cli_overrides(vw *all,
const input_options &parsed_options) {}
10 changes: 5 additions & 5 deletions external_parser/joiners/multistep_example_joiner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ bool multistep_example_joiner::populate_order() {
}

reward::outcome_event multistep_example_joiner::process_outcome(
const TimePoint& timestamp, const v2::Metadata &metadata, const v2::OutcomeEvent& event) {
const TimePoint&, const v2::Metadata &metadata, const v2::OutcomeEvent& event) {
reward::outcome_event o_event;
o_event.metadata = {metadata.app_id() ? metadata.app_id()->str() : "",
metadata.payload_type(),
Expand Down Expand Up @@ -250,12 +250,12 @@ joined_event::multistep_joined_event multistep_example_joiner::process_interacti
event.context()->size());

if (_vw->audit || _vw->hash_inv) {
VW::template read_line_json<true>(
*_vw, examples, const_cast<char *>(line_vec.c_str()),
VW::template read_line_json_s<true>(
*_vw, examples, const_cast<char *>(line_vec.c_str()), line_vec.size(),
reinterpret_cast<VW::example_factory_t>(&VW::get_unused_example), _vw);
} else {
VW::template read_line_json<false>(
*_vw, examples, const_cast<char *>(line_vec.c_str()),
VW::template read_line_json_s<false>(
*_vw, examples, const_cast<char *>(line_vec.c_str()), line_vec.size(),
reinterpret_cast<VW::example_factory_t>(&VW::get_unused_example), _vw);
}

Expand Down
2 changes: 1 addition & 1 deletion external_parser/joiners/multistep_example_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ std::map<const char *, multistep_reward_funtion_type> const multistep_reward_fun

using MultistepRewardFunctionType = void (*)(std::deque<float> &);

inline void multistep_reward_identity(std::deque<float> &rewards) {}
inline void multistep_reward_identity(std::deque<float> &) {}

inline void multistep_reward_suffix_sum(std::deque<float> &rewards) {
for (size_t i = 1; i < rewards.size(); ++i) {
Expand Down
72 changes: 36 additions & 36 deletions external_parser/parse_example_binary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
// use appropriate logger

// helpers start
bool read_payload_type(io_buf *input, unsigned int &payload_type) {
bool read_payload_type(io_buf &input, unsigned int &payload_type) {
char *line = nullptr;
auto len = input->buf_read(line, sizeof(unsigned int));
auto len = input.buf_read(line, sizeof(unsigned int));

if (len < sizeof(unsigned int) || line == nullptr) {
if (len == 0) {
Expand All @@ -41,9 +41,9 @@ bool read_payload_type(io_buf *input, unsigned int &payload_type) {
return true;
}

bool read_payload_size(io_buf *input, uint32_t &payload_size) {
bool read_payload_size(io_buf &input, uint32_t &payload_size) {
char *line = nullptr;
auto len = input->buf_read(line, sizeof(uint32_t));
auto len = input.buf_read(line, sizeof(uint32_t));
if (len < sizeof(uint32_t) || line == nullptr) {
return false;
}
Expand All @@ -52,9 +52,9 @@ bool read_payload_size(io_buf *input, uint32_t &payload_size) {
return true;
}

bool read_payload(io_buf *input, char *&payload, uint32_t payload_size) {
bool read_payload(io_buf &input, char *&payload, uint32_t payload_size) {
char *line = nullptr;
auto len = input->buf_read(line, payload_size);
auto len = input.buf_read(line, payload_size);

if (len < payload_size || line == nullptr) {
return false;
Expand All @@ -63,7 +63,7 @@ bool read_payload(io_buf *input, char *&payload, uint32_t payload_size) {
return true;
}

bool read_padding(io_buf *input, uint32_t previous_payload_size,
bool read_padding(io_buf &input, uint32_t previous_payload_size,
uint32_t &padding_bytes) {
char *line = nullptr;
padding_bytes = previous_payload_size % 8;
Expand All @@ -78,15 +78,13 @@ bool read_padding(io_buf *input, uint32_t previous_payload_size,

namespace VW {
namespace external {
binary_parser::binary_parser(std::unique_ptr<i_joiner>&& joiner)
: _example_joiner(std::move(joiner))
, _payload(nullptr)
, _payload_size(0)
, _total_size_read(0) {}
binary_parser::binary_parser(std::unique_ptr<i_joiner> &&joiner)
: _example_joiner(std::move(joiner)), _payload(nullptr), _payload_size(0),
_total_size_read(0) {}

binary_parser::~binary_parser() {}

bool binary_parser::read_version(io_buf *input) {
bool binary_parser::read_version(io_buf &input) {
_payload = nullptr;
const uint32_t buffer_length = 4 * sizeof(char);
if (!read_payload(input, _payload, buffer_length)) {
Expand All @@ -98,7 +96,8 @@ bool binary_parser::read_version(io_buf *input) {
}

_total_size_read += buffer_length;
_payload_size = 0; //this is used but the padding code, make it do the right thing.
_payload_size =
0; // this is used but the padding code, make it do the right thing.

if (*_payload != BINARY_PARSER_VERSION) {
VW::io::logger::log_critical(
Expand All @@ -109,7 +108,7 @@ bool binary_parser::read_version(io_buf *input) {
return true;
}

bool binary_parser::read_header(io_buf *input) {
bool binary_parser::read_header(io_buf &input) {
_payload = nullptr;

// read header size
Expand Down Expand Up @@ -139,7 +138,7 @@ bool binary_parser::read_header(io_buf *input) {
return true;
}

bool binary_parser::skip_over_unknown_payload(io_buf *input) {
bool binary_parser::skip_over_unknown_payload(io_buf &input) {
_payload = nullptr;
if (!read_payload_size(input, _payload_size)) {
VW::io::logger::log_critical(
Expand All @@ -152,10 +151,10 @@ bool binary_parser::skip_over_unknown_payload(io_buf *input) {
_total_size_read += sizeof(_payload_size);

if (!read_payload(input, _payload, _payload_size)) {
VW::io::logger::log_critical(
"Failed to read unknown message payload of size [{}], after having read "
"[{}] bytes from the file",
_payload_size, _total_size_read);
VW::io::logger::log_critical("Failed to read unknown message payload of "
"size [{}], after having read "
"[{}] bytes from the file",
_payload_size, _total_size_read);
return false;
}

Expand All @@ -164,7 +163,7 @@ bool binary_parser::skip_over_unknown_payload(io_buf *input) {
return true;
}

bool binary_parser::read_checkpoint_msg(io_buf *input) {
bool binary_parser::read_checkpoint_msg(io_buf &input) {
_payload = nullptr;
if (!read_payload_size(input, _payload_size)) {
VW::io::logger::log_critical(
Expand Down Expand Up @@ -200,8 +199,9 @@ bool binary_parser::read_checkpoint_msg(io_buf *input) {
return true;
}

bool binary_parser::read_regular_msg(io_buf *input,
v_array<example *> &examples, bool &ignore_msg) {
bool binary_parser::read_regular_msg(io_buf &input,
v_array<example *> &examples,
bool &ignore_msg) {
_payload = nullptr;
ignore_msg = false;

Expand All @@ -225,8 +225,9 @@ bool binary_parser::read_regular_msg(io_buf *input,

_total_size_read += _payload_size;

if(!_example_joiner->joiner_ready()) {
VW::io::logger::log_warn("Read regular message before any checkpoint data "
if (!_example_joiner->joiner_ready()) {
VW::io::logger::log_warn(
"Read regular message before any checkpoint data "
"after having read [{}] bytes from the file. Events will be ignored.",
_total_size_read);
ignore_msg = true;
Expand All @@ -246,7 +247,7 @@ bool binary_parser::read_regular_msg(io_buf *input,
}
_example_joiner->on_new_batch();

for (const auto* event : *joined_payload->events()) {
for (const auto *event : *joined_payload->events()) {
// process and group events in batch
if (!_example_joiner->process_event(*event)) {
VW::io::logger::log_error("Processing of an event from JoinedPayload "
Expand Down Expand Up @@ -280,7 +281,7 @@ bool binary_parser::process_next_in_batch(v_array<example *> &examples) {
return false;
}

bool binary_parser::advance_to_next_payload_type(io_buf *input,
bool binary_parser::advance_to_next_payload_type(io_buf &input,
unsigned int &payload_type) {
// read potential excess padding after last payload read
uint32_t padding;
Expand Down Expand Up @@ -310,37 +311,36 @@ void binary_parser::persist_metrics(
_example_joiner->persist_metrics();
}

bool binary_parser::parse_examples(vw *all, v_array<example *> &examples) {
bool binary_parser::parse_examples(vw *, io_buf &io_buf,
v_array<example *> &examples) {
if (process_next_in_batch(examples)) {
return true;
}

unsigned int payload_type;
while (advance_to_next_payload_type(all->example_parser->input.get(),
payload_type)) {
while (advance_to_next_payload_type(io_buf, payload_type)) {
switch (payload_type) {
case MSG_TYPE_FILEMAGIC: {
if (!read_version(all->example_parser->input.get())) {
if (!read_version(io_buf)) {
return false;
}
break;
}
case MSG_TYPE_HEADER: {
if (!read_header(all->example_parser->input.get())) {
if (!read_header(io_buf)) {
return false;
}
break;
}
case MSG_TYPE_CHECKPOINT: {
if (!read_checkpoint_msg(all->example_parser->input.get())) {
if (!read_checkpoint_msg(io_buf)) {
return false;
}
break;
}
case MSG_TYPE_REGULAR: {
bool ignore_msg = false;
if (read_regular_msg(all->example_parser->input.get(), examples,
ignore_msg)) {
if (read_regular_msg(io_buf, examples, ignore_msg)) {
if (!ignore_msg) {
return true;
}
Expand All @@ -356,7 +356,7 @@ bool binary_parser::parse_examples(vw *all, v_array<example *> &examples) {
"Payload type not recognized [0x{:x}], after having read [{}] "
"bytes from the file, attempting to skip payload",
payload_type, _total_size_read);
if (!skip_over_unknown_payload(all->example_parser->input.get())) {
if (!skip_over_unknown_payload(io_buf)) {
return false;
}
continue;
Expand Down
22 changes: 13 additions & 9 deletions external_parser/parse_example_binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ namespace external {

class binary_parser : public parser {
public:
binary_parser(std::unique_ptr<i_joiner>&& joiner); //taking ownership of joiner
binary_parser(
std::unique_ptr<i_joiner> &&joiner); // taking ownership of joiner
~binary_parser();
bool parse_examples(vw *all, v_array<example *> &examples) override;
bool read_version(io_buf *input);
bool read_header(io_buf *input);
bool read_checkpoint_msg(io_buf *input);
bool read_regular_msg(io_buf *input, v_array<example *> &examples, bool &ignore_msg);
bool skip_over_unknown_payload(io_buf *input);
bool advance_to_next_payload_type(io_buf *input, unsigned int &payload_type);
void persist_metrics(std::vector<std::pair<std::string, size_t>>& list_metrics) override;
bool parse_examples(vw *all, io_buf &io_buf,
v_array<example *> &examples) override;
bool read_version(io_buf &input);
bool read_header(io_buf &input);
bool read_checkpoint_msg(io_buf &input);
bool read_regular_msg(io_buf &input, v_array<example *> &examples,
bool &ignore_msg);
bool skip_over_unknown_payload(io_buf &input);
bool advance_to_next_payload_type(io_buf &input, unsigned int &payload_type);
void persist_metrics(
std::vector<std::pair<std::string, size_t>> &list_metrics) override;

private:
bool process_next_in_batch(v_array<example *> &examples);
Expand Down
7 changes: 4 additions & 3 deletions external_parser/parse_example_converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ binary_json_converter::binary_json_converter(std::unique_ptr<i_joiner> &&joiner)

binary_json_converter::~binary_json_converter() = default;

bool binary_json_converter::parse_examples(vw *all,
bool binary_json_converter::parse_examples(vw *all, io_buf &io_buf,
v_array<example *> &examples) {
while (_parser.parse_examples(all, examples)) {
while (_parser.parse_examples(all, io_buf, examples)) {
// do nothing
}
// vw will not learn, just exit
return false;
}

void binary_json_converter::persist_metrics(std::vector<std::pair<std::string, size_t>>&) {
void binary_json_converter::persist_metrics(
std::vector<std::pair<std::string, size_t>> &) {
// do we want metrics here?
}

Expand Down
2 changes: 1 addition & 1 deletion external_parser/parse_example_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class binary_json_converter : public parser {
public:
binary_json_converter(std::unique_ptr<i_joiner>&& joiner); //taking ownership of joiner
~binary_json_converter();
bool parse_examples(vw *all, v_array<example *> &examples) override;
bool parse_examples(vw *all, io_buf& io_buf, v_array<example *> &examples) override;
void persist_metrics(std::vector<std::pair<std::string, size_t>>& list_metrics) override;

private:
Expand Down
Loading