From ddcdca244855cdb23c8704516449bd32cd75a76b Mon Sep 17 00:00:00 2001 From: Olga Vrousgou Date: Thu, 12 Aug 2021 15:56:39 +0100 Subject: [PATCH 1/2] Use client time utc and add tests --- external_parser/event_processors/loop.h | 3 +- .../event_processors/timestamp_helper.cc | 5 + .../event_processors/timestamp_helper.h | 3 +- external_parser/joiners/example_joiner.cc | 54 +++++--- external_parser/joiners/example_joiner.h | 2 + external_parser/joiners/i_joiner.h | 1 + .../joiners/multistep_example_joiner.cc | 4 + .../joiners/multistep_example_joiner.h | 1 + external_parser/parse_example_binary.cc | 1 + external_parser/parse_example_external.cc | 6 + external_parser/parse_example_external.h | 1 + external_parser/unit_tests/CMakeLists.txt | 1 + .../test_client_and_enqueued_time.cc | 121 ++++++++++++++++++ .../unit_tests/test_files/README.md | 9 ++ .../client_time/cb_v2_client_time.fb | Bin 0 -> 384 bytes .../f-reward_3obs_v2_client_time.fb | Bin 0 -> 548 bytes rlclientlib/schema/v2/FileFormat.fbs | 1 + test_tools/log_parser/joiner.py | 12 ++ test_tools/log_parser/log_gen.py | 3 +- 19 files changed, 204 insertions(+), 24 deletions(-) create mode 100644 external_parser/unit_tests/test_client_and_enqueued_time.cc create mode 100644 external_parser/unit_tests/test_files/client_time/cb_v2_client_time.fb create mode 100644 external_parser/unit_tests/test_files/client_time/f-reward_3obs_v2_client_time.fb diff --git a/external_parser/event_processors/loop.h b/external_parser/event_processors/loop.h index 3f0cb80c0..7a3ef2e54 100644 --- a/external_parser/event_processors/loop.h +++ b/external_parser/event_processors/loop.h @@ -37,9 +37,10 @@ struct loop_info { sticky_value default_reward = sticky_value(0.f); sticky_value learning_mode_config = sticky_value(v2::LearningModeType_Online); sticky_value problem_type_config; + sticky_value use_client_time = sticky_value(false); bool is_configured() const { - return default_reward.is_valid() && learning_mode_config.is_valid() && problem_type_config.is_valid(); + return default_reward.is_valid() && learning_mode_config.is_valid() && problem_type_config.is_valid() && use_client_time.is_valid(); }//&& type.is_valid() }; } // namespace loop \ No newline at end of file diff --git a/external_parser/event_processors/timestamp_helper.cc b/external_parser/event_processors/timestamp_helper.cc index 482be3946..74b397c39 100644 --- a/external_parser/event_processors/timestamp_helper.cc +++ b/external_parser/event_processors/timestamp_helper.cc @@ -14,4 +14,9 @@ TimePoint timestamp_to_chrono(const v2::TimeStamp &ts) { date::day(ts.day())) + std::chrono::hours(ts.hour()) + std::chrono::minutes(ts.minute()) + std::chrono::seconds(ts.second()); +} + +bool is_empty_timestamp(const v2::TimeStamp &ts) { + return (ts.year() == 0 && ts.month() == 0 && ts.day() == 0 && + ts.hour() == 0 && ts.minute() == 0 && ts.second() == 0); } \ No newline at end of file diff --git a/external_parser/event_processors/timestamp_helper.h b/external_parser/event_processors/timestamp_helper.h index 88ecbebb3..5101a005f 100644 --- a/external_parser/event_processors/timestamp_helper.h +++ b/external_parser/event_processors/timestamp_helper.h @@ -7,4 +7,5 @@ namespace v2 = reinforcement_learning::messages::flatbuff::v2; using TimePoint = std::chrono::time_point; -TimePoint timestamp_to_chrono(const v2::TimeStamp &ts); \ No newline at end of file +TimePoint timestamp_to_chrono(const v2::TimeStamp &ts); +bool is_empty_timestamp(const v2::TimeStamp& ts); \ No newline at end of file diff --git a/external_parser/joiners/example_joiner.cc b/external_parser/joiners/example_joiner.cc index a7e465e40..3202ff82f 100644 --- a/external_parser/joiners/example_joiner.cc +++ b/external_parser/joiners/example_joiner.cc @@ -116,6 +116,10 @@ void example_joiner::set_problem_type_config(v2::ProblemType problem_type, _loop_info.problem_type_config.set(problem_type, sticky); } +void example_joiner::set_use_client_time(bool use_client_time, bool sticky) { + _loop_info.use_client_time.set(use_client_time, sticky); +} + bool example_joiner::joiner_ready() { return _loop_info.is_configured() && _reward_calculation.is_valid(); } @@ -223,10 +227,9 @@ bool example_joiner::process_interaction(const v2::Event &event, return false; } - je = std::move( - typed_event::event_processor::fill_in_joined_event( - *cb, metadata, enqueued_time_utc, - typed_event::event_processor::get_context(*cb))); + je = typed_event::event_processor::fill_in_joined_event( + *cb, metadata, enqueued_time_utc, + typed_event::event_processor::get_context(*cb)); } else if (metadata.payload_type() == v2::PayloadType_CCB) { const v2::MultiSlotEvent *ccb = nullptr; if (!typed_event::process_compression( @@ -243,11 +246,9 @@ bool example_joiner::process_interaction(const v2::Event &event, metadata.id()->c_str()); return false; } - je = std::move( - typed_event::event_processor::fill_in_joined_event( - *ccb, metadata, enqueued_time_utc, - typed_event::event_processor::get_context( - *ccb))); + je = typed_event::event_processor::fill_in_joined_event( + *ccb, metadata, enqueued_time_utc, + typed_event::event_processor::get_context(*ccb)); } else if (metadata.payload_type() == v2::PayloadType_CA) { const v2::CaEvent *ca = nullptr; if (!typed_event::process_compression( @@ -263,10 +264,9 @@ bool example_joiner::process_interaction(const v2::Event &event, metadata.id()->c_str()); return false; } - je = std::move( - typed_event::event_processor::fill_in_joined_event( - *ca, metadata, enqueued_time_utc, - typed_event::event_processor::get_context(*ca))); + je = typed_event::event_processor::fill_in_joined_event( + *ca, metadata, enqueued_time_utc, + typed_event::event_processor::get_context(*ca)); } else { // for now only CB is supported so log and return false VW::io::logger::log_error("Interaction event learning mode [{}] not " @@ -276,19 +276,19 @@ bool example_joiner::process_interaction(const v2::Event &event, return false; } - if(!_binary_to_json) { + if (!_binary_to_json) { std::string context(je.context); try { if (_vw->audit || _vw->hash_inv) { VW::template read_line_json( *_vw, examples, const_cast(context.c_str()), - reinterpret_cast(&VW::get_unused_example), _vw, - &_dedup_cache.dedup_examples); + reinterpret_cast(&VW::get_unused_example), + _vw, &_dedup_cache.dedup_examples); } else { VW::template read_line_json( *_vw, examples, const_cast(context.c_str()), - reinterpret_cast(&VW::get_unused_example), _vw, - &_dedup_cache.dedup_examples); + reinterpret_cast(&VW::get_unused_example), + _vw, &_dedup_cache.dedup_examples); } } catch (VW::vw_exception &e) { VW::io::logger::log_warn( @@ -317,8 +317,21 @@ bool example_joiner::process_outcome(const v2::Event &event, metadata.pass_probability(), metadata.encoding(), metadata.id()->str(), - v2::LearningModeType_Online }; //Online is the default value, we should not leave this uninitialized - o_event.enqueued_time_utc = enqueued_time_utc; + v2::LearningModeType_Online}; + + if (_loop_info.use_client_time) { + if (!metadata.client_time_utc() || + is_empty_timestamp(*metadata.client_time_utc())) { + VW::io::logger::log_warn( + "binary parser is configured to use client-provided EnqueuedTimeUTC, " + "but input metadata does not contain a client timestamp."); + o_event.enqueued_time_utc = enqueued_time_utc; + } else { + o_event.enqueued_time_utc = o_event.metadata.client_time_utc; + } + } else { + o_event.enqueued_time_utc = enqueued_time_utc; + } const v2::OutcomeEvent *outcome = nullptr; if (!typed_event::process_compression( @@ -520,7 +533,6 @@ bool example_joiner::process_joined(v_array &examples) { } je->set_reward_from_data(examples); - if (multiline) { // add an empty example to signal end-of-multiline examples.push_back(&VW::get_unused_example(_vw)); diff --git a/external_parser/joiners/example_joiner.h b/external_parser/joiners/example_joiner.h index b4a705e90..132c6a7f2 100644 --- a/external_parser/joiners/example_joiner.h +++ b/external_parser/joiners/example_joiner.h @@ -26,11 +26,13 @@ class example_joiner : public i_joiner { void set_default_reward(float default_reward, bool sticky = false) override; void set_learning_mode_config(v2::LearningModeType learning_mode, bool sticky = false) override; void set_problem_type_config(v2::ProblemType problem_type, bool sticky = false) override; + void set_use_client_time(bool use_client_time, bool sticky = false) override; bool joiner_ready() override; float default_reward() const { return _loop_info.default_reward; } v2::LearningModeType learning_mode_config() const { return _loop_info.learning_mode_config; } v2::ProblemType problem_type_config() const { return _loop_info.problem_type_config; } + bool use_client_time() const { return _loop_info.use_client_time; } // Takes an event which will have a timestamp and event payload // groups all events interactions with their event observations based on their diff --git a/external_parser/joiners/i_joiner.h b/external_parser/joiners/i_joiner.h index f0b4e0345..63b4ffe8e 100644 --- a/external_parser/joiners/i_joiner.h +++ b/external_parser/joiners/i_joiner.h @@ -31,6 +31,7 @@ class i_joiner { virtual void set_default_reward(float default_reward, bool sticky = false) = 0; virtual void set_learning_mode_config(v2::LearningModeType learning_mode, bool sticky = false) = 0; virtual void set_problem_type_config(v2::ProblemType problem_type, bool sticky = false) = 0; + virtual void set_use_client_time(bool use_client_time, bool sticky = false) = 0; /** * @brief Tells whether config was provided such that it can start joining examples. diff --git a/external_parser/joiners/multistep_example_joiner.cc b/external_parser/joiners/multistep_example_joiner.cc index b0832d742..95a1d3192 100644 --- a/external_parser/joiners/multistep_example_joiner.cc +++ b/external_parser/joiners/multistep_example_joiner.cc @@ -69,6 +69,10 @@ void multistep_example_joiner::set_problem_type_config(v2::ProblemType problem_t _loop_info.problem_type_config.set(problem_type, sticky); } +void multistep_example_joiner::set_use_client_time(bool use_client_time, bool sticky) { + _loop_info.use_client_time.set(use_client_time, sticky); +} + bool multistep_example_joiner::joiner_ready() { return _loop_info.is_configured() && _reward_calculation.is_valid(); } diff --git a/external_parser/joiners/multistep_example_joiner.h b/external_parser/joiners/multistep_example_joiner.h index adf066182..78bdc2364 100644 --- a/external_parser/joiners/multistep_example_joiner.h +++ b/external_parser/joiners/multistep_example_joiner.h @@ -33,6 +33,7 @@ class multistep_example_joiner : public i_joiner { void set_default_reward(float default_reward, bool sticky) override; void set_learning_mode_config(v2::LearningModeType learning_mode, bool sticky) override; void set_problem_type_config(v2::ProblemType problem_type, bool sticky) override; + void set_use_client_time(bool use_client_time, bool sticky = false) override; bool joiner_ready() override; bool current_event_is_skip_learn() override; diff --git a/external_parser/parse_example_binary.cc b/external_parser/parse_example_binary.cc index 307b457c9..71b27fa96 100644 --- a/external_parser/parse_example_binary.cc +++ b/external_parser/parse_example_binary.cc @@ -195,6 +195,7 @@ bool binary_parser::read_checkpoint_msg(io_buf *input) { checkpoint_info->learning_mode_config()); _example_joiner->set_problem_type_config( checkpoint_info->problem_type_config()); + _example_joiner->set_use_client_time(checkpoint_info->use_client_time()); return true; } diff --git a/external_parser/parse_example_external.cc b/external_parser/parse_example_external.cc index ca9c24147..40516831f 100644 --- a/external_parser/parse_example_external.cc +++ b/external_parser/parse_example_external.cc @@ -89,6 +89,9 @@ void apply_cli_overrides(std::unique_ptr& joiner, vw *all, const input } joiner->set_problem_type_config(problem_type, true); } + if(all->options->was_supplied("use_client_time")) { + joiner->set_use_client_time(parsed_options.ext_opts->use_client_time, true); + } if(all->options->was_supplied("learning_mode")) { v2::LearningModeType learning_mode; if(!str_to_learning_mode(parsed_options.ext_opts->learning_mode, learning_mode)) { @@ -167,6 +170,9 @@ void parser::set_parse_args(VW::config::option_group_definition &in_options, .add( VW::config::make_option("problem_type", parsed_options.ext_opts->problem_type) .help("Override the problem type trying to be solved, valid values: CB, CCB, SLATES, CA")) + .add( + VW::config::make_option("use_client_time", parsed_options.ext_opts->use_client_time) + .help("Override use_client_time to define whether client time or enqueued time will be used for reward calculation")) .add( VW::config::make_option("reward_function", parsed_options.ext_opts->reward_function) .help("Override the reward function to be used, valid values: earliest, average, median, sum, min, max")) diff --git a/external_parser/parse_example_external.h b/external_parser/parse_example_external.h index 5777fe098..3202d27ed 100644 --- a/external_parser/parse_example_external.h +++ b/external_parser/parse_example_external.h @@ -19,6 +19,7 @@ struct parser_options { std::string problem_type; std::string reward_function; std::string learning_mode; + bool use_client_time; }; int parse_examples(vw *all, v_array &examples); diff --git a/external_parser/unit_tests/CMakeLists.txt b/external_parser/unit_tests/CMakeLists.txt index 5430e4160..b2c573489 100644 --- a/external_parser/unit_tests/CMakeLists.txt +++ b/external_parser/unit_tests/CMakeLists.txt @@ -13,6 +13,7 @@ set(TEST_SOURCES test_log_converter.cc test_skip_learn.cc test_metrics.cc + test_client_and_enqueued_time.cc ) add_executable(binary_parser_unit_tests ${TEST_SOURCES}) diff --git a/external_parser/unit_tests/test_client_and_enqueued_time.cc b/external_parser/unit_tests/test_client_and_enqueued_time.cc new file mode 100644 index 000000000..e9a9160fb --- /dev/null +++ b/external_parser/unit_tests/test_client_and_enqueued_time.cc @@ -0,0 +1,121 @@ +#include "joiners/example_joiner.h" +#include "test_common.h" +#include + +void process(vw *vw, example_joiner &joiner, v_array &examples, + std::string int_file, std::string obs_file) { + std::string input_files = get_test_files_location(); + + auto interaction_buffer = read_file(input_files + int_file); + // need to keep the fb buffer around in order to process the event + std::vector int_detached_buffers; + + auto joined_cb_events = + wrap_into_joined_events(interaction_buffer, int_detached_buffers); + + for (auto &je : joined_cb_events) { + joiner.process_event(*je); + examples.push_back(&VW::get_unused_example(vw)); + } + + // need to keep the fb buffer around in order to process the event + std::vector obs_detached_buffers; + + auto observation_buffer = read_file(input_files + obs_file); + joined_cb_events = + wrap_into_joined_events(observation_buffer, obs_detached_buffers); + + for (auto &je : joined_cb_events) { + joiner.process_event(*je); + } + + BOOST_CHECK_EQUAL(joiner.processing_batch(), true); + + joiner.process_joined(examples); +} + +BOOST_AUTO_TEST_CASE(test_use_client_time_missing_and_configured_to_true) { + auto vw = VW::initialize("--quiet --binary_parser --cb_explore_adf", nullptr, + false, nullptr, nullptr); + + example_joiner joiner(vw); + v_array examples; + + joiner.set_problem_type_config(v2::ProblemType_CB); + // the test files used do not have a client time utc set, so they will use the + // enqueued time utc even though this is set to true + joiner.set_use_client_time(true); + + // rewards are {5, 4, 3} + // enqueued time goes from latest to earliest + // client time goes from earliest to latest + float earliest_enqueued = -3.f; + process(vw, joiner, examples, "/reward_functions/cb/cb_v2.fb", + "/reward_functions/cb/f-reward_3obs_v2.fb"); + + BOOST_CHECK_EQUAL(examples[1]->l.cb.costs[0].action, 1); + BOOST_CHECK_EQUAL(examples[1]->l.cb.costs[0].cost, earliest_enqueued); + BOOST_CHECK_CLOSE(examples[1]->l.cb.costs[0].probability, 0.9, FLOAT_TOL); + BOOST_CHECK_EQUAL(examples[1]->l.cb.weight, 1.0f); + + clear_examples(examples, vw); + VW::finish(*vw); +} + +BOOST_AUTO_TEST_CASE(test_use_client_time_existing_and_configured_to_false) { + auto vw = VW::initialize("--quiet --binary_parser --cb_explore_adf", nullptr, + false, nullptr, nullptr); + + example_joiner joiner(vw); + v_array examples; + + joiner.set_problem_type_config(v2::ProblemType_CB); + // the test files used have a client time utc set, but enqueued time will be + // used instead + joiner.set_use_client_time(false); + + // rewards are {1, 1, 5} + // enqueued time goes from latest to earliest + // client time goes from earliest to latest + float earliest_enqueued = -5.f; + float earliest_client = -1.f; + process(vw, joiner, examples, "/client_time/cb_v2_client_time.fb", + "/client_time/f-reward_3obs_v2_client_time.fb"); + + BOOST_CHECK_EQUAL(examples[1]->l.cb.costs[0].action, 1); + BOOST_CHECK_EQUAL(examples[1]->l.cb.costs[0].cost, earliest_enqueued); + BOOST_CHECK_CLOSE(examples[1]->l.cb.costs[0].probability, 0.9, FLOAT_TOL); + BOOST_CHECK_EQUAL(examples[1]->l.cb.weight, 1.0f); + + clear_examples(examples, vw); + VW::finish(*vw); +} + +BOOST_AUTO_TEST_CASE(test_use_client_time_existing_and_configured_to_true) { + auto vw = VW::initialize("--quiet --binary_parser --cb_explore_adf", nullptr, + false, nullptr, nullptr); + + example_joiner joiner(vw); + v_array examples; + + joiner.set_problem_type_config(v2::ProblemType_CB); + // the test files used have a client time set and it should be used instead of + // enqueued time utc + joiner.set_use_client_time(true); + + // rewards are {1, 1, 5} + // enqueued time goes from latest to earliest + // client time goes from earliest to latest + float earliest_enqueued = -5.f; + float earliest_client = -1.f; + process(vw, joiner, examples, "/client_time/cb_v2_client_time.fb", + "/client_time/f-reward_3obs_v2_client_time.fb"); + + BOOST_CHECK_EQUAL(examples[1]->l.cb.costs[0].action, 1); + BOOST_CHECK_EQUAL(examples[1]->l.cb.costs[0].cost, earliest_client); + BOOST_CHECK_CLOSE(examples[1]->l.cb.costs[0].probability, 0.9, FLOAT_TOL); + BOOST_CHECK_EQUAL(examples[1]->l.cb.weight, 1.0f); + + clear_examples(examples, vw); + VW::finish(*vw); +} \ No newline at end of file diff --git a/external_parser/unit_tests/test_files/README.md b/external_parser/unit_tests/test_files/README.md index 6cdbcecbb..6ae5b197a 100644 --- a/external_parser/unit_tests/test_files/README.md +++ b/external_parser/unit_tests/test_files/README.md @@ -73,6 +73,15 @@ Residing under `reward_functions` - ca_v2.fb: `./example_gen --kind ca --count 1 --seed -1` - f-reward_3obs_v2.fb: `./example_gen --kind f-reward --count 3 --seed -1 --random_reward` +### client time utc test files + +Residing under `client_time` + +Note: these files were generated after setting `"time_provider.implementation"` to `"CLOCK_TIME_PROVIDER"` in the configuration of example_gen + +- cb_v2_client_time.fb: `./example_gen --kind cb --count 1 --seed -1` +- f-reward_3obs_v2_client_time.fb: `./example_gen --kind f-reward --count 3 --seed -1 --random_reward` + ### skip learn test files Residing under `skip_learn` diff --git a/external_parser/unit_tests/test_files/client_time/cb_v2_client_time.fb b/external_parser/unit_tests/test_files/client_time/cb_v2_client_time.fb new file mode 100644 index 0000000000000000000000000000000000000000..627cb8013281b24bb9d66f58d50142db8da30513 GIT binary patch literal 384 zcmY*V%Syvg5IxmWyNIC+7hSk0vkpkR5D7>rSfrb-;zC3uY4f;klYmxGLZF}FCv{PO z!IgeTJd@Ui1Lw|}b7$txTmYv)X8lY9z zw8%8i;l*Ds?WBFa4|iMoF0nToXC#_qp#z)zgXj?*LVFz((~A*~=;J6Fe(rYH8F?YM zcQ4_yxP_RxxZ;Z=slUjnNSMWpk~|3B86{?ED3p#orLU{C5%-}GZ!}w*JkxT&&!ng? zW$FCa2dWRKKbEN9*L~G=mc7~hADv#e=+^W1$bL@RmhD{PYBRQUcVIKD9S?pq{9rZ@ Mqc|~( Date: Thu, 12 Aug 2021 17:59:29 +0100 Subject: [PATCH 2/2] tidy up --- external_parser/event_processors/metadata.h | 1 - .../event_processors/timestamp_helper.cc | 16 ++++++++++++ .../event_processors/timestamp_helper.h | 5 +++- .../event_processors/typed_events.h | 15 +++-------- external_parser/joiners/example_joiner.cc | 25 ++++--------------- .../joiners/multistep_example_joiner.cc | 11 ++++---- 6 files changed, 33 insertions(+), 40 deletions(-) diff --git a/external_parser/event_processors/metadata.h b/external_parser/event_processors/metadata.h index 05530fdf4..1de047295 100644 --- a/external_parser/event_processors/metadata.h +++ b/external_parser/event_processors/metadata.h @@ -10,7 +10,6 @@ namespace v2 = reinforcement_learning::messages::flatbuff::v2; namespace metadata { // used both for interactions and observations struct event_metadata_info { - TimePoint client_time_utc; std::string app_id; v2::PayloadType payload_type; float pass_probability; diff --git a/external_parser/event_processors/timestamp_helper.cc b/external_parser/event_processors/timestamp_helper.cc index 74b397c39..4ee3b224e 100644 --- a/external_parser/event_processors/timestamp_helper.cc +++ b/external_parser/event_processors/timestamp_helper.cc @@ -1,4 +1,5 @@ #include "timestamp_helper.h" +#include "io/logger.h" TimePoint timestamp_to_chrono(const v2::TimeStamp &ts) { @@ -19,4 +20,19 @@ TimePoint timestamp_to_chrono(const v2::TimeStamp &ts) { bool is_empty_timestamp(const v2::TimeStamp &ts) { return (ts.year() == 0 && ts.month() == 0 && ts.day() == 0 && ts.hour() == 0 && ts.minute() == 0 && ts.second() == 0); +} + +TimePoint get_enqueued_time(const v2::TimeStamp *enqueued_time_utc, + const v2::TimeStamp *client_time_utc, + bool use_client_time) { + if (use_client_time) { + if (!client_time_utc || is_empty_timestamp(*client_time_utc)) { + VW::io::logger::log_warn( + "binary parser is configured to use client-provided EnqueuedTimeUTC, " + "but input metadata does not contain a client timestamp."); + return timestamp_to_chrono(*enqueued_time_utc); + } + return timestamp_to_chrono(*client_time_utc); + } + return timestamp_to_chrono(*enqueued_time_utc); } \ No newline at end of file diff --git a/external_parser/event_processors/timestamp_helper.h b/external_parser/event_processors/timestamp_helper.h index 5101a005f..ac85784c7 100644 --- a/external_parser/event_processors/timestamp_helper.h +++ b/external_parser/event_processors/timestamp_helper.h @@ -8,4 +8,7 @@ namespace v2 = reinforcement_learning::messages::flatbuff::v2; using TimePoint = std::chrono::time_point; TimePoint timestamp_to_chrono(const v2::TimeStamp &ts); -bool is_empty_timestamp(const v2::TimeStamp& ts); \ No newline at end of file +bool is_empty_timestamp(const v2::TimeStamp &ts); +TimePoint get_enqueued_time(const v2::TimeStamp *enqueued_time_utc, + const v2::TimeStamp *client_time_utc, + bool use_client_time); \ No newline at end of file diff --git a/external_parser/event_processors/typed_events.h b/external_parser/event_processors/typed_events.h index a46ca4924..8ed8160d0 100644 --- a/external_parser/event_processors/typed_events.h +++ b/external_parser/event_processors/typed_events.h @@ -81,10 +81,7 @@ template <> struct event_processor { ccb_data->probability_of_drop = 1.f - metadata.pass_probability(); return {TimePoint(enqueued_time_utc), - {metadata.client_time_utc() - ? timestamp_to_chrono(*metadata.client_time_utc()) - : TimePoint(), - metadata.app_id() ? metadata.app_id()->str() : "", + {metadata.app_id() ? metadata.app_id()->str() : "", metadata.payload_type(), metadata.pass_probability(), metadata.encoding(), metadata.id()->str(), evt.learning_mode()}, std::move(line_vec), @@ -147,10 +144,7 @@ template <> struct event_processor { cb_data->interaction_data.skipLearn = evt.deferred_action(); return {TimePoint(enqueued_time_utc), - {metadata.client_time_utc() - ? timestamp_to_chrono(*metadata.client_time_utc()) - : TimePoint(), - metadata.app_id() ? metadata.app_id()->str() : "", + {metadata.app_id() ? metadata.app_id()->str() : "", metadata.payload_type(), metadata.pass_probability(), metadata.encoding(), metadata.id()->str(), evt.learning_mode()}, std::move(line_vec), @@ -201,10 +195,7 @@ template <> struct event_processor { ca_data->interaction_data.skipLearn = evt.deferred_action(); return {TimePoint(enqueued_time_utc), - {metadata.client_time_utc() - ? timestamp_to_chrono(*metadata.client_time_utc()) - : TimePoint(), - metadata.app_id() ? metadata.app_id()->str() : "", + {metadata.app_id() ? metadata.app_id()->str() : "", metadata.payload_type(), metadata.pass_probability(), metadata.encoding(), metadata.id()->str(), evt.learning_mode()}, std::move(line_vec), diff --git a/external_parser/joiners/example_joiner.cc b/external_parser/joiners/example_joiner.cc index 3202ff82f..04567d209 100644 --- a/external_parser/joiners/example_joiner.cc +++ b/external_parser/joiners/example_joiner.cc @@ -309,29 +309,14 @@ bool example_joiner::process_outcome(const v2::Event &event, const v2::Metadata &metadata, const TimePoint &enqueued_time_utc) { reward::outcome_event o_event; - o_event.metadata = {metadata.client_time_utc() - ? timestamp_to_chrono(*metadata.client_time_utc()) - : TimePoint(), - metadata.app_id() ? metadata.app_id()->str() : "", + o_event.metadata = {metadata.app_id() ? metadata.app_id()->str() : "", metadata.payload_type(), metadata.pass_probability(), metadata.encoding(), metadata.id()->str(), v2::LearningModeType_Online}; - if (_loop_info.use_client_time) { - if (!metadata.client_time_utc() || - is_empty_timestamp(*metadata.client_time_utc())) { - VW::io::logger::log_warn( - "binary parser is configured to use client-provided EnqueuedTimeUTC, " - "but input metadata does not contain a client timestamp."); - o_event.enqueued_time_utc = enqueued_time_utc; - } else { - o_event.enqueued_time_utc = o_event.metadata.client_time_utc; - } - } else { - o_event.enqueued_time_utc = enqueued_time_utc; - } + o_event.enqueued_time_utc = enqueued_time_utc; const v2::OutcomeEvent *outcome = nullptr; if (!typed_event::process_compression( @@ -441,7 +426,9 @@ bool example_joiner::process_joined(v_array &examples) { for (auto &joined_event : _batch_grouped_events[id]) { auto event = flatbuffers::GetRoot(joined_event->event()->data()); auto metadata = event->meta(); - auto enqueued_time_utc = timestamp_to_chrono(*joined_event->timestamp()); + auto enqueued_time_utc = get_enqueued_time(joined_event->timestamp(), + metadata->client_time_utc(), + _loop_info.use_client_time); const auto &payload_type = metadata->payload_type(); if (payload_type == v2::PayloadType_Outcome) { @@ -467,8 +454,6 @@ bool example_joiner::process_joined(v_array &examples) { if (!je->is_joined_event_learnable()) { _joiner_metrics.number_of_skipped_events++; } else { - // TODO does this potentially need to check and set client time utc if - // that option is on? if (_joiner_metrics.first_event_id.empty()) { _joiner_metrics.first_event_id = std::move(je->interaction_metadata.event_id); diff --git a/external_parser/joiners/multistep_example_joiner.cc b/external_parser/joiners/multistep_example_joiner.cc index 95a1d3192..6c8973e8c 100644 --- a/external_parser/joiners/multistep_example_joiner.cc +++ b/external_parser/joiners/multistep_example_joiner.cc @@ -31,7 +31,9 @@ multistep_example_joiner::~multistep_example_joiner() { bool multistep_example_joiner::process_event(const v2::JoinedEvent &joined_event) { auto event = flatbuffers::GetRoot(joined_event.event()->data()); const v2::Metadata& meta = *event->meta(); - auto enqueued_time_utc = timestamp_to_chrono(*joined_event.timestamp()); + auto enqueued_time_utc = get_enqueued_time(joined_event.timestamp(), + meta.client_time_utc(), + _loop_info.use_client_time); switch (meta.payload_type()) { case v2::PayloadType_MultiStep: { @@ -125,8 +127,7 @@ reward::outcome_event multistep_example_joiner::process_outcome(const multistep_ const auto& metadata = event_meta.meta; const auto& event = event_meta.event; reward::outcome_event o_event; - o_event.metadata = {timestamp_to_chrono(*metadata.client_time_utc()), - metadata.app_id() ? metadata.app_id()->str() : "", + o_event.metadata = {metadata.app_id() ? metadata.app_id()->str() : "", metadata.payload_type(), metadata.pass_probability(), metadata.encoding(), @@ -146,9 +147,7 @@ joined_event::joined_event multistep_example_joiner::process_interaction( v_array &examples) { const auto& metadata = event_meta.meta; const auto& event = event_meta.event; - metadata::event_metadata_info meta = {metadata.client_time_utc() - ? timestamp_to_chrono(*metadata.client_time_utc()) - : TimePoint(), + metadata::event_metadata_info meta = { metadata.app_id() ? metadata.app_id()->str() : "", metadata.payload_type(), metadata.pass_probability(),