Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions common/redispipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,24 @@ class RedisPipeline {

redisReply *push(const RedisCommand& command, int expectedType)
{
if (expectedType == REDIS_REPLY_NIL)
switch (expectedType)
{
redisAppendFormattedCommand(m_db->getContext(), command.c_str(), command.length());
m_remaining++;
mayflush();
return NULL;
}
else
{
flush();
RedisReply r(m_db, command, expectedType);
return r.release();
case REDIS_REPLY_NIL:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_INTEGER:
{
redisAppendFormattedCommand(m_db->getContext(), command.c_str(), command.length());
m_expectedTypes.push(expectedType);
m_remaining++;
mayflush();
return NULL;
}
default:
{
flush();
RedisReply r(m_db, command, expectedType);
return r.release();
}
}
}

Expand All @@ -59,8 +65,17 @@ class RedisPipeline {

redisReply *reply;
redisGetReply(m_db->getContext(), (void**)&reply);
RedisReply r(reply);
m_remaining--;
return reply;

int expectedType = m_expectedTypes.front();
m_expectedTypes.pop();
r.checkReplyType(expectedType);
if (expectedType == REDIS_REPLY_STATUS)
{
r.checkStatusOK();
}
return r.release();
}

void flush()
Expand All @@ -79,6 +94,7 @@ class RedisPipeline {

private:
DBConnector *m_db;
std::queue<int> m_expectedTypes;
size_t m_remaining;

void mayflush()
Expand Down
57 changes: 44 additions & 13 deletions common/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,42 @@ using namespace swss;
using json = nlohmann::json;

Table::Table(DBConnector *db, string tableName, string tableSeparator)
: RedisTransactioner(db), TableBase(tableName, tableSeparator)
: Table(new RedisPipeline(db, 1), tableName, tableSeparator, false)
{
m_pipeowned = true;
}

bool Table::get(string key, vector<FieldValueTuple> &values)
Table::Table(RedisPipeline *pipeline, string tableName, string tableSeparator, bool buffered)
: TableBase(tableName, tableSeparator)
, m_buffered(buffered)
, m_pipeowned(false)
, m_pipe(pipeline)
{
}

Table::~Table()
{
if (m_pipeowned)
{
delete m_pipe;
}
}

void Table::setBuffered(bool buffered)
{
string hgetall_key("HGETALL ");
hgetall_key += getKeyName(key);
m_buffered = buffered;
}

RedisReply r(m_db, hgetall_key, REDIS_REPLY_ARRAY);
void Table::flush()
{
m_pipe->flush();
}

bool Table::get(string key, vector<FieldValueTuple> &values)
{
RedisCommand hgetall_key;
hgetall_key.format("HGETALL %s", getKeyName(key).c_str());
RedisReply r = m_pipe->push(hgetall_key, REDIS_REPLY_ARRAY);
redisReply *reply = r.getContext();
values.clear();

Expand Down Expand Up @@ -51,14 +77,18 @@ void Table::set(string key, vector<FieldValueTuple> &values,
RedisCommand cmd;
cmd.formatHMSET(getKeyName(key), values);

RedisReply r(m_db, cmd, REDIS_REPLY_STATUS);

r.checkStatusOK();
m_pipe->push(cmd, REDIS_REPLY_STATUS);
if (!m_buffered)
{
m_pipe->flush();
}
}

void Table::del(string key, string /* op */, string /*prefix*/)
{
RedisReply r(m_db, string("DEL ") + getKeyName(key), REDIS_REPLY_INTEGER);
RedisCommand del_key;
del_key.format("DEL %s", getKeyName(key).c_str());
m_pipe->push(del_key, REDIS_REPLY_INTEGER);
}

void TableEntryEnumerable::getContent(vector<KeyOpFieldsValuesTuple> &tuples)
Expand All @@ -80,8 +110,9 @@ void TableEntryEnumerable::getContent(vector<KeyOpFieldsValuesTuple> &tuples)

void Table::getKeys(vector<string> &keys)
{
string keys_cmd("KEYS " + getTableName() + getTableNameSeparator() + "*");
RedisReply r(m_db, keys_cmd, REDIS_REPLY_ARRAY);
RedisCommand keys_cmd;
keys_cmd.format("KEYS %s%s*", getTableName().c_str(), getTableNameSeparator().c_str());
RedisReply r = m_pipe->push(keys_cmd, REDIS_REPLY_ARRAY);
redisReply *reply = r.getContext();
keys.clear();

Expand All @@ -103,7 +134,7 @@ void Table::dump(TableDump& tableDump)

static std::string luaScript = loadLuaScript("table_dump.lua");

static std::string sha = loadRedisScript(m_db, luaScript);
static std::string sha = m_pipe->loadRedisScript(luaScript);

SWSS_LOG_TIMER("getting");

Expand All @@ -112,7 +143,7 @@ void Table::dump(TableDump& tableDump)
sha.c_str(),
getTableName().c_str());

RedisReply r(m_db, command, REDIS_REPLY_STRING);
RedisReply r = m_pipe->push(command, REDIS_REPLY_STRING);

auto ctx = r.getContext();

Expand Down
14 changes: 12 additions & 2 deletions common/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "dbconnector.h"
#include "redisreply.h"
#include "redisselect.h"
#include "redispipeline.h"
#include "schema.h"
#include "redistran.h"

Expand Down Expand Up @@ -112,10 +113,11 @@ class TableEntryEnumerable {
void getContent(std::vector<KeyOpFieldsValuesTuple> &tuples);
};

class Table : public RedisTransactioner, public TableBase, public TableEntryEnumerable {
class Table : public TableBase, public TableEntryEnumerable {
public:
Table(DBConnector *db, std::string tableName, std::string tableSeparator = DEFAULT_TABLE_NAME_SEPARATOR);
virtual ~Table() { }
Table(RedisPipeline *pipeline, std::string tableName, std::string tableSeparator, bool buffered);
virtual ~Table();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove the implementation for the Table deconstructor?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually not removed. I implemented with non-empty logic in .cpp file


/* Set an entry in the DB directly (op not in use) */
virtual void set(std::string key,
Expand All @@ -133,10 +135,18 @@ class Table : public RedisTransactioner, public TableBase, public TableEntryEnum

void getKeys(std::vector<std::string> &keys);

void setBuffered(bool buffered);

void flush();

void dump(TableDump &tableDump);

protected:

bool m_buffered;
bool m_pipeowned;
RedisPipeline *m_pipe;

/* Strip special symbols from keys used for type identification
* Input example:
* port@
Expand Down
4 changes: 3 additions & 1 deletion tests/redis_piped_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ TEST(Table, piped_test)
{
string tableName = "TABLE_UT_TEST";
DBConnector db(TEST_VIEW, "localhost", 6379, 0);
Table t(&db, tableName);
RedisPipeline pipeline(&db);
Table t(&pipeline, tableName, DEFAULT_TABLE_NAME_SEPARATOR, true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sihuihan88 , you need follow this example to enable redispipeline in your scenario.


clearDB();
cout << "Starting table manipulations" << endl;
Expand All @@ -348,6 +349,7 @@ TEST(Table, piped_test)

t.set(key_1, values);
t.set(key_2, values);
t.flush();

cout << "- Step 2. GET_TABLE_CONTENT" << endl;
vector<KeyOpFieldsValuesTuple> tuples;
Expand Down