From 61e318644276c46a22aa3ce24b9408ac704b700b Mon Sep 17 00:00:00 2001 From: Yaming Pei Date: Wed, 26 Nov 2025 10:50:04 +0800 Subject: [PATCH 1/4] chore(taosBenchmark): add debugging logs and fix test cases --- .../03-Benchmark/json/tmq_cancel.json | 2 +- .../03-Benchmark/test_benchmark_tmq.py | 4 +- tools/taos-tools/src/benchTmq.c | 46 ++++++++------ tools/taos-tools/src/benchUtil.c | 60 ++++++++++--------- 4 files changed, 62 insertions(+), 50 deletions(-) diff --git a/test/cases/81-Tools/03-Benchmark/json/tmq_cancel.json b/test/cases/81-Tools/03-Benchmark/json/tmq_cancel.json index 40bb9d5f1e82..e73e0c17d3b4 100644 --- a/test/cases/81-Tools/03-Benchmark/json/tmq_cancel.json +++ b/test/cases/81-Tools/03-Benchmark/json/tmq_cancel.json @@ -23,7 +23,7 @@ "rows_file": "./rows_file_sequ", "expect_rows": -1, "topic_list": [ - {"name": "topic_benchmark_meters", "sql": "select * from test.meters;"} + {"name": "topic_benchmark_meters", "sql": "select * from dbcancel.meters;"} ] } } diff --git a/test/cases/81-Tools/03-Benchmark/test_benchmark_tmq.py b/test/cases/81-Tools/03-Benchmark/test_benchmark_tmq.py index d2c73419d844..b6f8459ca8c8 100644 --- a/test/cases/81-Tools/03-Benchmark/test_benchmark_tmq.py +++ b/test/cases/81-Tools/03-Benchmark/test_benchmark_tmq.py @@ -95,9 +95,9 @@ def stopThread(self, isForceExit): time.sleep(10) pids = self.get_pids_by_name("taosBenchmark") if pids: - tdLog.info(f"Find a process named taosBbenchmark with PID: {pids}") + tdLog.info(f"Find a process named taosBenchmark with PID: {pids}") else: - tdLog.exit("No process named taosBbenchmark was found.") + tdLog.exit("No process named taosBenchmark was found.") os.kill(pids[0], signal.SIGINT) time.sleep(10) diff --git a/tools/taos-tools/src/benchTmq.c b/tools/taos-tools/src/benchTmq.c index 1bd3425310fe..51700f279fae 100644 --- a/tools/taos-tools/src/benchTmq.c +++ b/tools/taos-tools/src/benchTmq.c @@ -44,11 +44,11 @@ void printfTmqConfigIntoFile() { infoPrintToFile( "msgWithTableName: %s\n", pConsumerInfo->msgWithTableName); infoPrintToFile( "rowsFile: %s\n", pConsumerInfo->rowsFile); infoPrintToFile( "expectRows: %d\n", pConsumerInfo->expectRows); - + for (int i = 0; i < pConsumerInfo->topicCount; ++i) { infoPrintToFile( "topicName[%d]: %s\n", i, pConsumerInfo->topicName[i]); infoPrintToFile( "topicSql[%d]: %s\n", i, pConsumerInfo->topicSql[i]); - } + } } @@ -79,13 +79,16 @@ static int create_topic() { infoPrint("successfully create topic: %s\n", pConsumerInfo->topicName[i]); taos_free_result(res); + + // 延时 10 秒 + // toolsMsleep(5000); if (g_arguments->terminate) { infoPrint("%s\n", "user cancel , so exit testing."); taos_free_result(res); closeBenchConn(conn); return -1; } - + } closeBenchConn(conn); return 0; @@ -149,9 +152,9 @@ int buildConsumerAndSubscribe(tmqThreadInfo * pThreadInfo, char* groupId) { SConsumerInfo* pConsumerInfo = &g_tmqInfo.consumerInfo; tmq_list_t * topic_list = buildTopicList(); - + tmq_conf_t * conf = tmq_conf_new(); - + tmq_conf_set(conf, "td.connect.user", g_arguments->user); tmq_conf_set(conf, "td.connect.pass", g_arguments->password); tmq_conf_set(conf, "td.connect.ip", g_arguments->host); @@ -203,9 +206,9 @@ static void* tmqConsume(void* arg) { tmqThreadInfo* pThreadInfo = (tmqThreadInfo*)arg; SConsumerInfo* pConsumerInfo = &g_tmqInfo.consumerInfo; char groupId[16] = {0}; - + // "sequential" or "parallel" - if (pConsumerInfo->createMode && 0 != strncasecmp(pConsumerInfo->createMode, "sequential", 10)) { + if (pConsumerInfo->createMode && 0 != strncasecmp(pConsumerInfo->createMode, "sequential", 10)) { char* tPtr = pConsumerInfo->groupId; // "share" or "independent" @@ -225,7 +228,7 @@ static void* tmqConsume(void* arg) { infoPrint("%s\n", "buildConsumerAndSubscribe() fail in tmqConsume()"); return NULL; } - } + } int64_t totalMsgs = 0; int64_t totalRows = 0; @@ -234,7 +237,7 @@ static void* tmqConsume(void* arg) { infoPrint("consumer id %d start to loop pull msg\n", pThreadInfo->id); if ((NULL != pConsumerInfo->enableManualCommit) && (0 == strncmp("true", pConsumerInfo->enableManualCommit, 4))) { - manualCommit = 1; + manualCommit = 1; infoPrint("consumer id %d enable manual commit\n", pThreadInfo->id); } @@ -258,7 +261,7 @@ static void* tmqConsume(void* arg) { if (0 != manualCommit) { tmq_commit_sync(pThreadInfo->tmq, tmqMsg); } - + taos_free_result(tmqMsg); totalMsgs++; @@ -297,8 +300,15 @@ static void* tmqConsume(void* arg) { code = tmq_consumer_close(pThreadInfo->tmq); if (code != 0) { - errorPrint("thread %d tmq_consumer_close() fail, reason: %s\n", - pThreadInfo->id, tmq_err2str(code)); + errorPrint("thread %d tmq_consumer_close() fail, reason: %s, try again after 3s\n", + pThreadInfo->id, tmq_err2str(code)); + + toolsMsleep(3000); + code = tmq_consumer_close(pThreadInfo->tmq); + if (code != 0) { + errorPrint("thread %d tmq_consumer_close() fail, reason: %s\n", + pThreadInfo->id, tmq_err2str(code)); + } } pThreadInfo->tmq = NULL; @@ -331,7 +341,7 @@ int subscribeTestProcess() { tPtr = groupId; } } - + pthread_t * pids = benchCalloc(pConsumerInfo->concurrent, sizeof(pthread_t), true); tmqThreadInfo *infos = benchCalloc(pConsumerInfo->concurrent, sizeof(tmqThreadInfo), true); @@ -355,7 +365,7 @@ int subscribeTestProcess() { } // "sequential" or "parallel" - if (pConsumerInfo->createMode && 0 == strncasecmp(pConsumerInfo->createMode, "sequential", 10)) { + if (pConsumerInfo->createMode && 0 == strncasecmp(pConsumerInfo->createMode, "sequential", 10)) { int retVal = buildConsumerAndSubscribe(pThreadInfo, tPtr); if (0 != retVal) { infoPrint("%s\n", "buildConsumerAndSubscribe() fail!"); @@ -390,12 +400,12 @@ int subscribeTestProcess() { infoPrintToFile( "Consumed total msgs: %" PRId64 "," "total rows: %" PRId64 "\n", totalMsgs, totalRows); - + if (g_arguments->output_json_file) { tools_cJSON *root = tools_cJSON_CreateObject(); if (root) { - tools_cJSON_AddNumberToObject(root, "total_msgs", totalMsgs); - tools_cJSON_AddNumberToObject(root, "total_rows", totalRows); + tools_cJSON_AddNumberToObject(root, "total_msgs", totalMsgs); + tools_cJSON_AddNumberToObject(root, "total_rows", totalRows); char *jsonStr = tools_cJSON_PrintUnformatted(root); if (jsonStr) { FILE *fp = fopen(g_arguments->output_json_file, "w"); @@ -411,7 +421,7 @@ int subscribeTestProcess() { tools_cJSON_Delete(root); } } - + tmq_over: free(pids); free(infos); diff --git a/tools/taos-tools/src/benchUtil.c b/tools/taos-tools/src/benchUtil.c index bdd96f261315..ff9e3dd61b3a 100644 --- a/tools/taos-tools/src/benchUtil.c +++ b/tools/taos-tools/src/benchUtil.c @@ -19,7 +19,7 @@ #include #include #include -#include +#include #endif char resEncodingChunk[] = "Encoding: chunked"; @@ -54,7 +54,7 @@ FORCE_INLINE void tmfree(void *buf) { } } -FORCE_INLINE bool isRest(int32_t iface) { +FORCE_INLINE bool isRest(int32_t iface) { return REST_IFACE == iface || SML_REST_IFACE == iface; } @@ -249,7 +249,7 @@ int32_t replaceChildTblName(char *inSql, char *outSql, int tblIndex) { if (0 == pos) { errorPrint("sql format error, sql not found mark string '%s'", mark); return -1; - } + } char subTblName[TSDB_TABLE_NAME_LEN]; snprintf(subTblName, TSDB_TABLE_NAME_LEN, @@ -259,7 +259,7 @@ int32_t replaceChildTblName(char *inSql, char *outSql, int tblIndex) { TOOLS_STRNCPY(outSql, inSql, pos - inSql + 1); snprintf(outSql + (pos - inSql), TSDB_MAX_ALLOWED_SQL_LEN - 1, "%s%s", subTblName, pos + strlen(mark)); - return 0; + return 0; } int64_t toolsGetTimestamp(int32_t precision) { @@ -369,8 +369,8 @@ SBenchConn* initBenchConn(char *dbName) { infoPrint("sleep %dms and try to connect... %d/%d \n", g_arguments->trying_interval, keep_trying, g_arguments->keep_trying); if(g_arguments->trying_interval > 0) { toolsMsleep(g_arguments->trying_interval); - } - } + } + } return conn; } @@ -380,11 +380,13 @@ void closeBenchConn(SBenchConn* conn) { return ; if(conn->taos) { + infoPrint("%s", "closing connection...\n"); taos_close(conn->taos); conn->taos = NULL; } if (conn->ctaos) { + infoPrint("%s", "closing check connection...\n"); taos_close(conn->ctaos); conn->ctaos = NULL; } @@ -400,7 +402,7 @@ int64_t fetchResult(TAOS_RES *res, char * filePath) { int64_t rows = 0; char *databuf = NULL; bool toFile = strlen(filePath) > 0; - + if(toFile) { num_fields = taos_field_count(res); @@ -469,7 +471,7 @@ char *convertDatatypeToString(int type) { case TSDB_DATA_TYPE_DOUBLE: return "double"; case TSDB_DATA_TYPE_JSON: - return "json"; + return "json"; case TSDB_DATA_TYPE_VARBINARY: return "varbinary"; case TSDB_DATA_TYPE_GEOMETRY: @@ -635,7 +637,7 @@ void getDecimal64DefaultMax(uint8_t precision, uint8_t scale, Decimal64* dec) { maxStr[i] = '9'; } maxStr[precision] = '\0'; - + stringToDecimal64(maxStr, precision, scale, dec); return; } @@ -650,7 +652,7 @@ void getDecimal64DefaultMin(uint8_t precision, uint8_t scale, Decimal64* dec) { minStr[i] = '9'; } minStr[precision + 1] = '\0'; - + stringToDecimal64(minStr, precision, scale, dec); return; } @@ -664,7 +666,7 @@ void getDecimal128DefaultMax(uint8_t precision, uint8_t scale, Decimal128* dec) maxStr[i] = '9'; } maxStr[precision] = '\0'; - + stringToDecimal128(maxStr, precision, scale, dec); return; } @@ -679,7 +681,7 @@ void getDecimal128DefaultMin(uint8_t precision, uint8_t scale, Decimal128* dec) minStr[i] = '9'; } minStr[precision + 1] = '\0'; - + stringToDecimal128(minStr, precision, scale, dec); return; } @@ -938,15 +940,15 @@ int convertServAddr(int iface, bool tcp, int protocol) { if (tcp && iface == SML_REST_IFACE && protocol == TSDB_SML_TELNET_PROTOCOL) { - // telnet_tcp_port + // telnet_tcp_port if (convertHostToServAddr(host, g_arguments->telnet_tcp_port, &(g_arguments->serv_addr))) { - errorPrint("failed to convertHostToServAddr host=%s telnet_tcp_port:%d iface=%d \n", + errorPrint("failed to convertHostToServAddr host=%s telnet_tcp_port:%d iface=%d \n", host, g_arguments->telnet_tcp_port, iface); return -1; } - infoPrint("restful connect -> convertServAddr host=%s telnet_tcp_port:%d to serv_addr=%p iface=%d \n", + infoPrint("restful connect -> convertServAddr host=%s telnet_tcp_port:%d to serv_addr=%p iface=%d \n", host, g_arguments->telnet_tcp_port, &g_arguments->serv_addr, iface); } else { // port @@ -957,7 +959,7 @@ int convertServAddr(int iface, bool tcp, int protocol) { errorPrint("%s\n", "convert host to server address"); return -1; } - infoPrint("restful connect -> convertServAddr host=%s port:%d to serv_addr=%p iface=%d \n", + infoPrint("restful connect -> convertServAddr host=%s port:%d to serv_addr=%p iface=%d \n", host, port, &g_arguments->serv_addr, iface); } return 0; @@ -1042,7 +1044,7 @@ void destroySockFd(int sockfd) { closeSockFd(sockfd); } -FORCE_INLINE void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) { +FORCE_INLINE void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) { char buff[530]; char *msg = cmd; if (strlen(cmd) >= sizeof(buff)) { @@ -1076,7 +1078,7 @@ char *genColNames(BArray *cols, bool tbName, char * primaryKeyName) { } else { strcpy(buf, primaryKeyName); } - + for (int32_t i = 0; i < cols->size; i++) { Field * col = benchArrayGet(cols, i); strcat(buf, ","); @@ -1086,13 +1088,13 @@ char *genColNames(BArray *cols, bool tbName, char * primaryKeyName) { } // -// STMT2 +// STMT2 // // create TAOS_STMT2_BINDV* createBindV(int32_t capacity, int32_t tagCnt, int32_t colCnt) { // calc total size - int32_t tableSize = sizeof(char *) + sizeof(TAOS_STMT2_BIND *) + sizeof(TAOS_STMT2_BIND *) + + int32_t tableSize = sizeof(char *) + sizeof(TAOS_STMT2_BIND *) + sizeof(TAOS_STMT2_BIND *) + sizeof(TAOS_STMT2_BIND) * tagCnt + sizeof(TAOS_STMT2_BIND) * colCnt; int32_t size = sizeof(TAOS_STMT2_BINDV) + tableSize * capacity; TAOS_STMT2_BINDV *bindv = benchCalloc(1, size, false); @@ -1140,7 +1142,7 @@ void freeBindV(TAOS_STMT2_BINDV *bindv) { } // -// debug show +// debug show // void showBind(TAOS_STMT2_BIND* bind) { @@ -1163,7 +1165,7 @@ void showBind(TAOS_STMT2_BIND* bind) { break; default: break; - } + } } } @@ -1182,14 +1184,14 @@ void showTableBinds(char* label, TAOS_STMT2_BIND* binds, int32_t cnt) { // show bindv void showBindV(TAOS_STMT2_BINDV *bindv, BArray *tags, BArray *cols) { // num and base info - debugPrint("show bindv table count=%d names=%p tags=%p bind_cols=%p\n", + debugPrint("show bindv table count=%d names=%p tags=%p bind_cols=%p\n", bindv->count, bindv->tbnames, bindv->tags, bindv->bind_cols); - + for(int32_t i=0; i< bindv->count; i++) { debugPrint(" show bindv table index=%d name=%s \n", i, bindv->tbnames[i]); if(bindv->tags) showTableBinds("tag", bindv->tags[i], tags->size); - if(bindv->bind_cols) + if(bindv->bind_cols) showTableBinds("column", bindv->bind_cols[i], cols->size + 1); } } @@ -1397,7 +1399,7 @@ int fetchChildTableName(char *dbName, char *stbName) { conn->taos, dbName, stbName, g_queryInfo.superQueryInfo.childTblName, g_queryInfo.superQueryInfo.childTblCount)) { - // faild + // faild tmfree(g_queryInfo.superQueryInfo.childTblName); closeBenchConn(conn); return -1; @@ -1425,7 +1427,7 @@ int trimCaseCmp(char *str1, char *str2) { } // Check if the remaining characters in str1 are all whitespace - while (*str1 != '\0') { + while (*str1 != '\0') { if (!isblank((unsigned char)*str1)) { return -1; } @@ -1685,7 +1687,7 @@ static int getServerVersionRestImpl(int16_t rest_port, int sockfd) { tools_cJSON *versionObj = tools_cJSON_GetArrayItem(dataObj, 0); tools_cJSON *versionStrObj = tools_cJSON_GetArrayItem(versionObj, 0); server_ver = atoi(versionStrObj->valuestring); - char* pstr = tools_cJSON_Print(versionStrObj); + char* pstr = tools_cJSON_Print(versionStrObj); debugPrint("versionStrObj: %s, version: %s, server_ver: %d\n", pstr ? pstr : "null", versionStrObj->valuestring, server_ver); @@ -1861,6 +1863,6 @@ int check_write_permission(const char *path) { return -1; } fclose(fp); - + return 0; } \ No newline at end of file From 71882fd8b96167425826825d6cd3a3d7d2e90402 Mon Sep 17 00:00:00 2001 From: Yaming Pei Date: Wed, 26 Nov 2025 11:21:56 +0800 Subject: [PATCH 2/4] chore(taosBenchmark): optimize debugging logs --- tools/taos-tools/src/benchTmq.c | 4 ++-- tools/taos-tools/src/benchUtil.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/taos-tools/src/benchTmq.c b/tools/taos-tools/src/benchTmq.c index 51700f279fae..3f53e91b15b3 100644 --- a/tools/taos-tools/src/benchTmq.c +++ b/tools/taos-tools/src/benchTmq.c @@ -300,13 +300,13 @@ static void* tmqConsume(void* arg) { code = tmq_consumer_close(pThreadInfo->tmq); if (code != 0) { - errorPrint("thread %d tmq_consumer_close() fail, reason: %s, try again after 3s\n", + errorPrint("thread %d tmq_consumer_close() failed, reason: %s, try again after 3s\n", pThreadInfo->id, tmq_err2str(code)); toolsMsleep(3000); code = tmq_consumer_close(pThreadInfo->tmq); if (code != 0) { - errorPrint("thread %d tmq_consumer_close() fail, reason: %s\n", + errorPrint("thread %d tmq_consumer_close() failed after retry, reason: %s\n", pThreadInfo->id, tmq_err2str(code)); } } diff --git a/tools/taos-tools/src/benchUtil.c b/tools/taos-tools/src/benchUtil.c index ff9e3dd61b3a..0d9a11352665 100644 --- a/tools/taos-tools/src/benchUtil.c +++ b/tools/taos-tools/src/benchUtil.c @@ -1399,7 +1399,7 @@ int fetchChildTableName(char *dbName, char *stbName) { conn->taos, dbName, stbName, g_queryInfo.superQueryInfo.childTblName, g_queryInfo.superQueryInfo.childTblCount)) { - // faild + // failed tmfree(g_queryInfo.superQueryInfo.childTblName); closeBenchConn(conn); return -1; From 1ea862ebc9bf05fc1d99fe73926541b19b14885e Mon Sep 17 00:00:00 2001 From: Yaming Pei Date: Wed, 26 Nov 2025 16:05:21 +0800 Subject: [PATCH 3/4] chore(taosBenchmark): delete invalid comment --- tools/taos-tools/src/benchTmq.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/tools/taos-tools/src/benchTmq.c b/tools/taos-tools/src/benchTmq.c index 3f53e91b15b3..7b224b81feed 100644 --- a/tools/taos-tools/src/benchTmq.c +++ b/tools/taos-tools/src/benchTmq.c @@ -79,9 +79,6 @@ static int create_topic() { infoPrint("successfully create topic: %s\n", pConsumerInfo->topicName[i]); taos_free_result(res); - - // 延时 10 秒 - // toolsMsleep(5000); if (g_arguments->terminate) { infoPrint("%s\n", "user cancel , so exit testing."); taos_free_result(res); From 25370ef06488c98fb8d565b1026860f078b7cac1 Mon Sep 17 00:00:00 2001 From: Yaming Pei Date: Wed, 26 Nov 2025 16:49:40 +0800 Subject: [PATCH 4/4] test(taosBenchmark): open debugging flag --- .../03-Benchmark/test_benchmark_tmq.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/test/cases/81-Tools/03-Benchmark/test_benchmark_tmq.py b/test/cases/81-Tools/03-Benchmark/test_benchmark_tmq.py index b6f8459ca8c8..f615825555e3 100644 --- a/test/cases/81-Tools/03-Benchmark/test_benchmark_tmq.py +++ b/test/cases/81-Tools/03-Benchmark/test_benchmark_tmq.py @@ -21,8 +21,9 @@ class TestBenchmarkTmq: updatecfgDict = { - 'slowLogScope' : "others" - } + 'slowLogScope' : "others", + 'debugFlag' : 143 + } # # ------------------- test_tmp_basic.py ---------------- # @@ -57,7 +58,7 @@ def do_tmq_case(self): tdLog.info("%s" % cmd) os.system("%s" % cmd) time.sleep(5) - + cmd = "%s -f %s/json/tmq_basic2.json " % (binPath, os.path.dirname(__file__)) tdLog.info("%s" % cmd) os.system("%s" % cmd) @@ -77,7 +78,7 @@ def do_tmq_case(self): # print("taosBenchmark be killed on purpose") # except: # tdLog.exit("failed to kill taosBenchmark") - + print("do tmq case .......................... [passed]") # @@ -101,9 +102,9 @@ def stopThread(self, isForceExit): os.kill(pids[0], signal.SIGINT) time.sleep(10) - + if self._rlist: - tdLog.info(self._rlist) + tdLog.info(self._rlist) self.checkListString(self._rlist, "Receive SIGINT or other signal, quit benchmark") else: tdLog.exit("The benchmark process has not stopped!") @@ -122,11 +123,11 @@ def dbTmqThread(self): # run def do_tmq_cancel(self): - self._rlist = None + self._rlist = None tdLog.info(f"start to excute {__file__}") tdSql.execute("drop topic if exists topic_benchmark_meters") self.dbInsert() - tdLog.info(f"dbInsert finish!") + tdLog.info(f"dbInsert finish!") t1 = threading.Thread(target=self.dbTmqThread) t2 = threading.Thread(target=self.stopThread, args=(False,)) @@ -134,8 +135,8 @@ def do_tmq_cancel(self): t2.start() # wait for threads to complete t1.join() - t2.join() - + t2.join() + print("do tmq cancel ......................... [passed]") #