Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions common/producerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,35 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
"end\n";
m_shaDel = m_pipe->loadRedisScript(luaDel);

string luaBatchedSet =
"local added = 0\n"
"local idx = 2\n"
"for i = 0, #KEYS - 4 do\n"
" added = added + redis.call('SADD', KEYS[2], KEYS[4 + i])\n"
" for j = 0, tonumber(ARGV[idx]) - 1 do\n"
" local attr = ARGV[idx + j * 2 + 1]\n"
" local val = ARGV[idx + j * 2 + 2]\n"
" redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n"
" end\n"
" idx = idx + tonumber(ARGV[idx]) * 2 + 1\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);

string luaBatchedDel =
"local added = 0\n"
"for i = 0, #KEYS - 5 do\n"
" added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n"
" redis.call('SADD', KEYS[3], KEYS[5 + i])\n"
" redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);

string luaClear =
"redis.call('DEL', KEYS[1])\n"
"local keys = redis.call('KEYS', KEYS[2] .. '*')\n"
Expand Down Expand Up @@ -156,6 +185,100 @@ void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND
}
}

void ProducerStateTable::set(const std::vector<KeyOpFieldsValuesTuple>& values)
{
if (m_tempViewActive)
{
// Write to temp view instead of DB
for (const auto &value : values)
{
const std::string &key = kfvKey(value);
for (const auto &iv : kfvFieldsValues(value))
{
m_tempViewState[key][fvField(iv)] = fvValue(iv);
}
}
return;
}

// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaBatchedSet);
args.emplace_back(to_string(values.size() + 3));
args.emplace_back(getChannelName(m_pipe->getDbId()));
args.emplace_back(getKeySetName());
args.emplace_back(getStateHashPrefix() + getTableName() + getTableNameSeparator());
for (const auto &value : values)
{
args.emplace_back(kfvKey(value));
}
args.emplace_back("G");
for (const auto &value : values)
{
args.emplace_back(to_string(kfvFieldsValues(value).size()));
for (const auto &iv : kfvFieldsValues(value))
{
args.emplace_back(fvField(iv));
args.emplace_back(fvValue(iv));
}
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
m_pipe->flush();
}
}

void ProducerStateTable::del(const std::vector<std::string>& keys)
{
if (m_tempViewActive)
{
// Write to temp view instead of DB
for (const auto &key : keys)
{
m_tempViewState.erase(key);
}
return;
}

// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaBatchedDel);
args.emplace_back(to_string(keys.size() + 4));
args.emplace_back(getChannelName(m_pipe->getDbId()));
args.emplace_back(getKeySetName());
args.emplace_back(getDelKeySetName());
args.emplace_back(getStateHashPrefix() + getTableName() + getTableNameSeparator());
for (const auto &key : keys)
{
args.emplace_back(key);
}
args.emplace_back("G");

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
m_pipe->flush();
}
}

void ProducerStateTable::flush()
{
m_pipe->flush();
Expand Down
7 changes: 7 additions & 0 deletions common/producerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
%}
#endif

// Batched version of set() and del().
virtual void set(const std::vector<KeyOpFieldsValuesTuple>& values);

virtual void del(const std::vector<std::string>& keys);

void flush();

int64_t count();
Expand All @@ -52,6 +57,8 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
RedisPipeline *m_pipe;
std::string m_shaSet;
std::string m_shaDel;
std::string m_shaBatchedSet;
std::string m_shaBatchedDel;
std::string m_shaClear;
std::string m_shaApplyView;
TableDump m_tempViewState;
Expand Down
153 changes: 151 additions & 2 deletions tests/redis_piped_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <thread>
#include <algorithm>
#include <memory>
#include <deque>
#include "gtest/gtest.h"
#include "common/dbconnector.h"
#include "common/notificationconsumer.h"
Expand Down Expand Up @@ -469,7 +470,7 @@ TEST(ConsumerStateTable, async_singlethread)
EXPECT_EQ(p.count(), 0);
RedisReply r2(&db, queryCommand.c_str(), REDIS_REPLY_ARRAY);
EXPECT_EQ(r2.getContext()->elements, (size_t)0);

int numberOfNotification = 0;
while ((ret = cs.select(&selectcs, 1000)) == Select::OBJECT)
{
Expand All @@ -492,6 +493,155 @@ TEST(ConsumerStateTable, async_singlethread)
cout << endl << "Done." << endl;
}

TEST(ConsumerStateTable, async_batched)
{
clearDB();

int index = 0;
string tableName = "UT_REDIS_THREAD_" + to_string(index);
DBConnector db(TEST_DB, 0, true);
RedisPipeline pipeline(&db);
ProducerStateTable p(&pipeline, tableName, true);
vector<KeyOpFieldsValuesTuple> set_requests;

// Do pending data clear test first.
for (int i = 0; i < NUMBER_OF_OPS; i++)
{
vector<FieldValueTuple> fields;
int maxNumOfFields = getMaxFields(i);
for (int j = 0; j < maxNumOfFields; j++)
{
FieldValueTuple t(field(j), value(j));
fields.push_back(t);
}
if ((i % 100) == 0)
cout << "+" << flush;
set_requests.push_back(KeyOpFieldsValuesTuple({key(i), SET_COMMAND, fields}));
}
p.set(set_requests);
p.clear();
EXPECT_EQ(p.count(), 0);
string queryCommand = "KEYS " + p.getStateHashPrefix() + tableName + p.getTableNameSeparator() + "*";
RedisReply r(&db, queryCommand.c_str(), REDIS_REPLY_ARRAY);
EXPECT_EQ(r.getContext()->elements, (size_t)0);

// Set the batch size to be NUMBER_OF_OPS, so that we can pop all requests
// in one call.
ConsumerStateTable c(&db, tableName, NUMBER_OF_OPS);
Select cs;
Selectable *selectcs;
int ret, i = 0;
deque<KeyOpFieldsValuesTuple> vkco;

cs.addSelectable(&c);
ret = cs.select(&selectcs, 1000);
EXPECT_EQ(ret, Select::TIMEOUT);

set_requests.clear();
for (i = 0; i < NUMBER_OF_OPS; i++)
{
vector<FieldValueTuple> fields;
int maxNumOfFields = getMaxFields(i);
for (int j = 0; j < maxNumOfFields; j++)
{
FieldValueTuple t(field(j), value(j));
fields.push_back(t);
}
if ((i % 100) == 0)
cout << "+" << flush;
set_requests.push_back(KeyOpFieldsValuesTuple({key(i), SET_COMMAND, fields}));
}
p.set(set_requests);
p.flush();
// KeySet of the ProducerStateTable has data to be picked up by ConsumerStateTable
EXPECT_EQ(p.count(), NUMBER_OF_OPS);

ret = cs.select(&selectcs, 1000);
c.pops(vkco);
int numberOfKeysSet = 0;
for (auto kco : vkco)
{
EXPECT_EQ(kfvOp(kco), "SET");
numberOfKeysSet++;
validateFields(kfvKey(kco), kfvFieldsValues(kco));

if ((i++ % 100) == 0)
cout << "-" << flush;
}
EXPECT_LE(numberOfKeysSet, NUMBER_OF_OPS);
EXPECT_EQ(ret, Select::OBJECT);
// KeySet of the ProducerStateTable has been emptied by ConsumerStateTable
EXPECT_EQ(p.count(), 0);

vector<string> del_requests;
for (i = 0; i < NUMBER_OF_OPS; i++)
{
del_requests.push_back(key(i));
if ((i % 100) == 0)
cout << "+" << flush;
}
p.del(del_requests);

EXPECT_EQ(p.count(), NUMBER_OF_OPS);
p.flush();

ret = cs.select(&selectcs, 1000);
vkco.clear();
c.pops(vkco);
int numberOfKeyDeleted = 0;
for (auto kco : vkco)
{
EXPECT_EQ(kfvOp(kco), "DEL");
numberOfKeyDeleted++;

if ((i++ % 100) == 0)
cout << "-" << flush;
}
EXPECT_EQ(p.count(), 0);

EXPECT_LE(numberOfKeysSet, numberOfKeyDeleted);
EXPECT_EQ(ret, Select::OBJECT);

// clear test with consumer already listening
del_requests.clear();
for (i = 0; i < NUMBER_OF_OPS; i++)
{
del_requests.push_back(key(i));
if ((i % 100) == 0)
cout << "+" << flush;
}
p.del(del_requests);

EXPECT_EQ(p.count(), NUMBER_OF_OPS);
p.flush();
p.clear();
EXPECT_EQ(p.count(), 0);
RedisReply r2(&db, queryCommand.c_str(), REDIS_REPLY_ARRAY);
EXPECT_EQ(r2.getContext()->elements, (size_t)0);

int numberOfNotification = 0;
while ((ret = cs.select(&selectcs, 1000)) == Select::OBJECT)
{
KeyOpFieldsValuesTuple kco;
c.pop(kco);
// keys have been dropped, expecting empty kco.
EXPECT_EQ(kfvOp(kco), "");
numberOfNotification++;

if ((i++ % 100) == 0)
cout << "-" << flush;
}
EXPECT_EQ(p.count(), 0);

// There is only 1 notification in batched request.
EXPECT_EQ(1, numberOfNotification);
EXPECT_EQ(ret, Select::TIMEOUT);

cout << "Done. Waiting for all job to finish " << NUMBER_OF_OPS << " jobs." << endl;

cout << endl << "Done." << endl;
}

TEST(ConsumerStateTable, async_test)
{
thread *producerThreads[NUMBER_OF_THREADS];
Expand Down Expand Up @@ -580,4 +730,3 @@ TEST(ConsumerStateTable, async_multitable)

cout << endl << "Done." << endl;
}

Loading