Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/cases/81-Tools/03-Benchmark/json/tmq_cancel.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"rows_file": "./rows_file_sequ",
"expect_rows": -1,
"topic_list": [
{"name": "topic_benchmark_meters", "sql": "select * from test.meters;"}
{"name": "topic_benchmark_meters", "sql": "select * from dbcancel.meters;"}
]
}
}
4 changes: 2 additions & 2 deletions test/cases/81-Tools/03-Benchmark/test_benchmark_tmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ def stopThread(self, isForceExit):
time.sleep(10)
pids = self.get_pids_by_name("taosBenchmark")
if pids:
tdLog.info(f"Find a process named taosBbenchmark with PID: {pids}")
tdLog.info(f"Find a process named taosBenchmark with PID: {pids}")
else:
tdLog.exit("No process named taosBbenchmark was found.")
tdLog.exit("No process named taosBenchmark was found.")

os.kill(pids[0], signal.SIGINT)
time.sleep(10)
Expand Down
46 changes: 28 additions & 18 deletions tools/taos-tools/src/benchTmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ void printfTmqConfigIntoFile() {
infoPrintToFile( "msgWithTableName: %s\n", pConsumerInfo->msgWithTableName);
infoPrintToFile( "rowsFile: %s\n", pConsumerInfo->rowsFile);
infoPrintToFile( "expectRows: %d\n", pConsumerInfo->expectRows);

for (int i = 0; i < pConsumerInfo->topicCount; ++i) {
infoPrintToFile( "topicName[%d]: %s\n", i, pConsumerInfo->topicName[i]);
infoPrintToFile( "topicSql[%d]: %s\n", i, pConsumerInfo->topicSql[i]);
}
}
}


Expand Down Expand Up @@ -79,13 +79,16 @@ static int create_topic() {

infoPrint("successfully create topic: %s\n", pConsumerInfo->topicName[i]);
taos_free_result(res);

// 延时 10 秒
// toolsMsleep(5000);
if (g_arguments->terminate) {
infoPrint("%s\n", "user cancel , so exit testing.");
taos_free_result(res);
closeBenchConn(conn);
return -1;
}

}
closeBenchConn(conn);
return 0;
Expand Down Expand Up @@ -149,9 +152,9 @@ int buildConsumerAndSubscribe(tmqThreadInfo * pThreadInfo, char* groupId) {
SConsumerInfo* pConsumerInfo = &g_tmqInfo.consumerInfo;

tmq_list_t * topic_list = buildTopicList();

tmq_conf_t * conf = tmq_conf_new();

tmq_conf_set(conf, "td.connect.user", g_arguments->user);
tmq_conf_set(conf, "td.connect.pass", g_arguments->password);
tmq_conf_set(conf, "td.connect.ip", g_arguments->host);
Expand Down Expand Up @@ -203,9 +206,9 @@ static void* tmqConsume(void* arg) {
tmqThreadInfo* pThreadInfo = (tmqThreadInfo*)arg;
SConsumerInfo* pConsumerInfo = &g_tmqInfo.consumerInfo;
char groupId[16] = {0};

// "sequential" or "parallel"
if (pConsumerInfo->createMode && 0 != strncasecmp(pConsumerInfo->createMode, "sequential", 10)) {
if (pConsumerInfo->createMode && 0 != strncasecmp(pConsumerInfo->createMode, "sequential", 10)) {

char* tPtr = pConsumerInfo->groupId;
// "share" or "independent"
Expand All @@ -225,7 +228,7 @@ static void* tmqConsume(void* arg) {
infoPrint("%s\n", "buildConsumerAndSubscribe() fail in tmqConsume()");
return NULL;
}
}
}

int64_t totalMsgs = 0;
int64_t totalRows = 0;
Expand All @@ -234,7 +237,7 @@ static void* tmqConsume(void* arg) {
infoPrint("consumer id %d start to loop pull msg\n", pThreadInfo->id);

if ((NULL != pConsumerInfo->enableManualCommit) && (0 == strncmp("true", pConsumerInfo->enableManualCommit, 4))) {
manualCommit = 1;
manualCommit = 1;
infoPrint("consumer id %d enable manual commit\n", pThreadInfo->id);
}

Expand All @@ -258,7 +261,7 @@ static void* tmqConsume(void* arg) {
if (0 != manualCommit) {
tmq_commit_sync(pThreadInfo->tmq, tmqMsg);
}

taos_free_result(tmqMsg);

totalMsgs++;
Expand Down Expand Up @@ -297,8 +300,15 @@ static void* tmqConsume(void* arg) {

code = tmq_consumer_close(pThreadInfo->tmq);
if (code != 0) {
errorPrint("thread %d tmq_consumer_close() fail, reason: %s\n",
pThreadInfo->id, tmq_err2str(code));
errorPrint("thread %d tmq_consumer_close() fail, reason: %s, try again after 3s\n",
pThreadInfo->id, tmq_err2str(code));

toolsMsleep(3000);
code = tmq_consumer_close(pThreadInfo->tmq);
if (code != 0) {
errorPrint("thread %d tmq_consumer_close() fail, reason: %s\n",
pThreadInfo->id, tmq_err2str(code));
}
}
pThreadInfo->tmq = NULL;

Expand Down Expand Up @@ -331,7 +341,7 @@ int subscribeTestProcess() {
tPtr = groupId;
}
}

pthread_t * pids = benchCalloc(pConsumerInfo->concurrent, sizeof(pthread_t), true);
tmqThreadInfo *infos = benchCalloc(pConsumerInfo->concurrent, sizeof(tmqThreadInfo), true);

Expand All @@ -355,7 +365,7 @@ int subscribeTestProcess() {
}

// "sequential" or "parallel"
if (pConsumerInfo->createMode && 0 == strncasecmp(pConsumerInfo->createMode, "sequential", 10)) {
if (pConsumerInfo->createMode && 0 == strncasecmp(pConsumerInfo->createMode, "sequential", 10)) {
int retVal = buildConsumerAndSubscribe(pThreadInfo, tPtr);
if (0 != retVal) {
infoPrint("%s\n", "buildConsumerAndSubscribe() fail!");
Expand Down Expand Up @@ -390,12 +400,12 @@ int subscribeTestProcess() {
infoPrintToFile(
"Consumed total msgs: %" PRId64 ","
"total rows: %" PRId64 "\n", totalMsgs, totalRows);

if (g_arguments->output_json_file) {
tools_cJSON *root = tools_cJSON_CreateObject();
if (root) {
tools_cJSON_AddNumberToObject(root, "total_msgs", totalMsgs);
tools_cJSON_AddNumberToObject(root, "total_rows", totalRows);
tools_cJSON_AddNumberToObject(root, "total_msgs", totalMsgs);
tools_cJSON_AddNumberToObject(root, "total_rows", totalRows);
char *jsonStr = tools_cJSON_PrintUnformatted(root);
if (jsonStr) {
FILE *fp = fopen(g_arguments->output_json_file, "w");
Expand All @@ -411,7 +421,7 @@ int subscribeTestProcess() {
tools_cJSON_Delete(root);
}
}

tmq_over:
free(pids);
free(infos);
Expand Down
Loading
Loading