Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions common/configdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native
try:
(table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1)
if table in self.handlers:
client = self.get_redis_client(self.db_name)
data = self.raw_to_typed(client.hgetall(key))
if item['data'] == 'del':
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

del

Could you add some test cases for hdel?

  1. hdel one field, but there are still remaining fields
  2. hdel the last field, so the whole key disappears.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qiluo-msft done. Please review

data = None
else:
client = self.get_redis_client(self.db_name)
data = self.raw_to_typed(client.hgetall(key))
if table in init_data and row in init_data[table]:
cache_hit = init_data[table][row] == data
del init_data[table][row]
Expand Down
90 changes: 71 additions & 19 deletions tests/test_redis_ut.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,46 +634,98 @@ def thread_coming_entry():
def test_ConfigDBInit():
table_name_1 = 'TEST_TABLE_1'
table_name_2 = 'TEST_TABLE_2'
table_name_3 = 'TEST_TABLE_3'
test_key = 'key1'
test_data = {'field1': 'value1'}
test_data_update = {'field1': 'value2'}
test_data = {'field1': 'value1', 'field2': 'value2'}

queue = multiprocessing.Queue()

manager = multiprocessing.Manager()
ret_data = manager.dict()

def test_handler(table, key, data, ret):
ret[table] = {key: data}

def test_init_handler(data, ret):
def test_handler(table, key, data, ret, q=None):
if table not in ret:
ret[table] = {}
if data is None:
ret[table] = {k: v for k, v in ret[table].items() if k != key}
if q:
q.put(ret[table])
elif key not in ret[table] or ret[table][key] != data:
ret[table] = {key: data}
if q:
q.put(ret[table])

def test_init_handler(data, ret, queue):
ret.update(data)
queue.put(ret)

def thread_listen(ret):
def thread_listen(ret, queue):
config_db = ConfigDBConnector()
config_db.connect(wait_for_init=False)

config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret),
config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret, queue),
fire_init_data=False)
config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret),
config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret, queue),
fire_init_data=True)
config_db.subscribe(table_name_3, lambda table, key, data: test_handler(table, key, data, ret, queue),
fire_init_data=False)

config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret))
config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret, queue))

config_db = ConfigDBConnector()
config_db.connect(wait_for_init=False)
client = config_db.get_redis_client(config_db.CONFIG_DB)
client.flushdb()

# Init table data
config_db.set_entry(table_name_1, test_key, test_data)
config_db.set_entry(table_name_2, test_key, test_data)
# Prepare unique data per each table to track if correct data are received in the update
table_1_data = {f'{table_name_1}_{k}': v for k, v in test_data.items()}
config_db.set_entry(table_name_1, test_key, table_1_data)
table_2_data = {f'{table_name_2}_{k}': v for k, v in test_data.items()}
config_db.set_entry(table_name_2, test_key, table_2_data)
config_db.set_entry(table_name_3, test_key, {})

thread = multiprocessing.Process(target=thread_listen, args=(ret_data,))
thread.start()
time.sleep(5)
thread.terminate()
# Run the listener in a separate process. It is not possible to stop a listener when it is running as a thread.
# When it runs in a separate process we can terminate it with a signal.
listener = multiprocessing.Process(target=thread_listen, args=(ret_data, queue))
listener.start()

assert ret_data[table_name_1] == {test_key: test_data}
assert ret_data[table_name_2] == {test_key: test_data}
try:
# During the subscription to table 2 'fire_init_data=True' is passed. The callback should be called before the listener.
# Verify that callback is fired before listener initialization.
data = queue.get(timeout=5)
assert data == {test_key: table_2_data}

# Wait for init data
init_data = queue.get(timeout=5)

# Verify that all tables initialized correctly
assert init_data[table_name_1] == {test_key: table_1_data}
assert init_data[table_name_2] == {test_key: table_2_data}
assert init_data[table_name_3] == {test_key: {}}

# Remove one key-value pair from the data. Verify that the entry is updated correctly
table_1_data.popitem()
config_db.set_entry(table_name_1, test_key, table_1_data)
data = queue.get(timeout=5)
assert data == {test_key: table_1_data}

# Remove all key-value pairs. Verify that the table still contains key
config_db.set_entry(table_name_1, test_key, {})
data = queue.get(timeout=5)
assert data == {test_key: {}}

# Remove the key
config_db.set_entry(table_name_1, test_key, None)
data = queue.get(timeout=5)
assert test_key not in data

# Remove the entry (with no attributes) from the table.
# Verify that the update is received and a callback is called
config_db.set_entry(table_name_3, test_key, None)
table_3_data = queue.get(timeout=5)
assert test_key not in table_3_data
finally:
listener.terminate()


def test_DBConnectFailure():
Expand Down