Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion external_parser/event_processors/loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ struct loop_info {
sticky_value<float> default_reward = sticky_value<float>(0.f);
sticky_value<v2::LearningModeType> learning_mode_config = sticky_value<v2::LearningModeType>(v2::LearningModeType_Online);
sticky_value<v2::ProblemType> problem_type_config;
sticky_value<bool> use_client_time = sticky_value<bool>(false);

// Reports whether every loop-level setting has a valid value.
// Returns true only when is_valid() holds for default_reward,
// learning_mode_config, problem_type_config and use_client_time.
// The earlier three-term return that preceded this one made the
// use_client_time check unreachable, so only the complete condition is kept.
// NOTE: a `type.is_valid()` term was left commented out in the original.
bool is_configured() const {
  return default_reward.is_valid() && learning_mode_config.is_valid() &&
         problem_type_config.is_valid() && use_client_time.is_valid();
}
};
} // namespace loop
1 change: 0 additions & 1 deletion external_parser/event_processors/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace v2 = reinforcement_learning::messages::flatbuff::v2;
namespace metadata {
// used both for interactions and observations
struct event_metadata_info {
TimePoint client_time_utc;
std::string app_id;
v2::PayloadType payload_type;
float pass_probability;
Expand Down
21 changes: 21 additions & 0 deletions external_parser/event_processors/timestamp_helper.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "timestamp_helper.h"
#include "io/logger.h"

TimePoint timestamp_to_chrono(const v2::TimeStamp &ts) {

Expand All @@ -14,4 +15,24 @@ TimePoint timestamp_to_chrono(const v2::TimeStamp &ts) {
date::day(ts.day())) +
std::chrono::hours(ts.hour()) + std::chrono::minutes(ts.minute()) +
std::chrono::seconds(ts.second());
}

// Returns true when every field of `ts` (year, month, day, hour, minute,
// second) is zero, and false as soon as any field is non-zero.
bool is_empty_timestamp(const v2::TimeStamp &ts) {
  const bool date_is_zero =
      ts.year() == 0 && ts.month() == 0 && ts.day() == 0;
  if (!date_is_zero) {
    return false;
  }
  return ts.hour() == 0 && ts.minute() == 0 && ts.second() == 0;
}

// Selects the timestamp to use as an event's enqueued time and converts it
// to a TimePoint.
//
// - use_client_time == false: converts *enqueued_time_utc.
// - use_client_time == true and client_time_utc is non-null and not
//   all-zero: converts *client_time_utc.
// - use_client_time == true but the client timestamp is null or all-zero:
//   logs a warning and falls back to *enqueued_time_utc.
//
// enqueued_time_utc is dereferenced without a null check on every path that
// uses it, so callers must pass a non-null pointer.
TimePoint get_enqueued_time(const v2::TimeStamp *enqueued_time_utc,
                            const v2::TimeStamp *client_time_utc,
                            bool use_client_time) {
  if (!use_client_time) {
    return timestamp_to_chrono(*enqueued_time_utc);
  }

  const bool has_client_time =
      client_time_utc != nullptr && !is_empty_timestamp(*client_time_utc);
  if (has_client_time) {
    return timestamp_to_chrono(*client_time_utc);
  }

  // Client time was requested but is unavailable: warn, then fall back.
  VW::io::logger::log_warn(
      "binary parser is configured to use client-provided EnqueuedTimeUTC, "
      "but input metadata does not contain a client timestamp.");
  return timestamp_to_chrono(*enqueued_time_utc);
}
6 changes: 5 additions & 1 deletion external_parser/event_processors/timestamp_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@

namespace v2 = reinforcement_learning::messages::flatbuff::v2;
using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
TimePoint timestamp_to_chrono(const v2::TimeStamp &ts);
TimePoint timestamp_to_chrono(const v2::TimeStamp &ts);
// Returns true when all six fields of `ts` (year, month, day, hour, minute,
// second) are zero.
bool is_empty_timestamp(const v2::TimeStamp &ts);
// Returns the time point to treat as an event's enqueued time.
// When use_client_time is false, enqueued_time_utc is converted. When it is
// true, client_time_utc is converted if it is non-null and not all-zero;
// otherwise a warning is logged and enqueued_time_utc is converted instead.
// enqueued_time_utc is dereferenced without a null check.
TimePoint get_enqueued_time(const v2::TimeStamp *enqueued_time_utc,
const v2::TimeStamp *client_time_utc,
bool use_client_time);
15 changes: 3 additions & 12 deletions external_parser/event_processors/typed_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,7 @@ template <> struct event_processor<v2::MultiSlotEvent> {
ccb_data->probability_of_drop = 1.f - metadata.pass_probability();

return {TimePoint(enqueued_time_utc),
{metadata.client_time_utc()
? timestamp_to_chrono(*metadata.client_time_utc())
: TimePoint(),
metadata.app_id() ? metadata.app_id()->str() : "",
{metadata.app_id() ? metadata.app_id()->str() : "",
metadata.payload_type(), metadata.pass_probability(),
metadata.encoding(), metadata.id()->str(), evt.learning_mode()},
std::move(line_vec),
Expand Down Expand Up @@ -147,10 +144,7 @@ template <> struct event_processor<v2::CbEvent> {
cb_data->interaction_data.skipLearn = evt.deferred_action();

return {TimePoint(enqueued_time_utc),
{metadata.client_time_utc()
? timestamp_to_chrono(*metadata.client_time_utc())
: TimePoint(),
metadata.app_id() ? metadata.app_id()->str() : "",
{metadata.app_id() ? metadata.app_id()->str() : "",
metadata.payload_type(), metadata.pass_probability(),
metadata.encoding(), metadata.id()->str(), evt.learning_mode()},
std::move(line_vec),
Expand Down Expand Up @@ -201,10 +195,7 @@ template <> struct event_processor<v2::CaEvent> {
ca_data->interaction_data.skipLearn = evt.deferred_action();

return {TimePoint(enqueued_time_utc),
{metadata.client_time_utc()
? timestamp_to_chrono(*metadata.client_time_utc())
: TimePoint(),
metadata.app_id() ? metadata.app_id()->str() : "",
{metadata.app_id() ? metadata.app_id()->str() : "",
metadata.payload_type(), metadata.pass_probability(),
metadata.encoding(), metadata.id()->str(), evt.learning_mode()},
std::move(line_vec),
Expand Down
51 changes: 24 additions & 27 deletions external_parser/joiners/example_joiner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ void example_joiner::set_problem_type_config(v2::ProblemType problem_type,
_loop_info.problem_type_config.set(problem_type, sticky);
}

// Records the use_client_time setting on this joiner's loop info. Both the
// value and the `sticky` flag are handed to the setting's set() unchanged.
void example_joiner::set_use_client_time(bool use_client_time, bool sticky) {
  auto &client_time_setting = _loop_info.use_client_time;
  client_time_setting.set(use_client_time, sticky);
}

// Returns true when the loop info reports itself configured and the reward
// calculation is valid. The reward calculation is only consulted when the
// loop info check passes.
bool example_joiner::joiner_ready() {
  if (!_loop_info.is_configured()) {
    return false;
  }
  return _reward_calculation.is_valid();
}
Expand Down Expand Up @@ -223,10 +227,9 @@ bool example_joiner::process_interaction(const v2::Event &event,
return false;
}

je = std::move(
typed_event::event_processor<v2::CbEvent>::fill_in_joined_event(
*cb, metadata, enqueued_time_utc,
typed_event::event_processor<v2::CbEvent>::get_context(*cb)));
je = typed_event::event_processor<v2::CbEvent>::fill_in_joined_event(
*cb, metadata, enqueued_time_utc,
typed_event::event_processor<v2::CbEvent>::get_context(*cb));
} else if (metadata.payload_type() == v2::PayloadType_CCB) {
const v2::MultiSlotEvent *ccb = nullptr;
if (!typed_event::process_compression<v2::MultiSlotEvent>(
Expand All @@ -243,11 +246,9 @@ bool example_joiner::process_interaction(const v2::Event &event,
metadata.id()->c_str());
return false;
}
je = std::move(
typed_event::event_processor<v2::MultiSlotEvent>::fill_in_joined_event(
*ccb, metadata, enqueued_time_utc,
typed_event::event_processor<v2::MultiSlotEvent>::get_context(
*ccb)));
je = typed_event::event_processor<v2::MultiSlotEvent>::fill_in_joined_event(
*ccb, metadata, enqueued_time_utc,
typed_event::event_processor<v2::MultiSlotEvent>::get_context(*ccb));
} else if (metadata.payload_type() == v2::PayloadType_CA) {
const v2::CaEvent *ca = nullptr;
if (!typed_event::process_compression<v2::CaEvent>(
Expand All @@ -263,10 +264,9 @@ bool example_joiner::process_interaction(const v2::Event &event,
metadata.id()->c_str());
return false;
}
je = std::move(
typed_event::event_processor<v2::CaEvent>::fill_in_joined_event(
*ca, metadata, enqueued_time_utc,
typed_event::event_processor<v2::CaEvent>::get_context(*ca)));
je = typed_event::event_processor<v2::CaEvent>::fill_in_joined_event(
*ca, metadata, enqueued_time_utc,
typed_event::event_processor<v2::CaEvent>::get_context(*ca));
} else {
// for now only CB is supported so log and return false
VW::io::logger::log_error("Interaction event learning mode [{}] not "
Expand All @@ -276,19 +276,19 @@ bool example_joiner::process_interaction(const v2::Event &event,
return false;
}

if(!_binary_to_json) {
if (!_binary_to_json) {
std::string context(je.context);
try {
if (_vw->audit || _vw->hash_inv) {
VW::template read_line_json<true>(
*_vw, examples, const_cast<char *>(context.c_str()),
reinterpret_cast<VW::example_factory_t>(&VW::get_unused_example), _vw,
&_dedup_cache.dedup_examples);
reinterpret_cast<VW::example_factory_t>(&VW::get_unused_example),
_vw, &_dedup_cache.dedup_examples);
} else {
VW::template read_line_json<false>(
*_vw, examples, const_cast<char *>(context.c_str()),
reinterpret_cast<VW::example_factory_t>(&VW::get_unused_example), _vw,
&_dedup_cache.dedup_examples);
reinterpret_cast<VW::example_factory_t>(&VW::get_unused_example),
_vw, &_dedup_cache.dedup_examples);
}
} catch (VW::vw_exception &e) {
VW::io::logger::log_warn(
Expand All @@ -309,15 +309,13 @@ bool example_joiner::process_outcome(const v2::Event &event,
const v2::Metadata &metadata,
const TimePoint &enqueued_time_utc) {
reward::outcome_event o_event;
o_event.metadata = {metadata.client_time_utc()
? timestamp_to_chrono(*metadata.client_time_utc())
: TimePoint(),
metadata.app_id() ? metadata.app_id()->str() : "",
o_event.metadata = {metadata.app_id() ? metadata.app_id()->str() : "",
metadata.payload_type(),
metadata.pass_probability(),
metadata.encoding(),
metadata.id()->str(),
v2::LearningModeType_Online }; //Online is the default value, we should not leave this uninitialized
v2::LearningModeType_Online};

o_event.enqueued_time_utc = enqueued_time_utc;

const v2::OutcomeEvent *outcome = nullptr;
Expand Down Expand Up @@ -428,7 +426,9 @@ bool example_joiner::process_joined(v_array<example *> &examples) {
for (auto &joined_event : _batch_grouped_events[id]) {
auto event = flatbuffers::GetRoot<v2::Event>(joined_event->event()->data());
auto metadata = event->meta();
auto enqueued_time_utc = timestamp_to_chrono(*joined_event->timestamp());
auto enqueued_time_utc = get_enqueued_time(joined_event->timestamp(),
metadata->client_time_utc(),
_loop_info.use_client_time);
const auto &payload_type = metadata->payload_type();

if (payload_type == v2::PayloadType_Outcome) {
Expand All @@ -454,8 +454,6 @@ bool example_joiner::process_joined(v_array<example *> &examples) {
if (!je->is_joined_event_learnable()) {
_joiner_metrics.number_of_skipped_events++;
} else {
// TODO does this potentially need to check and set client time utc if
// that option is on?
if (_joiner_metrics.first_event_id.empty()) {
_joiner_metrics.first_event_id =
std::move(je->interaction_metadata.event_id);
Expand Down Expand Up @@ -520,7 +518,6 @@ bool example_joiner::process_joined(v_array<example *> &examples) {
}
je->set_reward_from_data(examples);


if (multiline) {
// add an empty example to signal end-of-multiline
examples.push_back(&VW::get_unused_example(_vw));
Expand Down
2 changes: 2 additions & 0 deletions external_parser/joiners/example_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ class example_joiner : public i_joiner {
void set_default_reward(float default_reward, bool sticky = false) override;
void set_learning_mode_config(v2::LearningModeType learning_mode, bool sticky = false) override;
void set_problem_type_config(v2::ProblemType problem_type, bool sticky = false) override;
void set_use_client_time(bool use_client_time, bool sticky = false) override;
bool joiner_ready() override;

// Read-only accessors for the settings stored in _loop_info. Each returns the
// current setting as its plain type (float, enum or bool) through the
// setting's conversion to that type.
float default_reward() const { return _loop_info.default_reward; }
v2::LearningModeType learning_mode_config() const { return _loop_info.learning_mode_config; }
v2::ProblemType problem_type_config() const { return _loop_info.problem_type_config; }
// Whether client-provided time is configured to be used in place of enqueued time.
bool use_client_time() const { return _loop_info.use_client_time; }

// Takes an event which will have a timestamp and event payload
// groups all events interactions with their event observations based on their
Expand Down
1 change: 1 addition & 0 deletions external_parser/joiners/i_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class i_joiner {
virtual void set_default_reward(float default_reward, bool sticky = false) = 0;
virtual void set_learning_mode_config(v2::LearningModeType learning_mode, bool sticky = false) = 0;
virtual void set_problem_type_config(v2::ProblemType problem_type, bool sticky = false) = 0;
virtual void set_use_client_time(bool use_client_time, bool sticky = false) = 0;

/**
* @brief Tells whether config was provided such that it can start joining examples.
Expand Down
15 changes: 9 additions & 6 deletions external_parser/joiners/multistep_example_joiner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ multistep_example_joiner::~multistep_example_joiner() {
bool multistep_example_joiner::process_event(const v2::JoinedEvent &joined_event) {
auto event = flatbuffers::GetRoot<v2::Event>(joined_event.event()->data());
const v2::Metadata& meta = *event->meta();
auto enqueued_time_utc = timestamp_to_chrono(*joined_event.timestamp());
auto enqueued_time_utc = get_enqueued_time(joined_event.timestamp(),
meta.client_time_utc(),
_loop_info.use_client_time);
switch (meta.payload_type()) {
case v2::PayloadType_MultiStep:
{
Expand Down Expand Up @@ -69,6 +71,10 @@ void multistep_example_joiner::set_problem_type_config(v2::ProblemType problem_t
_loop_info.problem_type_config.set(problem_type, sticky);
}

// Stores the use_client_time setting on the multistep joiner's loop info,
// forwarding `sticky` to the setting's set() as given.
void multistep_example_joiner::set_use_client_time(bool use_client_time,
                                                   bool sticky) {
  auto &client_time_setting = _loop_info.use_client_time;
  client_time_setting.set(use_client_time, sticky);
}

// Returns true only when the loop info is configured and the reward
// calculation is valid. The second check is skipped if the first fails.
bool multistep_example_joiner::joiner_ready() {
  if (!_loop_info.is_configured()) {
    return false;
  }
  return _reward_calculation.is_valid();
}
Expand Down Expand Up @@ -121,8 +127,7 @@ reward::outcome_event multistep_example_joiner::process_outcome(const multistep_
const auto& metadata = event_meta.meta;
const auto& event = event_meta.event;
reward::outcome_event o_event;
o_event.metadata = {timestamp_to_chrono(*metadata.client_time_utc()),
metadata.app_id() ? metadata.app_id()->str() : "",
o_event.metadata = {metadata.app_id() ? metadata.app_id()->str() : "",
metadata.payload_type(),
metadata.pass_probability(),
metadata.encoding(),
Expand All @@ -142,9 +147,7 @@ joined_event::joined_event multistep_example_joiner::process_interaction(
v_array<example *> &examples) {
const auto& metadata = event_meta.meta;
const auto& event = event_meta.event;
metadata::event_metadata_info meta = {metadata.client_time_utc()
? timestamp_to_chrono(*metadata.client_time_utc())
: TimePoint(),
metadata::event_metadata_info meta = {
metadata.app_id() ? metadata.app_id()->str() : "",
metadata.payload_type(),
metadata.pass_probability(),
Expand Down
1 change: 1 addition & 0 deletions external_parser/joiners/multistep_example_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class multistep_example_joiner : public i_joiner {
void set_default_reward(float default_reward, bool sticky) override;
void set_learning_mode_config(v2::LearningModeType learning_mode, bool sticky) override;
void set_problem_type_config(v2::ProblemType problem_type, bool sticky) override;
void set_use_client_time(bool use_client_time, bool sticky = false) override;
bool joiner_ready() override;

bool current_event_is_skip_learn() override;
Expand Down
1 change: 1 addition & 0 deletions external_parser/parse_example_binary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ bool binary_parser::read_checkpoint_msg(io_buf *input) {
checkpoint_info->learning_mode_config());
_example_joiner->set_problem_type_config(
checkpoint_info->problem_type_config());
_example_joiner->set_use_client_time(checkpoint_info->use_client_time());

return true;
}
Expand Down
6 changes: 6 additions & 0 deletions external_parser/parse_example_external.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ void apply_cli_overrides(std::unique_ptr<i_joiner>& joiner, vw *all, const input
}
joiner->set_problem_type_config(problem_type, true);
}
if(all->options->was_supplied("use_client_time")) {
joiner->set_use_client_time(parsed_options.ext_opts->use_client_time, true);
}
if(all->options->was_supplied("learning_mode")) {
v2::LearningModeType learning_mode;
if(!str_to_learning_mode(parsed_options.ext_opts->learning_mode, learning_mode)) {
Expand Down Expand Up @@ -167,6 +170,9 @@ void parser::set_parse_args(VW::config::option_group_definition &in_options,
.add(
VW::config::make_option("problem_type", parsed_options.ext_opts->problem_type)
.help("Override the problem type trying to be solved, valid values: CB, CCB, SLATES, CA"))
.add(
VW::config::make_option("use_client_time", parsed_options.ext_opts->use_client_time)
.help("Override use_client_time to define whether client time or enqueued time will be used for reward calculation"))
.add(
VW::config::make_option("reward_function", parsed_options.ext_opts->reward_function)
.help("Override the reward function to be used, valid values: earliest, average, median, sum, min, max"))
Expand Down
1 change: 1 addition & 0 deletions external_parser/parse_example_external.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct parser_options {
std::string problem_type;
std::string reward_function;
std::string learning_mode;
bool use_client_time;
};

int parse_examples(vw *all, v_array<example *> &examples);
Expand Down
1 change: 1 addition & 0 deletions external_parser/unit_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ set(TEST_SOURCES
test_log_converter.cc
test_skip_learn.cc
test_metrics.cc
test_client_and_enqueued_time.cc
)

add_executable(binary_parser_unit_tests ${TEST_SOURCES})
Expand Down
Loading