Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 9 additions & 20 deletions vowpalwabbit/cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,19 @@ int read_cached_features(vw* all, v_array<example*>& examples)
all->example_parser->_shared_data, &ae->l, ae->_reduction_features, *input);
if (total == 0) return 0;
if (read_cached_tag(*input, ae) == 0) return 0;
char* c;

// is newline example or not
unsigned char newline_indicator = 0;
if (input->buf_read(c, sizeof(newline_indicator)) < sizeof(newline_indicator)) return 0;
newline_indicator = *reinterpret_cast<unsigned char*>(c);
unsigned char newline_indicator = input->read_value<unsigned char>("newline_indicator");
if (newline_indicator == newline_example) { ae->is_newline = true; }
else
{
ae->is_newline = false;
}
c += sizeof(newline_indicator);
all->example_parser->input->set(c);

// read indices
unsigned char num_indices = 0;
if (input->buf_read(c, sizeof(num_indices)) < sizeof(num_indices)) return 0;
num_indices = *reinterpret_cast<unsigned char*>(c);
c += sizeof(num_indices);
unsigned char num_indices = input->read_value<unsigned char>("num_indices");

all->example_parser->input->set(c);
char* c;
for (; num_indices > 0; num_indices--)
{
size_t temp;
Expand All @@ -107,7 +101,7 @@ int read_cached_features(vw* all, v_array<example*>& examples)
features& ours = ae->feature_space[index];
size_t storage = *reinterpret_cast<size_t*>(c);
c += sizeof(size_t);
all->example_parser->input->set(c);
input->set(c);
total += storage;
if (input->buf_read(c, storage) < storage)
{
Expand Down Expand Up @@ -138,7 +132,7 @@ int read_cached_features(vw* all, v_array<example*>& examples)
last = i;
ours.push_back(v, i);
}
all->example_parser->input->set(c);
input->set(c);
}

return static_cast<int>(total);
Expand Down Expand Up @@ -212,13 +206,8 @@ void cache_features(io_buf& cache, example* ae, uint64_t mask)
{
cache_tag(cache, ae->tag);

if (ae->is_newline) { output_byte(cache, newline_example); }
else
{
output_byte(cache, non_newline_example);
}
output_byte(cache, static_cast<unsigned char>(ae->indices.size()));

cache.write_value<unsigned char>(ae->is_newline ? newline_example : non_newline_example);
cache.write_value<unsigned char>(static_cast<unsigned char>(ae->indices.size()));
for (namespace_index ns : ae->indices) output_features(cache, ns, ae->feature_space[ns], mask);
}

Expand Down
32 changes: 31 additions & 1 deletion vowpalwabbit/io_buf.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class io_buf
// Returns the offset of `head` from the start of the underlying buffer
// (`_buffer._begin`).
// NOTE(review): per the name this is presumably the number of bytes written but
// not yet flushed — confirm against flush().
size_t unflushed_bytes_count()
{
  const auto offset_from_begin = head - _buffer._begin;
  return offset_from_begin;
}

void flush();

bool close_file()
{
if (!input_files.empty())
Expand All @@ -212,6 +212,36 @@ class io_buf
while (close_file()) {}
}

// Writes `value` into the buffer as sizeof(T) raw bytes.
//
// Asks buf_write() for sizeof(T) bytes, stores `value` at the pointer it hands
// back, and then calls set() with the address just past the stored value.
// NOTE(review): the store goes through a reinterpret_cast'ed T*, so this assumes
// the pointer returned by buf_write() is usable as a T* (alignment) — confirm.
template <typename T>
void write_value(const T& value)
{
  char* cursor = nullptr;
  buf_write(cursor, sizeof(T));

  T* const slot = reinterpret_cast<T*>(cursor);
  *slot = value;

  set(cursor + sizeof(T));
}

template <typename T>
T read_value(const char* debug_name = nullptr)
{
char* c;
T value;
if (buf_read(c, sizeof(T)) < sizeof(T))
{
if (debug_name != nullptr)
{ THROW("Failed to read cache value: " << debug_name << ", with size: " << sizeof(T)); }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably out of scope for this change, but would it make sense to have a mode where an error while reading from the cache causes learning to fall back to reading from the original (non-cached) input? Or is the coordination hard enough that it is not worth the time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current landscape, the coordination needed to achieve that is hard enough that it is not worth it. I can see it being a nice thing to have in the future, though.

else
{
THROW("Failed to read cache value with size: " << sizeof(T));
}
}
value = *reinterpret_cast<T*>(c);
c += sizeof(T);
set(c);
return value;
}

void buf_write(char*& pointer, size_t n);
size_t buf_read(char*& pointer, size_t n);

Expand Down