Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/cases/81-Tools/03-Benchmark/json/tmq_cancel.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"rows_file": "./rows_file_sequ",
"expect_rows": -1,
"topic_list": [
{"name": "topic_benchmark_meters", "sql": "select * from test.meters;"}
{"name": "topic_benchmark_meters", "sql": "select * from dbcancel.meters;"}
]
}
}
25 changes: 13 additions & 12 deletions test/cases/81-Tools/03-Benchmark/test_benchmark_tmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@

class TestBenchmarkTmq:
updatecfgDict = {
'slowLogScope' : "others"
}
'slowLogScope' : "others",
'debugFlag' : 143
}
#
# ------------------- test_tmp_basic.py ----------------
#
Expand Down Expand Up @@ -57,7 +58,7 @@ def do_tmq_case(self):
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
time.sleep(5)

cmd = "%s -f %s/json/tmq_basic2.json " % (binPath, os.path.dirname(__file__))
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
Expand All @@ -77,7 +78,7 @@ def do_tmq_case(self):
# print("taosBenchmark be killed on purpose")
# except:
# tdLog.exit("failed to kill taosBenchmark")

print("do tmq case .......................... [passed]")

#
Expand All @@ -95,15 +96,15 @@ def stopThread(self, isForceExit):
time.sleep(10)
pids = self.get_pids_by_name("taosBenchmark")
if pids:
tdLog.info(f"Find a process named taosBbenchmark with PID: {pids}")
tdLog.info(f"Find a process named taosBenchmark with PID: {pids}")
else:
tdLog.exit("No process named taosBbenchmark was found.")
tdLog.exit("No process named taosBenchmark was found.")

os.kill(pids[0], signal.SIGINT)
time.sleep(10)

if self._rlist:
tdLog.info(self._rlist)
tdLog.info(self._rlist)
self.checkListString(self._rlist, "Receive SIGINT or other signal, quit benchmark")
else:
tdLog.exit("The benchmark process has not stopped!")
Expand All @@ -122,20 +123,20 @@ def dbTmqThread(self):

# run
def do_tmq_cancel(self):
self._rlist = None
self._rlist = None
tdLog.info(f"start to excute {__file__}")
tdSql.execute("drop topic if exists topic_benchmark_meters")
self.dbInsert()
tdLog.info(f"dbInsert finish!")
tdLog.info(f"dbInsert finish!")

t1 = threading.Thread(target=self.dbTmqThread)
t2 = threading.Thread(target=self.stopThread, args=(False,))
t1.start()
t2.start()
# wait for threads to complete
t1.join()
t2.join()
t2.join()

print("do tmq cancel ......................... [passed]")

#
Expand Down
43 changes: 25 additions & 18 deletions tools/taos-tools/src/benchTmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ void printfTmqConfigIntoFile() {
infoPrintToFile( "msgWithTableName: %s\n", pConsumerInfo->msgWithTableName);
infoPrintToFile( "rowsFile: %s\n", pConsumerInfo->rowsFile);
infoPrintToFile( "expectRows: %d\n", pConsumerInfo->expectRows);

for (int i = 0; i < pConsumerInfo->topicCount; ++i) {
infoPrintToFile( "topicName[%d]: %s\n", i, pConsumerInfo->topicName[i]);
infoPrintToFile( "topicSql[%d]: %s\n", i, pConsumerInfo->topicSql[i]);
}
}
}


Expand Down Expand Up @@ -85,7 +85,7 @@ static int create_topic() {
closeBenchConn(conn);
return -1;
}

}
closeBenchConn(conn);
return 0;
Expand Down Expand Up @@ -149,9 +149,9 @@ int buildConsumerAndSubscribe(tmqThreadInfo * pThreadInfo, char* groupId) {
SConsumerInfo* pConsumerInfo = &g_tmqInfo.consumerInfo;

tmq_list_t * topic_list = buildTopicList();

tmq_conf_t * conf = tmq_conf_new();

tmq_conf_set(conf, "td.connect.user", g_arguments->user);
tmq_conf_set(conf, "td.connect.pass", g_arguments->password);
tmq_conf_set(conf, "td.connect.ip", g_arguments->host);
Expand Down Expand Up @@ -203,9 +203,9 @@ static void* tmqConsume(void* arg) {
tmqThreadInfo* pThreadInfo = (tmqThreadInfo*)arg;
SConsumerInfo* pConsumerInfo = &g_tmqInfo.consumerInfo;
char groupId[16] = {0};

// "sequential" or "parallel"
if (pConsumerInfo->createMode && 0 != strncasecmp(pConsumerInfo->createMode, "sequential", 10)) {
if (pConsumerInfo->createMode && 0 != strncasecmp(pConsumerInfo->createMode, "sequential", 10)) {

char* tPtr = pConsumerInfo->groupId;
// "share" or "independent"
Expand All @@ -225,7 +225,7 @@ static void* tmqConsume(void* arg) {
infoPrint("%s\n", "buildConsumerAndSubscribe() fail in tmqConsume()");
return NULL;
}
}
}

int64_t totalMsgs = 0;
int64_t totalRows = 0;
Expand All @@ -234,7 +234,7 @@ static void* tmqConsume(void* arg) {
infoPrint("consumer id %d start to loop pull msg\n", pThreadInfo->id);

if ((NULL != pConsumerInfo->enableManualCommit) && (0 == strncmp("true", pConsumerInfo->enableManualCommit, 4))) {
manualCommit = 1;
manualCommit = 1;
infoPrint("consumer id %d enable manual commit\n", pThreadInfo->id);
}

Expand All @@ -258,7 +258,7 @@ static void* tmqConsume(void* arg) {
if (0 != manualCommit) {
tmq_commit_sync(pThreadInfo->tmq, tmqMsg);
}

taos_free_result(tmqMsg);

totalMsgs++;
Expand Down Expand Up @@ -297,8 +297,15 @@ static void* tmqConsume(void* arg) {

code = tmq_consumer_close(pThreadInfo->tmq);
if (code != 0) {
errorPrint("thread %d tmq_consumer_close() fail, reason: %s\n",
pThreadInfo->id, tmq_err2str(code));
errorPrint("thread %d tmq_consumer_close() failed, reason: %s, try again after 3s\n",
pThreadInfo->id, tmq_err2str(code));

toolsMsleep(3000);
code = tmq_consumer_close(pThreadInfo->tmq);
if (code != 0) {
errorPrint("thread %d tmq_consumer_close() failed after retry, reason: %s\n",
pThreadInfo->id, tmq_err2str(code));
}
}
pThreadInfo->tmq = NULL;

Expand Down Expand Up @@ -331,7 +338,7 @@ int subscribeTestProcess() {
tPtr = groupId;
}
}

pthread_t * pids = benchCalloc(pConsumerInfo->concurrent, sizeof(pthread_t), true);
tmqThreadInfo *infos = benchCalloc(pConsumerInfo->concurrent, sizeof(tmqThreadInfo), true);

Expand All @@ -355,7 +362,7 @@ int subscribeTestProcess() {
}

// "sequential" or "parallel"
if (pConsumerInfo->createMode && 0 == strncasecmp(pConsumerInfo->createMode, "sequential", 10)) {
if (pConsumerInfo->createMode && 0 == strncasecmp(pConsumerInfo->createMode, "sequential", 10)) {
int retVal = buildConsumerAndSubscribe(pThreadInfo, tPtr);
if (0 != retVal) {
infoPrint("%s\n", "buildConsumerAndSubscribe() fail!");
Expand Down Expand Up @@ -390,12 +397,12 @@ int subscribeTestProcess() {
infoPrintToFile(
"Consumed total msgs: %" PRId64 ","
"total rows: %" PRId64 "\n", totalMsgs, totalRows);

if (g_arguments->output_json_file) {
tools_cJSON *root = tools_cJSON_CreateObject();
if (root) {
tools_cJSON_AddNumberToObject(root, "total_msgs", totalMsgs);
tools_cJSON_AddNumberToObject(root, "total_rows", totalRows);
tools_cJSON_AddNumberToObject(root, "total_msgs", totalMsgs);
tools_cJSON_AddNumberToObject(root, "total_rows", totalRows);
char *jsonStr = tools_cJSON_PrintUnformatted(root);
if (jsonStr) {
FILE *fp = fopen(g_arguments->output_json_file, "w");
Expand All @@ -411,7 +418,7 @@ int subscribeTestProcess() {
tools_cJSON_Delete(root);
}
}

tmq_over:
free(pids);
free(infos);
Expand Down
Loading
Loading