Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -4462,22 +4462,13 @@ typedef struct {
int8_t subType;
int8_t withMeta;
char* qmsg; // SubPlanToString
SSchemaWrapper schema;
int64_t suid;
} SMqRebVgReq;

int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq);
int32_t tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq);

typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
int64_t ntbUid;
SArray* colIdList; // SArray<int16_t>
} STqCheckInfo;

int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo);
int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo);
void tDeleteSTqCheckInfo(STqCheckInfo* pInfo);

// tqOffset
enum {
TMQ_OFFSET__RESET_NONE = -3,
Expand Down Expand Up @@ -4943,7 +4934,6 @@ typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
SArray* vgs; // SArray<SMqSubVgEp>
SSchemaWrapper schema;
} SMqSubTopicEp;

int32_t tEncodeMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp);
Expand All @@ -4970,7 +4960,7 @@ typedef struct {
STqOffsetVal reqOffset;
int32_t blockNum;
int8_t withTbName;
int8_t withSchema;
// int8_t withSchema;
SArray* blockDataLen;
SArray* blockData;
SArray* blockTbName;
Expand Down
3 changes: 1 addition & 2 deletions include/libs/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ bool qNeedReset(qTaskInfo_t pInfo);
* @param SReadHandle
* @return
*/
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
uint64_t id);
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, uint64_t id);

int32_t qGetColumnsFromNodeList(void* data, bool isList, SArray** pColList);
SSDataBlock* createDataBlockFromDescNode(void* pNode);
Expand Down
3 changes: 0 additions & 3 deletions include/libs/executor/storageapi.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,6 @@ typedef struct SStoreTqReader {
// int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it

int32_t (*tqReaderSetSubmitMsg)(); // todo remove it
// bool (*tqReaderNextBlockFilterOut)();

int32_t (*tqReaderSetVtableInfo)();
} SStoreTqReader;

typedef struct SStoreSnapshotFn {
Expand Down
1 change: 0 additions & 1 deletion source/client/inc/clientInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int32_t vgId;
SSchemaWrapper schema;
int32_t resIter;
SReqResultInfo resInfo;
union{
Expand Down
8 changes: 0 additions & 8 deletions source/client/src/clientRawBlockWrite.c
Original file line number Diff line number Diff line change
Expand Up @@ -2201,10 +2201,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
if (!rspObj.dataRsp.withSchema) {
goto end;
}

const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
RAW_NULL_CHECK(tbName);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
Expand Down Expand Up @@ -2284,10 +2280,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen)
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
if (!rspObj.dataRsp.withSchema) {
goto end;
}

const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
RAW_NULL_CHECK(tbName);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
Expand Down
34 changes: 4 additions & 30 deletions source/client/src/clientTmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ typedef struct {
char topicName[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
SArray* vgs; // SArray<SMqClientVg>
SSchemaWrapper schema;
int8_t noPrivilege;
} SMqClientTopic;

Expand Down Expand Up @@ -1250,7 +1249,6 @@ static void freeClientTopic(void* param) {
return;
}
SMqClientTopic* pTopic = param;
taosMemoryFreeClear(pTopic->schema.pSchema);
taosArrayDestroyEx(pTopic->vgs, freeClientVg);
}

Expand All @@ -1259,9 +1257,6 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
if (pTopic == NULL || pTopicEp == NULL || pVgOffsetHashMap == NULL || tmq == NULL) {
return;
}
pTopic->schema = pTopicEp->schema;
pTopicEp->schema.nCols = 0;
pTopicEp->schema.pSchema = NULL;

char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
Expand Down Expand Up @@ -2263,15 +2258,6 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
bool needTransformSchema = !pDataRsp->withSchema;
if (!pDataRsp->withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable
pDataRsp->withSchema = true;
pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*));
if (pDataRsp->blockSchema == NULL) {
tqErrorC("failed to allocate memory for blockSchema");
return;
}
}
// extract the rows in this data packet
for (int32_t i = 0; i < pDataRsp->blockNum; ++i) {
void* pRetrieve = taosArrayGetP(pDataRsp->blockData, i);
Expand All @@ -2283,15 +2269,6 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
pVg->numOfRows += rows;
(*numOfRows) += rows;
changeByteEndian(rawData);
if (needTransformSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable
SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
if (schema) {
if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL) {
tqErrorC("failed to push schema into blockSchema");
continue;
}
}
}
}
}

Expand Down Expand Up @@ -3095,14 +3072,11 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes

pRspObj->resIter++;
if (pRspObj->resIter < data->blockNum) {
if (data->withSchema) {
doFreeReqResultInfo(&pRspObj->resInfo);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
if (pSW) {
TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL, false));
}
doFreeReqResultInfo(&pRspObj->resInfo);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
if (pSW) {
TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL, false));
}

void* pRetrieve = taosArrayGetP(data->blockData, pRspObj->resIter);
void* rawData = NULL;
int64_t rows = 0;
Expand Down
79 changes: 20 additions & 59 deletions source/common/src/msg/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -13297,38 +13297,6 @@ int32_t tDecodeMqVgOffset(SDecoder *pDecoder, SMqVgOffset *pOffset) {
return 0;
}

int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) {
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pInfo->topic));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->ntbUid));
int32_t sz = taosArrayGetSize(pInfo->colIdList);
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, sz));
for (int32_t i = 0; i < sz; i++) {
int16_t colId = *(int16_t *)taosArrayGet(pInfo->colIdList, i);
TAOS_CHECK_RETURN(tEncodeI16(pEncoder, colId));
}
return pEncoder->pos;
}

int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pInfo->topic));
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->ntbUid));
int32_t sz = 0;
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &sz));
pInfo->colIdList = taosArrayInit(sz, sizeof(int16_t));
if (pInfo->colIdList == NULL) {
TAOS_CHECK_RETURN(terrno);
}
for (int32_t i = 0; i < sz; i++) {
int16_t colId = 0;
TAOS_CHECK_RETURN(tDecodeI16(pDecoder, &colId));
if (taosArrayPush(pInfo->colIdList, &colId) == NULL) {
TAOS_CHECK_RETURN(terrno);
}
}
return 0;
}
void tDeleteSTqCheckInfo(STqCheckInfo *pInfo) { taosArrayDestroy(pInfo->colIdList); }

int32_t tEncodeSMqRebVgReq(SEncoder *pCoder, const SMqRebVgReq *pReq) {
TAOS_CHECK_RETURN(tStartEncode(pCoder));
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pReq->leftForVer));
Expand All @@ -13341,6 +13309,7 @@ int32_t tEncodeSMqRebVgReq(SEncoder *pCoder, const SMqRebVgReq *pReq) {

if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
TAOS_CHECK_RETURN(tEncodeCStr(pCoder, pReq->qmsg));
TAOS_CHECK_RETURN(tEncodeSSchemaWrapper(pCoder, &pReq->schema));
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
TAOS_CHECK_RETURN(tEncodeI64(pCoder, pReq->suid));
TAOS_CHECK_RETURN(tEncodeCStr(pCoder, pReq->qmsg));
Expand All @@ -13366,6 +13335,7 @@ int32_t tDecodeSMqRebVgReq(SDecoder *pCoder, SMqRebVgReq *pReq) {

if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
TAOS_CHECK_EXIT(tDecodeCStr(pCoder, &pReq->qmsg));
TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pCoder, &pReq->schema));
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
TAOS_CHECK_EXIT(tDecodeI64(pCoder, &pReq->suid));
if (!tDecodeIsEnd(pCoder)) {
Expand Down Expand Up @@ -13461,16 +13431,14 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->blockNum));
if (pRsp->blockNum != 0) {
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->withTbName));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->withSchema));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, 1));

for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i);
void *data = taosArrayGetP(pRsp->blockData, i);
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t *)data, bLen));
if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i);
TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pSW));
}
SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i);
TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pSW));
if (pRsp->withTbName) {
char *tbName = (char *)taosArrayGetP(pRsp->blockTbName, i);
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, tbName));
Expand Down Expand Up @@ -13505,16 +13473,15 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
TAOS_CHECK_EXIT(terrno);
}
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->withTbName));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->withSchema));
int8_t dummy;
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &dummy));
if (pRsp->withTbName) {
if ((pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void *))) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
}
if (pRsp->withSchema) {
if ((pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void *))) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
if ((pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void *))) == NULL) {
TAOS_CHECK_EXIT(terrno);
}

for (int32_t i = 0; i < pRsp->blockNum; i++) {
Expand All @@ -13531,21 +13498,19 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
TAOS_CHECK_EXIT(terrno);
}

if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pSW == NULL) {
TAOS_CHECK_EXIT(terrno);
}
SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pSW == NULL) {
TAOS_CHECK_EXIT(terrno);
}

if ((code = tDecodeSSchemaWrapper(pDecoder, pSW))) {
taosMemoryFree(pSW);
goto _exit;
}
if ((code = tDecodeSSchemaWrapper(pDecoder, pSW))) {
taosMemoryFree(pSW);
goto _exit;
}

if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
taosMemoryFree(pSW);
TAOS_CHECK_EXIT(terrno);
}
if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL) {
taosMemoryFree(pSW);
TAOS_CHECK_EXIT(terrno);
}

if (pRsp->withTbName) {
Expand Down Expand Up @@ -14212,7 +14177,6 @@ int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) {
SMqSubVgEp *pVgEp = (SMqSubVgEp *)taosArrayGet(pTopicEp->vgs, i);
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
}
tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema);
return tlen;
}

Expand All @@ -14234,13 +14198,10 @@ void *tDecodeMqSubTopicEp(void *buf, SMqSubTopicEp *pTopicEp) {
return NULL;
}
}
buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema);
return buf;
}

void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) {
taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
pSubTopicEp->schema.nCols = 0;
taosArrayDestroy(pSubTopicEp->vgs);
}

Expand Down
2 changes: 0 additions & 2 deletions source/common/src/systable.c
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,7 @@ static const SSysDbTableSchema topicSchema[] = {
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "schema", .bytes = TSDB_MAX_BINARY_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "meta", .bytes = 4 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "type", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
};

static const SSysDbTableSchema subscriptionSchema[] = {
Expand Down
7 changes: 0 additions & 7 deletions source/dnode/mnode/impl/inc/mndDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -741,17 +741,11 @@ typedef struct {
int8_t withMeta; // TODO
SRWLatch lock;
int32_t sqlLen;
int32_t astLen;
char* sql;
char* ast;
char* physicalPlan;
SSchemaWrapper schema;
int64_t stbUid;
char stbName[TSDB_TABLE_FNAME_LEN];
// forbid condition
int64_t ntbUid;
SArray* ntbColIds;
int64_t ctbStbUid;
} SMqTopicObj;

typedef struct {
Expand Down Expand Up @@ -826,7 +820,6 @@ typedef struct {
SArray* unassignedVgs; // SArray<SMqVgEp>
SArray* offsetRows;
char dbName[TSDB_DB_FNAME_LEN];
char* qmsg; // SubPlanToString
} SMqSubscribeObj;

int32_t tNewSubscribeObj(const char* key, SMqSubscribeObj** ppSub);
Expand Down
1 change: 0 additions & 1 deletion source/dnode/mnode/impl/inc/mndStb.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
const char *mndGetStbStr(const char *src);

int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew);
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId, bool isTag);
void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void *alterOriData,
int32_t alterOriDataLen);
int32_t mndSetForceDropCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SStbObj *pStb);
Expand Down
Loading
Loading