From 9f24e1facfd658d3257b8337072f8ba984d8c2c2 Mon Sep 17 00:00:00 2001
From: changqi1
Date: Wed, 7 Feb 2024 11:31:08 +0800
Subject: [PATCH 01/19] init pp on pp=2,tp=1.

---
 CMakeLists.txt                  |  8 ++---
 src/comm_helper/comm_helper.cpp | 56 ++++++++++++++++++++++++++------
 src/layers/attention.h          | 29 ++++++++++-------
 src/models/CMakeLists.txt       |  5 +++
 src/models/common_decoder.h     | 55 +++++++++++++++++++++++++++----
 src/models/models.cpp           |  3 +-
 src/searchers/greedy_search.cpp | 16 ++++++++--
 src/utils/messenger.h           | 11 +++++--
 src/utils/verbose.h             | 30 +++++++++++++++---
 9 files changed, 166 insertions(+), 47 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4e8ce36c..4b008ad8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -55,7 +55,7 @@ if(GCC_VERSION VERSION_GREATER_EQUAL "10.1")
         message(FATAL_ERROR "Error: Your compiler has issues to support avx512bf16.\n We recommend GCC version >= 12.3.0")
     endif()
 endif()
-
+set(CMAKE_BUILD_TYPE Debug)
 if(CMAKE_BUILD_TYPE MATCHES "Debug")
     message(STATUS "Notice: Using Debug mode.")
     set(CMAKE_C_FLAGS "-O0 -g")
@@ -143,9 +143,9 @@ add_definitions(-DAVX512_FP16_WEIGHT_ONLY_INT4=true)
 add_definitions(-DAVX512_FP32_WEIGHT_ONLY_NF4=true)
 # add_definitions(-DAVX512_FP16_WEIGHT_ONLY_NF4=true)
 
-# add_definitions(-DDEBUG=true)
-# add_definitions(-DSTEP_BY_STEP_ATTN=true)
-add_definitions(-DUSE_SHM=true)
+add_definitions(-DDEBUG=true)
+add_definitions(-DSTEP_BY_STEP_ATTN=true)
+# add_definitions(-DUSE_SHM=true)
 
 option(XFT_BUILD_TESTS "Build xfastertransformer unit tests" OFF)
 
 # timeline event
diff --git a/src/comm_helper/comm_helper.cpp b/src/comm_helper/comm_helper.cpp
index 4591e014..5bb88e0d 100644
--- a/src/comm_helper/comm_helper.cpp
+++ b/src/comm_helper/comm_helper.cpp
@@ -18,30 +18,57 @@
 
 static ccl::communicator *pcomm;
 
-extern "C" int init(int *rank, int *size) {
+extern "C" int init(int *world_size, int *world_rank, int *world_color) {
     ccl::init();
 
     MPI_Init(NULL, NULL);
-    MPI_Comm_size(MPI_COMM_WORLD, size);
-    MPI_Comm_rank(MPI_COMM_WORLD, rank);
-
+    MPI_Comm_size(MPI_COMM_WORLD, world_size);
+    MPI_Comm_rank(MPI_COMM_WORLD, world_rank);
+printf("world_size: %d, world_rank: %d, world_color: %d\n", *world_size, *world_rank, *world_color);
+fflush(stdout);
+
+    // 1) rank = 0, 1, 2, 3, 4, 5, 6, 7; pp = 2; tp = 4
+    //    color = 0, 0, 0, 0, 1, 1, 1, 1
+    // 2) rank = 0, 1, 2, 3, 4, 5, 6, 7; pp = 4; tp = 2
+    //    color = 0, 0, 1, 1, 2, 2, 3, 3
+    // 3) rank = 0, 1, 2, 3; pp = 1; tp = 4
+    //    color = 0, 0, 0, 0
+    // 4) rank = 0, 1, 2, 3; pp = 2; tp = 2
+    //    color = 0, 0, 1, 1
+    // 5) rank = 0, 1, 2, 3; pp = 4; tp = 1
+    //    color = 0, 1, 2, 3
+    // 6) rank = 0, 1; pp = 1; tp = 2
+    //    color = 0, 0
+    // 7) rank = 0, 1; pp = 2; tp = 1
+    //    color = 0, 1
+    // world_color = world_rank / tp_num = world_rank / (world_size / pp_num)
+    *world_color = *world_rank / (*world_size / *world_color);
+    MPI_Comm row_comm;
+    MPI_Comm_split(MPI_COMM_WORLD, *world_color, *world_rank, &row_comm);
+
+    int row_size, row_rank;
+    MPI_Comm_size(row_comm, &row_size);
+    MPI_Comm_rank(row_comm, &row_rank);
+printf("row_size: %d, row_rank: %d, world_color: %d\n", row_size, row_rank, *world_color);
+fflush(stdout);
 
     ccl::shared_ptr_class<ccl::kvs> kvs;
     ccl::kvs::address_type mainAddr;
 
-    if (*rank == 0) {
+    if (row_rank == 0) {
         kvs = ccl::create_main_kvs();
         mainAddr = kvs->get_address();
-        MPI_Bcast((void *)mainAddr.data(), mainAddr.size(), MPI_BYTE, 0, MPI_COMM_WORLD);
+        MPI_Bcast((void *)mainAddr.data(), mainAddr.size(), MPI_BYTE, 0, row_comm);
     } else {
-        MPI_Bcast((void *)mainAddr.data(),
mainAddr.size(), MPI_BYTE, 0, MPI_COMM_WORLD); + MPI_Bcast((void *)mainAddr.data(), mainAddr.size(), MPI_BYTE, 0, row_comm); kvs = ccl::create_kvs(mainAddr); } - pcomm = new ccl::communicator(ccl::create_communicator(*size, *rank, kvs)); - - *rank = pcomm->rank(); - *size = pcomm->size(); + pcomm = new ccl::communicator(ccl::create_communicator(row_size, row_rank, kvs)); + *world_size = pcomm->size(); + *world_rank = pcomm->rank(); +printf("ccl world_size: %d, world_rank: %d, world_color: %d\n", *world_size, *world_rank, *world_color); +fflush(stdout); #ifdef USE_SHM char myHostname[MPI_MAX_PROCESSOR_NAME]; char all_hostnames[MPI_MAX_PROCESSOR_NAME * MPI_MAX_PROCESSOR_NAME]; @@ -53,7 +80,7 @@ extern "C" int init(int *rank, int *size) { MPI_COMM_WORLD); int sameHostnames = 1; - for (int i = 1; i < *size; i++) { + for (int i = 1; i < *world_size; i++) { if (strcmp(myHostname, &all_hostnames[i * MPI_MAX_PROCESSOR_NAME]) != 0) { sameHostnames = 0; break; @@ -89,4 +116,9 @@ extern "C" void broadcast(int *buf, size_t count) { extern "C" void allgatherv( const float *sendBuf, size_t count, float *recvBuf, const std::vector &recvCounts) { ccl::allgatherv(sendBuf, count, recvBuf, recvCounts, *pcomm).wait(); +} + +extern "C" void barrier( + const float *sendBuf, size_t count, float *recvBuf, const std::vector &recvCounts) { + ccl::allgatherv(sendBuf, count, recvBuf, recvCounts, *pcomm).wait(); } \ No newline at end of file diff --git a/src/layers/attention.h b/src/layers/attention.h index e03d82c2..e46a706a 100644 --- a/src/layers/attention.h +++ b/src/layers/attention.h @@ -48,8 +48,10 @@ class Attention { if (ctx->attHeadNum % ctx->kvHeadNum == 0) { // We are responsible for the range [startQHead, endQHead) auto range = getTaskRange(ctx->attHeadNum, ctx->numSplit, ctx->splitIdx); + printf("ctx->attHeadNum: %d, ctx->numSplit: %d, ctx->splitIdx: %d\n", ctx->attHeadNum, ctx->numSplit, ctx->splitIdx); this->startQHead = range.first; this->endQHead = range.second; + printf("this->startQHead: %d, this->endQHead: %d\n", this->startQHead, this->endQHead); int expandFactor = ctx->attHeadNum / ctx->kvHeadNum; this->startKVHead = startQHead / expandFactor; @@ -186,19 +188,19 @@ class Attention { hpj::Matrix inputBuffer(input, ctx->batchSize * inputSeqLen, hiddenSize, hiddenSize); hpj::Matrix imBuffer(imBuf, ctx->batchSize * inputSeqLen, hiddenSize, hiddenSize); hpj::Matrix outBuffer(output, ctx->batchSize * inputSeqLen, hiddenSize, hiddenSize); - - float epsilon = ctx->epsilon; - int headSize = ctx->attHeadSize; - int qkvRows = ctx->batchSize * inputSeqLen; - int qCols = (this->endQHead - this->startQHead) * headSize; - int kvCols = (this->endKVHead - this->startKVHead) * headSize; - int qkCols = qCols + kvCols; - int qkvCols = qkCols + kvCols; - +// printf("attention.forward 1\n"); fflush(stdout); + float epsilon = ctx->epsilon; // printf("af 1\n"); fflush(stdout); + int headSize = ctx->attHeadSize; // printf("af 2\n"); fflush(stdout); + int qkvRows = ctx->batchSize * inputSeqLen; // printf("af 3\n"); fflush(stdout); + int qCols = (this->endQHead - this->startQHead) * headSize; // printf("af 4\n"); fflush(stdout); + int kvCols = (this->endKVHead - this->startKVHead) * headSize; // printf("af 5\n"); fflush(stdout); + int qkCols = qCols + kvCols; // printf("af 6\n"); fflush(stdout); + int qkvCols = qkCols + kvCols; // printf("af 7\n"); fflush(stdout); +// printf("attention.forward 2\n"); fflush(stdout); int qkvStride = qkvCols; auto &qkvMatMul = ctx->qkvMatMul; hpj::Matrix qkvGroupMatMul((ImT 
*)qkvMatMul.Data(), qkvRows, qkvCols, qkvStride); - +// printf("attention.forward 3\n"); fflush(stdout); #ifdef DEBUG dbg.debugPrint("---- DecoderLayer.forward (useSelfAttn=%d) ----\n", useSelfAttn); dbg.debugPrint("input:\n"); @@ -273,6 +275,7 @@ class Attention { imBuffer.Assign(inputBuffer.Data(), inputBuffer.Rows(), inputBuffer.Cols(), inputBuffer.Stride()); inputBuffer.Assign(tmp, rows, cols, stride); } + // printf("attention.forward 4\n"); fflush(stdout); // TODO: refine the logic (and support large inputSeqLen when pastSeqLen > 0) if constexpr (std::is_same_v && std::is_same_v) { if (pastSeqLen == 0) { @@ -284,8 +287,10 @@ class Attention { if (ctx->inputSeqLen >= 1024 && pastSeqLen == 0) flashAttention( ctx, qkvGroupMatMul, outBuffer, imBuffer, presentKey, presentValue, attnMask, pastSeqLen); - else + else { + // printf("attention.forward 5\n"); fflush(stdout); fusedAttention(ctx, query, key, value, imBuffer, presentKey, presentValue, attnMask, pastSeqLen); + } } t4.release(); @@ -375,7 +380,7 @@ class Attention { // to make sure it works better (the logic here is trying to make sure each head of BMM result [seq * seq] in cache) // WARN: reserve field in context is used to make it effective for all layers, do not change it in other places int &mBlockSize = ctx->reserved1; - if (layerId == 0) { + if (layerId == 0 || layerId == 12) { // TODO: if pastSeqLen > 0 and inputSeqLen large. if (pastSeqLen == 0) { const int l2CacheSize = 2 * 1024 * 1024; // TODO: get it dynamically diff --git a/src/models/CMakeLists.txt b/src/models/CMakeLists.txt index 18f4fe52..7ee1119d 100644 --- a/src/models/CMakeLists.txt +++ b/src/models/CMakeLists.txt @@ -14,7 +14,12 @@ # ============================================================================ cmake_minimum_required(VERSION 3.15.1) +find_package(MPI REQUIRED) +include_directories(${MPI_INCLUDE_PATH}) +add_definitions(${MPI_CXX_COMPILE_FLAGS}) + aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} MODEL_SRCS) add_library(models OBJECT ${MODEL_SRCS}) add_dependencies(models utils) +target_link_libraries(models ${MPI_CXX_LIBRARIES}) \ No newline at end of file diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index 971dfc59..919f2af5 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -18,6 +18,8 @@ #include #include #include +#include +#include #include "INIReader.h" #include "abstract_decoder.h" @@ -134,7 +136,14 @@ class CommonDecoder : public AbstractDecoder { vocabSize, embeddingSize, maxPositions, maxPosEmbed, maxSeqLength, ropeParamsPtr); // Decoder - for (int i = 0; i < layers; ++i) { + if (layers % pp_stages_num != 0) { + std::cerr << "Warning: layers cannot be evenly divided by pp_stage." 
<< std::endl; + } + + int layers_per_stage = layers / pp_stages_num; + int start_layer = pp_stage_idx * layers_per_stage; + printf("new layers: %d, layers_per_stage: %d, start_layer: %d\n", layers, layers_per_stage, start_layer); + for (int i = start_layer; i < start_layer + layers_per_stage; ++i) { auto pdec = new DECODER(ctx, i); this->setDecoderWeights(pdec, modelPath, i); this->decoders.push_back(pdec); @@ -143,6 +152,8 @@ class CommonDecoder : public AbstractDecoder { // Predictor int workers = messenger.getSize(); int rank = messenger.getRank(); + int color = messenger.getColor(); + printf("workers: %d, rank: %d, color: %d\n", workers, rank, color); this->predictor = new DistLinear(hiddenSize, vocabSize, rank, workers); this->setPredictorWeight(modelPath); @@ -215,9 +226,9 @@ class CommonDecoder : public AbstractDecoder { dbg.debugPrint("---- embedding.forward ----\n"); dbg.debugPrint("ids:\n"); dbg.dumpMatrix(ids, batchSize, inputSeqLen, inputSeqLen); - dbg.debugPrint("embBuf(rows: %d, cols: %d, stride: %d):\n", this->embBuf->Rows(), this->embBuf->Cols(), - this->embBuf->Stride()); - dbg.dumpMatrix(*this->embBuf); + dbg.debugPrint("embBuf(rows: %d, cols: %d, stride: %d):\n", batchSize * inputSeqLen, ctx->hiddenSize, + ctx->hiddenSize); + dbg.dumpMatrix(embBuf, batchSize * inputSeqLen, ctx->hiddenSize, ctx->hiddenSize); #endif // Prepare attention mask @@ -227,9 +238,17 @@ class CommonDecoder : public AbstractDecoder { int *positionIds = this->getPositionIds(ids, batchSize, inputSeqLen, step + this->prefixSharing); t1.release(); + if (pp_stage_idx > 0) { + MPI_Recv(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + printf("wait... gather : pp_stage_idx %d\n", pp_stage_idx); + } + // Decoder: forward int hiddenSize = ctx->hiddenSize; - for (int i = 0; i < this->decoders.size(); ++i) { + int layers_per_stage = this->decoders.size(); + int start_layer = pp_stage_idx * layers_per_stage; + printf("forward layers_per_stage: %d, start_layer: %d\n", layers_per_stage, start_layer); + for (int i = 0; i < layers_per_stage; ++i) { int workers = this->messenger.getSize(); if (step == 0 && this->prefixSharing) { // Expand the prefix KV cache for each batch @@ -237,9 +256,12 @@ class CommonDecoder : public AbstractDecoder { } KVCacheTensor &presentKey = this->kvCacheMgr->getKey(i); KVCacheTensor &presentValue = this->kvCacheMgr->getValue(i); - +// printf("attention start 1 forward layers_per_stage: %d, start_layer: %d\n", layers_per_stage, start_layer); +fflush(stdout); // Pls be noted: in attention, 'outBuf' is used as imtermediate buffer, 'tmpBuf' is used as output AttnOutT *attnOut = (AttnOutT *)(this->getContext()->tmpBuf.Data()); +// printf("attention start 2 forward layers_per_stage: %d, start_layer: %d\n", layers_per_stage, start_layer); +fflush(stdout); this->decoders[i]->forwardAttention(getContext(), embBuf, outBuf, attnOut, attnMask, presentKey, // presentKey, presentValue, // presentValue, @@ -251,7 +273,7 @@ class CommonDecoder : public AbstractDecoder { // Expand the KV cache as it only has values for beam 0 if (step == 0 && beamSize > 1) { this->kvCacheMgr->expandCache(i, userSideBS, beamSize, seqLen); } - +// printf("attention end forward layers_per_stage: %d, start_layer: %d\n", layers_per_stage, start_layer); // Merge the result of attention // When attention and FFN/MLP are in parallel, do not need to reduce after attention if constexpr (!ATTN_MLP_PARALLEL) { @@ -259,7 +281,7 @@ class CommonDecoder : public AbstractDecoder { 
this->messenger.reduceAdd(attnOut, attnOut, batchSize * inputSeqLen * hiddenSize); } } - +// printf("attention reduceadd end forward layers_per_stage: %d, start_layer: %d\n", layers_per_stage, start_layer); // When attention and FFN/MLP are in parallel, use the initial embedding as input if constexpr (ATTN_MLP_PARALLEL) { if (this->messenger.getSize() > 1) { @@ -279,6 +301,15 @@ class CommonDecoder : public AbstractDecoder { } } + MPI_Send(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, 1, 0, MPI_COMM_WORLD); + + printf("end forward layers_per_stage: %d, start_layer: %d\n", layers_per_stage, start_layer); + + if (pp_stage_idx == 0) { + printf("finish : pp_stage_idx %d\n", pp_stage_idx); + return std::tuple(nullptr, 0, 0); + } + // Prepare input for final Layer Norm (only care about the last row of the result) // Shape of embBuf: (bs, seqLen, hiddenSize) MlpOutT *lnIn = embBuf; @@ -469,8 +500,12 @@ class CommonDecoder : public AbstractDecoder { DecoderContext *getDecoderContext(int layers, const int hiddenSize, const int attHeadNum, const int kvHeadNum, const int imSize, const std::string &act, const float epsilon, int vocabSize, int embeddingSize, int maxPositions, int maxPosEmbed, int maxSeqLength, RopeParams *ropeParamsPtr) { + pp_stages_num = Env::getPipeline(); int splits = messenger.getSize(); int splitIdx = messenger.getRank(); + pp_stage_idx = messenger.getColor(); + tp_rank = splitIdx; + printf("pp_stages_num: %d, pp_stage_idx: %d, tp_rank: %d\n", pp_stages_num, pp_stage_idx, tp_rank); if (context != nullptr) { if (context->hiddenSize == hiddenSize && context->attHeadNum == attHeadNum @@ -719,6 +754,10 @@ class CommonDecoder : public AbstractDecoder { int startId; int endId; + int pp_stages_num = 1; // pipeline_parallel_stages_num + int pp_stage_idx = 0; // pipeline_parallel_stage + int tp_rank = 0; // tensor_parallel_rank + WDataType wType; #ifdef DEBUG diff --git a/src/models/models.cpp b/src/models/models.cpp index 989984df..97585615 100644 --- a/src/models/models.cpp +++ b/src/models/models.cpp @@ -50,7 +50,8 @@ GenerationMode getGenerationMode(SearcherConfig &config_) { } Model::Model() : decoder(nullptr), searcher(nullptr), isNewInput(true) { - Env::initValue(); + Env::initVerbose(); + Env::initPipeline(); TimeLine::init(); } diff --git a/src/searchers/greedy_search.cpp b/src/searchers/greedy_search.cpp index 672ba225..ac2f5f63 100644 --- a/src/searchers/greedy_search.cpp +++ b/src/searchers/greedy_search.cpp @@ -46,7 +46,13 @@ std::vector GreedySearch::getNextToken(int *ids, int batchSize, int seqLen) std::tuple result = decoder.forward(ids, dims, this->step++); - this->nextTokens = search(result); + if (std::get<0>(result) == nullptr) { + this->nextTokens = std::vector(batchSize, 0); + MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, 1, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } else { + this->nextTokens = search(result); + MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, 0, 1, MPI_COMM_WORLD); + } this->curLen++; for (int batchId = 0; batchId < batchSize; ++batchId) { @@ -62,7 +68,13 @@ std::vector GreedySearch::getNextToken() { int64_t dims[3] = {batchSize, 1, 1}; std::tuple result = decoder.forward(nextTokens.data(), dims, this->step++); - this->nextTokens = search(result); + if (std::get<0>(result) == nullptr) { + this->nextTokens = std::vector(batchSize, 0); + MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, 1, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } else { + this->nextTokens = search(result); + 
MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, 0, 1, MPI_COMM_WORLD);
+    }
 
     this->curLen++;
     for (int batchId = 0; batchId < batchSize; ++batchId) {
diff --git a/src/utils/messenger.h b/src/utils/messenger.h
index c5565476..e614a19d 100644
--- a/src/utils/messenger.h
+++ b/src/utils/messenger.h
@@ -24,6 +24,7 @@
 #include "oneapi/ccl.hpp"
 #include "shm_reduction.h"
 #include "timeline.h"
+#include "verbose.h"
 
 class Messenger {
 private:
@@ -46,7 +47,7 @@ class Messenger {
             exit(-1);
         }
 
-        helperInit = (int (*)(int *, int *))dlsym(commHelperHanlde, "init");
+        helperInit = (int (*)(int *, int *, int *))dlsym(commHelperHanlde, "init");
         helperFreePCOMM = (void (*)())dlsym(commHelperHanlde, "freePCOMM");
         helperAllreduce = (void (*)(float *, float *, size_t))dlsym(commHelperHanlde, "allreduce");
         helperAllreduceBF16 = (void (*)(bfloat16_t *, bfloat16_t *, size_t))dlsym(commHelperHanlde, "allreduceBF16");
@@ -56,7 +57,8 @@ class Messenger {
 
         atexit(Messenger::mpi_finalize);
 
-        int sameHostnames = (*helperInit)(&rank, &size);
+        color = Env::getPipeline();
+        int sameHostnames = (*helperInit)(&size, &rank, &color);
 
 #ifdef USE_SHM
         if (sameHostnames && !std::getenv("XFT_ONECCL")) {
@@ -88,6 +90,8 @@ class Messenger {
 
     int getSize() { return size; }
 
+    int getColor() { return color; }
+
     // From some example code of oneCCL, inplace reducing is supported
     // Only float is used now
     void reduceAdd(float *sendBuf, float *recvBuf, size_t count) {
@@ -176,13 +180,14 @@ class Messenger {
 private:
     int size;
     int rank;
+    int color;
     bool localRanksFlag;
 
 #ifdef USE_SHM
     ShmReduction *pshm;
 #endif
     void *commHelperHanlde;
 
-    int (*helperInit)(int *, int *);
+    int (*helperInit)(int *, int *, int *);
     void (*helperFreePCOMM)();
     void (*helperAllreduce)(float *, float *, size_t);
     void (*helperAllreduceBF16)(bfloat16_t *, bfloat16_t *, size_t);
diff --git a/src/utils/verbose.h b/src/utils/verbose.h
index 3f29c118..c31bdac8 100644
--- a/src/utils/verbose.h
+++ b/src/utils/verbose.h
@@ -46,19 +46,19 @@ class Printer {
 
 class Env {
 private:
-    static int &verbose_value() {
+    static int &verboseValue() {
         static int value = 0;
         return value;
     }
 
 public:
-    static void initValue() {
+    static void initVerbose() {
         char *xft_verbose_value = getenv("XFT_VERBOSE");
         if (xft_verbose_value != NULL) {
             int value = atoi(xft_verbose_value);
-            verbose_value() = value;
+            verboseValue() = value;
         } else {
-            verbose_value() = 0;
+            verboseValue() = 0;
         }
 
         // TODO: Move XFT_FAKE_MODEL here.
@@ -67,7 +67,27 @@ class Env {
         }
     }
 
-    static int getVerbose() { return verbose_value(); }
+    static int getVerbose() { return verboseValue(); }
+
+// Pipeline Parallel
+private:
+    static int &pipelineValue() {
+        static int value = 0;
+        return value;
+    }
+
+public:
+    static void initPipeline() {
+        char *xft_pipeline_value = getenv("XFT_PIPELINE_STAGES");
+        if (xft_pipeline_value != NULL) {
+            int value = atoi(xft_pipeline_value);
+            pipelineValue() = value;
+        } else {
+            pipelineValue() = 1;
+        }
+    }
+
+    static int getPipeline() { return pipelineValue(); }
 };
 
 #define GEMMVERBOSE(api_func, compute_func) \

From c9993a09729b0b0f764811f88b11fd974c937c5e Mon Sep 17 00:00:00 2001
From: changqi1
Date: Wed, 7 Feb 2024 15:58:42 +0800
Subject: [PATCH 02/19] optimize dispatch.
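
This commit removes the step-by-step debug printf()s from PATCH 01, drops the
duplicated barrier() helper (its body was a copy of allgatherv), documents the
world_color convention in comm_helper.cpp, and exposes getPPSize()/getTPSize()
on AbstractDecoder so the searcher can route next-token results from the last
pipeline stage back to the first by world rank.

The core idea: init() is called with world_color preloaded with the pipeline
stage count (pp_size); each rank derives its stage index from its world rank,
splits MPI_COMM_WORLD by that color into a per-stage "row" communicator for
tensor-parallel collectives, and keeps MPI_COMM_WORLD for stage-to-stage
activation handoff. The standalone sketch below illustrates the scheme; it is
not part of the patch, and the payload plus the pp_size/tp_size names are
illustrative only (the real code reads XFT_PIPELINE_STAGES via
Env::getPipeline() and builds a oneCCL communicator on top of the row
communicator):

    // pp_handoff_sketch.cpp: a minimal illustration of the color split plus
    // neighbor handoff. Run e.g.: XFT_PIPELINE_STAGES=2 mpirun -n 4 ./a.out
    #include <mpi.h>
    #include <stdio.h>
    #include <stdlib.h>

    int main(int argc, char **argv) {
        MPI_Init(&argc, &argv);
        int world_size, world_rank;
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);
        MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

        // Stage count comes from the environment, as in Env::getPipeline().
        const char *env = getenv("XFT_PIPELINE_STAGES");
        int pp_size = env ? atoi(env) : 1;
        int tp_size = world_size / pp_size;

        // world_color = world_rank / tp_size: ranks of one pipeline stage
        // share a color and form the tensor-parallel (row) communicator.
        int color = world_rank / tp_size;
        MPI_Comm row_comm;
        MPI_Comm_split(MPI_COMM_WORLD, color, world_rank, &row_comm);
        int tp_rank;
        MPI_Comm_rank(row_comm, &tp_rank);

        // TP collectives (allreduce etc.) stay inside row_comm ...
        float x = 1.0f, sum = 0.0f;
        MPI_Allreduce(&x, &sum, 1, MPI_FLOAT, MPI_SUM, row_comm);

        // ... while activations hop between the matching tp_rank of adjacent
        // stages over MPI_COMM_WORLD (common_decoder.h does this with embBuf).
        float act = sum;
        if (color > 0)
            MPI_Recv(&act, 1, MPI_FLOAT, (color - 1) * tp_size + tp_rank, 0,
                    MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        if (color < pp_size - 1)
            MPI_Send(&act, 1, MPI_FLOAT, (color + 1) * tp_size + tp_rank, 0,
                    MPI_COMM_WORLD);

        printf("world_rank %d -> stage %d, tp_rank %d, act %.1f\n",
                world_rank, color, tp_rank, act);
        MPI_Comm_free(&row_comm);
        MPI_Finalize();
        return 0;
    }

With this layout the last stage's ranks are (pp_size - 1) * tp_size + tp_rank,
which is exactly how greedy_search.cpp computes predictor_world_rank below.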
---
 include/abstract_decoder.h      |   4 +
 src/comm_helper/comm_helper.cpp |  39 +++------
 src/layers/attention.h          |  22 +++--
 src/models/common_decoder.h     | 142 ++++++++++++++++++++++++--------
 src/models/hybrid_model.h       |   4 +
 src/searchers/greedy_search.cpp |   9 +-
 6 files changed, 144 insertions(+), 76 deletions(-)

diff --git a/include/abstract_decoder.h b/include/abstract_decoder.h
index 571e0bf7..1c2928f1 100644
--- a/include/abstract_decoder.h
+++ b/include/abstract_decoder.h
@@ -44,6 +44,10 @@ class AbstractDecoder {
 
     virtual int getRank() = 0;
 
+    virtual int getPPSize() = 0;
+
+    virtual int getTPSize() = 0;
+
     virtual int getEndId() = 0;
 
     virtual void setPrefix(int *ids, int seqLen) = 0;
 
diff --git a/src/comm_helper/comm_helper.cpp b/src/comm_helper/comm_helper.cpp
index 5bb88e0d..a3b6e660 100644
--- a/src/comm_helper/comm_helper.cpp
+++ b/src/comm_helper/comm_helper.cpp
@@ -18,30 +18,22 @@
 
 static ccl::communicator *pcomm;
 
+// world_color is initialized to pipeline_parallel_stages_num(pp_size)
+// and will be re-assigned to the world_color of MPI
 extern "C" int init(int *world_size, int *world_rank, int *world_color) {
     ccl::init();
 
     MPI_Init(NULL, NULL);
     MPI_Comm_size(MPI_COMM_WORLD, world_size);
     MPI_Comm_rank(MPI_COMM_WORLD, world_rank);
-printf("world_size: %d, world_rank: %d, world_color: %d\n", *world_size, *world_rank, *world_color);
-fflush(stdout);
-
-    // 1) rank = 0, 1, 2, 3, 4, 5, 6, 7; pp = 2; tp = 4
-    //    color = 0, 0, 0, 0, 1, 1, 1, 1
-    // 2) rank = 0, 1, 2, 3, 4, 5, 6, 7; pp = 4; tp = 2
-    //    color = 0, 0, 1, 1, 2, 2, 3, 3
-    // 3) rank = 0, 1, 2, 3; pp = 1; tp = 4
-    //    color = 0, 0, 0, 0
-    // 4) rank = 0, 1, 2, 3; pp = 2; tp = 2
-    //    color = 0, 0, 1, 1
-    // 5) rank = 0, 1, 2, 3; pp = 4; tp = 1
-    //    color = 0, 1, 2, 3
-    // 6) rank = 0, 1; pp = 1; tp = 2
-    //    color = 0, 0
-    // 7) rank = 0, 1; pp = 2; tp = 1
-    //    color = 0, 1
-    // world_color = world_rank / tp_num = world_rank / (world_size / pp_num)
+
+    // world_color = world_rank / tp_size = world_rank / (world_size / pp_size)
+    // like: world_rank = 0, 1, -> row_rank = 0, 1;
+    //                    2, 3,               0, 1;
+    //                    4, 5,               0, 1;
+    //                    6, 7;               0, 1;
+    //       pp = 4; tp = 2
+    //       color = 0, 0, 1, 1, 2, 2, 3, 3
     *world_color = *world_rank / (*world_size / *world_color);
     MPI_Comm row_comm;
     MPI_Comm_split(MPI_COMM_WORLD, *world_color, *world_rank, &row_comm);
@@ -49,8 +41,7 @@ fflush(stdout);
     int row_size, row_rank;
     MPI_Comm_size(row_comm, &row_size);
     MPI_Comm_rank(row_comm, &row_rank);
-printf("row_size: %d, row_rank: %d, world_color: %d\n", row_size, row_rank, *world_color);
-fflush(stdout);
+
     ccl::shared_ptr_class<ccl::kvs> kvs;
     ccl::kvs::address_type mainAddr;
 
@@ -67,8 +58,7 @@ fflush(stdout);
 
     *world_size = pcomm->size();
     *world_rank = pcomm->rank();
-printf("ccl world_size: %d, world_rank: %d, world_color: %d\n", *world_size, *world_rank, *world_color);
-fflush(stdout);
+
 #ifdef USE_SHM
     char myHostname[MPI_MAX_PROCESSOR_NAME];
     char all_hostnames[MPI_MAX_PROCESSOR_NAME * MPI_MAX_PROCESSOR_NAME];
@@ -116,9 +106,4 @@ extern "C" void allgatherv(
 extern "C" void allgatherv(
         const float *sendBuf, size_t count, float *recvBuf, const std::vector<long unsigned int> &recvCounts) {
     ccl::allgatherv(sendBuf, count, recvBuf, recvCounts, *pcomm).wait();
-}
-
-extern "C" void barrier(
-        const float *sendBuf, size_t count, float *recvBuf, const std::vector<long unsigned int> &recvCounts) {
-    ccl::allgatherv(sendBuf, count, recvBuf, recvCounts, *pcomm).wait();
 }
\ No newline at end of file
diff --git a/src/layers/attention.h b/src/layers/attention.h
index e46a706a..414fae24 100644
--- a/src/layers/attention.h
+++
b/src/layers/attention.h @@ -48,10 +48,8 @@ class Attention { if (ctx->attHeadNum % ctx->kvHeadNum == 0) { // We are responsible for the range [startQHead, endQHead) auto range = getTaskRange(ctx->attHeadNum, ctx->numSplit, ctx->splitIdx); - printf("ctx->attHeadNum: %d, ctx->numSplit: %d, ctx->splitIdx: %d\n", ctx->attHeadNum, ctx->numSplit, ctx->splitIdx); this->startQHead = range.first; this->endQHead = range.second; - printf("this->startQHead: %d, this->endQHead: %d\n", this->startQHead, this->endQHead); int expandFactor = ctx->attHeadNum / ctx->kvHeadNum; this->startKVHead = startQHead / expandFactor; @@ -188,19 +186,19 @@ class Attention { hpj::Matrix inputBuffer(input, ctx->batchSize * inputSeqLen, hiddenSize, hiddenSize); hpj::Matrix imBuffer(imBuf, ctx->batchSize * inputSeqLen, hiddenSize, hiddenSize); hpj::Matrix outBuffer(output, ctx->batchSize * inputSeqLen, hiddenSize, hiddenSize); -// printf("attention.forward 1\n"); fflush(stdout); - float epsilon = ctx->epsilon; // printf("af 1\n"); fflush(stdout); - int headSize = ctx->attHeadSize; // printf("af 2\n"); fflush(stdout); - int qkvRows = ctx->batchSize * inputSeqLen; // printf("af 3\n"); fflush(stdout); - int qCols = (this->endQHead - this->startQHead) * headSize; // printf("af 4\n"); fflush(stdout); - int kvCols = (this->endKVHead - this->startKVHead) * headSize; // printf("af 5\n"); fflush(stdout); - int qkCols = qCols + kvCols; // printf("af 6\n"); fflush(stdout); - int qkvCols = qkCols + kvCols; // printf("af 7\n"); fflush(stdout); -// printf("attention.forward 2\n"); fflush(stdout); + + float epsilon = ctx->epsilon; + int headSize = ctx->attHeadSize; + int qkvRows = ctx->batchSize * inputSeqLen; + int qCols = (this->endQHead - this->startQHead) * headSize; + int kvCols = (this->endKVHead - this->startKVHead) * headSize; + int qkCols = qCols + kvCols; + int qkvCols = qkCols + kvCols; + int qkvStride = qkvCols; auto &qkvMatMul = ctx->qkvMatMul; hpj::Matrix qkvGroupMatMul((ImT *)qkvMatMul.Data(), qkvRows, qkvCols, qkvStride); -// printf("attention.forward 3\n"); fflush(stdout); + #ifdef DEBUG dbg.debugPrint("---- DecoderLayer.forward (useSelfAttn=%d) ----\n", useSelfAttn); dbg.debugPrint("input:\n"); diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index 919f2af5..dffe89b6 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -65,6 +65,82 @@ struct MlpTypeExtractor> { using Tout = OutT; }; +/* +Pipeline parallel and tensor parallel introduction: + + 1) MPI_Instances = 16,XFT_PIPELINE_STAGES = 4 => pp_size = 4, tp_size = 4 + 2) TP sync by oneCCL(row_comm) or shared_memory + 3) PP sync by MPI MPI_COMM_WORLD + + World Rank: => Row Rank: => tp0 tp1 tp2 tp3 + [ 0, 1, 2, 3, [ 0, 1, 2, 3]; pp0 [ 0, 1, 2, 3]; + 4, 5, 6, 7, [ 0, 1, 2, 3]; pp1 [ 0, 1, 2, 3]; + 8, 9, 10, 11, [ 0, 1, 2, 3]; pp2 [ 0, 1, 2, 3]; + 12, 13, 14, 15]; [ 0, 1, 2, 3]; pp3 [ 0, 1, 2, 3]; + + Prompts + │ + ▼ + Embedding(PP0) ◄────────────────────────────────────────────────────────────────────┐ + │ │ + PP0 ▼ │ + ┌─────────────────────────────────────────────────────────────────────────────────┐ │ + │ TP0 TP1 TP2 TP3 │ │ + │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │ + │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ │ + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ + │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ │ + │ └──┬─────────────┘ └─┬──────────────┘ └─┬──────────────┘ └─┬──────────────┘ ... 
│ │ + │ │◄────────────────┘◄─────────────────┘◄─────────────────┘ │ │ + │ ▼ layer0-7 │ │ + └────┬────────────────────────────────────────────────────────────────────────────┘ │ + │ │ + PP1 ▼ │ + ┌─────────────────────────────────────────────────────────────────────────────────┐ │ + │ TP0 TP1 TP2 TP3 │ │ + │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │ + │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ │ + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ + │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ │ + │ └──┬─────────────┘ └─┬──────────────┘ └─┬──────────────┘ └─┬──────────────┘ ... │ │ + │ │◄────────────────┘◄─────────────────┘◄─────────────────┘ │ │ + │ ▼ layer8-15 │ │ + └────┬────────────────────────────────────────────────────────────────────────────┘ │ + │ │ + PP2 ▼ │ + ┌─────────────────────────────────────────────────────────────────────────────────┐ │ + │ TP0 TP1 TP2 TP3 │ │ + │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │ + │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ │ + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ + │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ │ + │ └──┬─────────────┘ └─┬──────────────┘ └─┬──────────────┘ └─┬──────────────┘ ... │ │ + │ │◄────────────────┘◄─────────────────┘◄─────────────────┘ │ │ + │ ▼ layer16-23 │ │ + └────┬────────────────────────────────────────────────────────────────────────────┘ │ + │ │ + PP3 ▼ │ + ┌─────────────────────────────────────────────────────────────────────────────────┐ │ + │ TP0 TP1 TP2 TP3 │ │ + │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │ + │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ │ + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ + │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ │ + │ └──┬─────────────┘ └─┬──────────────┘ └─┬──────────────┘ └─┬──────────────┘ ... │ │ + │ │◄────────────────┘◄─────────────────┘◄─────────────────┘ │ │ + │ ▼ layer24-31 │ │ + └────┬────────────────────────────────────────────────────────────────────────────┘ │ + ... ... + │ │ + ▼ │ + Predictor(PP3) │ + │ │ + ▼ │ + Searchers │ + │ │ + ▼ ───────────────────────────────────────────────────────────────────────────────┘ +*/ + // Template parameters: // ATTN_CLS - class for attention impl. // MLP_CLS - MLP implementation @@ -136,14 +212,13 @@ class CommonDecoder : public AbstractDecoder { vocabSize, embeddingSize, maxPositions, maxPosEmbed, maxSeqLength, ropeParamsPtr); // Decoder - if (layers % pp_stages_num != 0) { - std::cerr << "Warning: layers cannot be evenly divided by pp_stage." << std::endl; + if (layers % pp_size != 0) { + std::cerr << "Warning: layers cannot be evenly divided by pipeline parallel stage size(pp_size)." 
<< std::endl; } - int layers_per_stage = layers / pp_stages_num; - int start_layer = pp_stage_idx * layers_per_stage; - printf("new layers: %d, layers_per_stage: %d, start_layer: %d\n", layers, layers_per_stage, start_layer); - for (int i = start_layer; i < start_layer + layers_per_stage; ++i) { + int layers_per_pp_stage = layers / pp_size; + int start_layer = pp_rank * layers_per_pp_stage; + for (int i = start_layer; i < start_layer + layers_per_pp_stage; ++i) { auto pdec = new DECODER(ctx, i); this->setDecoderWeights(pdec, modelPath, i); this->decoders.push_back(pdec); @@ -153,7 +228,6 @@ class CommonDecoder : public AbstractDecoder { int workers = messenger.getSize(); int rank = messenger.getRank(); int color = messenger.getColor(); - printf("workers: %d, rank: %d, color: %d\n", workers, rank, color); this->predictor = new DistLinear(hiddenSize, vocabSize, rank, workers); this->setPredictorWeight(modelPath); @@ -238,17 +312,16 @@ class CommonDecoder : public AbstractDecoder { int *positionIds = this->getPositionIds(ids, batchSize, inputSeqLen, step + this->prefixSharing); t1.release(); - if (pp_stage_idx > 0) { - MPI_Recv(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - printf("wait... gather : pp_stage_idx %d\n", pp_stage_idx); + // if current pipeline parallel stage rank isn't the first stage, should receive previous stage data + if (pp_rank > 0) { + // [MPI] Recv data from world_rank 0 + MPI_Recv(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, pp_rank - 1, 100 * (pp_rank - 1), MPI_COMM_WORLD, MPI_STATUS_IGNORE); } // Decoder: forward int hiddenSize = ctx->hiddenSize; - int layers_per_stage = this->decoders.size(); - int start_layer = pp_stage_idx * layers_per_stage; - printf("forward layers_per_stage: %d, start_layer: %d\n", layers_per_stage, start_layer); - for (int i = 0; i < layers_per_stage; ++i) { + int layers_per_pp_stage = this->decoders.size(); + for (int i = 0; i < layers_per_pp_stage; ++i) { int workers = this->messenger.getSize(); if (step == 0 && this->prefixSharing) { // Expand the prefix KV cache for each batch @@ -256,12 +329,10 @@ class CommonDecoder : public AbstractDecoder { } KVCacheTensor &presentKey = this->kvCacheMgr->getKey(i); KVCacheTensor &presentValue = this->kvCacheMgr->getValue(i); -// printf("attention start 1 forward layers_per_stage: %d, start_layer: %d\n", layers_per_stage, start_layer); -fflush(stdout); + // Pls be noted: in attention, 'outBuf' is used as imtermediate buffer, 'tmpBuf' is used as output AttnOutT *attnOut = (AttnOutT *)(this->getContext()->tmpBuf.Data()); -// printf("attention start 2 forward layers_per_stage: %d, start_layer: %d\n", layers_per_stage, start_layer); -fflush(stdout); + this->decoders[i]->forwardAttention(getContext(), embBuf, outBuf, attnOut, attnMask, presentKey, // presentKey, presentValue, // presentValue, @@ -273,7 +344,7 @@ fflush(stdout); // Expand the KV cache as it only has values for beam 0 if (step == 0 && beamSize > 1) { this->kvCacheMgr->expandCache(i, userSideBS, beamSize, seqLen); } -// printf("attention end forward layers_per_stage: %d, start_layer: %d\n", layers_per_stage, start_layer); + // Merge the result of attention // When attention and FFN/MLP are in parallel, do not need to reduce after attention if constexpr (!ATTN_MLP_PARALLEL) { @@ -281,7 +352,7 @@ fflush(stdout); this->messenger.reduceAdd(attnOut, attnOut, batchSize * inputSeqLen * hiddenSize); } } -// printf("attention reduceadd end forward layers_per_stage: %d, 
start_layer: %d\n", layers_per_stage, start_layer); + // When attention and FFN/MLP are in parallel, use the initial embedding as input if constexpr (ATTN_MLP_PARALLEL) { if (this->messenger.getSize() > 1) { @@ -301,12 +372,10 @@ fflush(stdout); } } - MPI_Send(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, 1, 0, MPI_COMM_WORLD); - - printf("end forward layers_per_stage: %d, start_layer: %d\n", layers_per_stage, start_layer); - - if (pp_stage_idx == 0) { - printf("finish : pp_stage_idx %d\n", pp_stage_idx); + if (pp_rank < pp_size - 1) { + // If current pipeline stage isn't the end of stage, return nullptr + // [MPI] Send data to next pipeline stage + MPI_Send(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, pp_rank + 1, 100 * pp_rank, MPI_COMM_WORLD); return std::tuple(nullptr, 0, 0); } @@ -463,6 +532,10 @@ fflush(stdout); int getRank() { return messenger.getRank(); } + int getPPSize() { return this->pp_size; } + + int getTPSize() { return this->tp_size; } + WDataType getDataType() { return wType; } int getEndId() { return endId; } @@ -500,12 +573,13 @@ fflush(stdout); DecoderContext *getDecoderContext(int layers, const int hiddenSize, const int attHeadNum, const int kvHeadNum, const int imSize, const std::string &act, const float epsilon, int vocabSize, int embeddingSize, int maxPositions, int maxPosEmbed, int maxSeqLength, RopeParams *ropeParamsPtr) { - pp_stages_num = Env::getPipeline(); + pp_size = Env::getPipeline(); + pp_rank = messenger.getColor(); int splits = messenger.getSize(); int splitIdx = messenger.getRank(); - pp_stage_idx = messenger.getColor(); + tp_size = splits; tp_rank = splitIdx; - printf("pp_stages_num: %d, pp_stage_idx: %d, tp_rank: %d\n", pp_stages_num, pp_stage_idx, tp_rank); + printf("pp_size: %d, pp_rank: %d, tp_size: %d, tp_rank: %d\n", pp_size, pp_rank, tp_size, tp_rank); if (context != nullptr) { if (context->hiddenSize == hiddenSize && context->attHeadNum == attHeadNum @@ -714,6 +788,12 @@ fflush(stdout); // For communication Messenger &messenger; + // For pipeline parallel and tensor parallel config + int pp_size = 1; // pipeline parallel stage size + int pp_rank = 0; // pipeline parallel stage rank + int tp_size = 1; // tensor parallel size + int tp_rank = 0; // tensor parallel rank + // Execution context std::shared_ptr context; @@ -754,10 +834,6 @@ fflush(stdout); int startId; int endId; - int pp_stages_num = 1; // pipeline_parallel_stages_num - int pp_stage_idx = 0; // pipeline_parallel_stage - int tp_rank = 0; // tensor_parallel_rank - WDataType wType; #ifdef DEBUG diff --git a/src/models/hybrid_model.h b/src/models/hybrid_model.h index c9379573..5d026b8f 100644 --- a/src/models/hybrid_model.h +++ b/src/models/hybrid_model.h @@ -77,6 +77,10 @@ class HybridModel : public AbstractDecoder { int getRank() { return firstModel->getRank(); } + int getPPSize() { return firstModel->getPPSize(); } + + int getTPSize() { return firstModel->getTPSize(); } + int getEndId() { return firstModel->getEndId(); } void setPrefix(int *ids, int seqLen) { firstModel->setPrefix(ids, seqLen); } diff --git a/src/searchers/greedy_search.cpp b/src/searchers/greedy_search.cpp index ac2f5f63..d3557326 100644 --- a/src/searchers/greedy_search.cpp +++ b/src/searchers/greedy_search.cpp @@ -46,12 +46,13 @@ std::vector GreedySearch::getNextToken(int *ids, int batchSize, int seqLen) std::tuple result = decoder.forward(ids, dims, this->step++); - if (std::get<0>(result) == nullptr) { + int predictor_world_rank = (decoder.getPPSize() - 1) * decoder.getTPSize(); + 
if (std::get<0>(result) == nullptr) { // The first pipeline parallel stage this->nextTokens = std::vector(batchSize, 0); - MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, 1, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - } else { + MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } else { // The last pipeline parallel stage this->nextTokens = search(result); - MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, 0, 1, MPI_COMM_WORLD); + MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, 0, 0, MPI_COMM_WORLD); } this->curLen++; From d4e6be3b40996af40f46bef48714fb7a21254cb7 Mon Sep 17 00:00:00 2001 From: changqi1 Date: Wed, 7 Feb 2024 21:46:23 +0800 Subject: [PATCH 03/19] fix pp=2,tp=2 --- include/abstract_decoder.h | 4 --- src/common/transformer_ctx.h | 12 ++++++++- src/layers/attention.h | 5 ++-- src/models/common_decoder.h | 47 +++++++++++++-------------------- src/models/hybrid_model.h | 4 --- src/searchers/greedy_search.cpp | 33 ++++++++++++++++------- 6 files changed, 56 insertions(+), 49 deletions(-) diff --git a/include/abstract_decoder.h b/include/abstract_decoder.h index 1c2928f1..571e0bf7 100644 --- a/include/abstract_decoder.h +++ b/include/abstract_decoder.h @@ -44,10 +44,6 @@ class AbstractDecoder { virtual int getRank() = 0; - virtual int getPPSize() = 0; - - virtual int getTPSize() = 0; - virtual int getEndId() = 0; virtual void setPrefix(int *ids, int seqLen) = 0; diff --git a/src/common/transformer_ctx.h b/src/common/transformer_ctx.h index 6be7f62d..e2e876de 100644 --- a/src/common/transformer_ctx.h +++ b/src/common/transformer_ctx.h @@ -84,6 +84,12 @@ struct DecoderContext { // # of splits (the same as NUMA node number in the system) const int numSplit; + // For pipeline parallel and tensor parallel config + int ppSize = 1; // pipeline parallel stage size + int ppRank = 0; // pipeline parallel stage rank + int tpSize = 1; // tensor parallel size + int tpRank = 0; // tensor parallel rank + enum ActivationType { RELU, GELU, SWIGLU, SILU }; ActivationType actType; @@ -105,7 +111,7 @@ struct DecoderContext { public: DecoderContext(int _layers, int _hiddenSize, int _attHeadNum, int _kvHeadNum, int _imSize, const std::string &act, float epsilon, int _vocabSize, int _embeddingSize, int _maxPositions, int _maxPosEmbed, int _maxSeqLength, - int _splitIdx, int _splits, RopeParams *_ropeParamsPtr = nullptr, int numThreads = 0) + int _splitIdx, int _splits, int _ppSize = 1, int _ppRank = 0, RopeParams *_ropeParamsPtr = nullptr, int numThreads = 0) : layers(_layers) , hiddenSize(_hiddenSize) , intermediateSize(_imSize) @@ -119,6 +125,10 @@ struct DecoderContext { , ropeParamsPtr(_ropeParamsPtr) , splitIdx(_splitIdx) , numSplit(_splits) + , ppSize(_ppSize) + , ppRank(_ppRank) + , tpSize(_splits) + , tpRank(_splitIdx) , epsilon(epsilon) { if (attHeadNum != 0) { this->attHeadSize = hiddenSize / attHeadNum; diff --git a/src/layers/attention.h b/src/layers/attention.h index 414fae24..0e22a755 100644 --- a/src/layers/attention.h +++ b/src/layers/attention.h @@ -273,7 +273,7 @@ class Attention { imBuffer.Assign(inputBuffer.Data(), inputBuffer.Rows(), inputBuffer.Cols(), inputBuffer.Stride()); inputBuffer.Assign(tmp, rows, cols, stride); } - // printf("attention.forward 4\n"); fflush(stdout); + // TODO: refine the logic (and support large inputSeqLen when pastSeqLen > 0) if constexpr (std::is_same_v && std::is_same_v) { if (pastSeqLen == 0) { @@ -286,7 +286,6 @@ class Attention { flashAttention( ctx, 
qkvGroupMatMul, outBuffer, imBuffer, presentKey, presentValue, attnMask, pastSeqLen); else { - // printf("attention.forward 5\n"); fflush(stdout); fusedAttention(ctx, query, key, value, imBuffer, presentKey, presentValue, attnMask, pastSeqLen); } } @@ -378,7 +377,7 @@ class Attention { // to make sure it works better (the logic here is trying to make sure each head of BMM result [seq * seq] in cache) // WARN: reserve field in context is used to make it effective for all layers, do not change it in other places int &mBlockSize = ctx->reserved1; - if (layerId == 0 || layerId == 12) { + if (layerId % (ctx->layers / ctx->ppSize) == 0) { // TODO: if pastSeqLen > 0 and inputSeqLen large. if (pastSeqLen == 0) { const int l2CacheSize = 2 * 1024 * 1024; // TODO: get it dynamically diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index dffe89b6..f8667550 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -68,7 +68,7 @@ struct MlpTypeExtractor> { /* Pipeline parallel and tensor parallel introduction: - 1) MPI_Instances = 16,XFT_PIPELINE_STAGES = 4 => pp_size = 4, tp_size = 4 + 1) MPI_Instances = 16,XFT_PIPELINE_STAGES = 4 => ctx->ppSize = 4, ctx->tpSize = 4 2) TP sync by oneCCL(row_comm) or shared_memory 3) PP sync by MPI MPI_COMM_WORLD @@ -212,12 +212,12 @@ class CommonDecoder : public AbstractDecoder { vocabSize, embeddingSize, maxPositions, maxPosEmbed, maxSeqLength, ropeParamsPtr); // Decoder - if (layers % pp_size != 0) { - std::cerr << "Warning: layers cannot be evenly divided by pipeline parallel stage size(pp_size)." << std::endl; + if (layers % ctx->ppSize != 0) { + std::cerr << "Warning: layers cannot be evenly divided by pipeline parallel stage size(ppSize)." << std::endl; } - int layers_per_pp_stage = layers / pp_size; - int start_layer = pp_rank * layers_per_pp_stage; + int layers_per_pp_stage = layers / ctx->ppSize; + int start_layer = ctx->ppRank * layers_per_pp_stage; for (int i = start_layer; i < start_layer + layers_per_pp_stage; ++i) { auto pdec = new DECODER(ctx, i); this->setDecoderWeights(pdec, modelPath, i); @@ -313,9 +313,11 @@ class CommonDecoder : public AbstractDecoder { t1.release(); // if current pipeline parallel stage rank isn't the first stage, should receive previous stage data - if (pp_rank > 0) { + if (ctx->ppRank > 0) { // [MPI] Recv data from world_rank 0 - MPI_Recv(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, pp_rank - 1, 100 * (pp_rank - 1), MPI_COMM_WORLD, MPI_STATUS_IGNORE); + int curr_world_rank = ctx->ppRank * ctx->tpSize + ctx->tpRank; + int prev_world_rank = (ctx->ppRank - 1) * ctx->tpSize + ctx->tpRank; + MPI_Recv(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, prev_world_rank, curr_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } // Decoder: forward @@ -372,10 +374,11 @@ class CommonDecoder : public AbstractDecoder { } } - if (pp_rank < pp_size - 1) { + if (ctx->ppRank < ctx->ppSize - 1) { // If current pipeline stage isn't the end of stage, return nullptr // [MPI] Send data to next pipeline stage - MPI_Send(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, pp_rank + 1, 100 * pp_rank, MPI_COMM_WORLD); + int next_world_rank = (ctx->ppRank + 1) * ctx->tpSize + ctx->tpRank; + MPI_Send(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, next_world_rank, next_world_rank, MPI_COMM_WORLD); return std::tuple(nullptr, 0, 0); } @@ -532,10 +535,6 @@ class CommonDecoder : public AbstractDecoder { int getRank() { return messenger.getRank(); } - int getPPSize() { 
return this->pp_size; } - - int getTPSize() { return this->tp_size; } - WDataType getDataType() { return wType; } int getEndId() { return endId; } @@ -573,18 +572,16 @@ class CommonDecoder : public AbstractDecoder { DecoderContext *getDecoderContext(int layers, const int hiddenSize, const int attHeadNum, const int kvHeadNum, const int imSize, const std::string &act, const float epsilon, int vocabSize, int embeddingSize, int maxPositions, int maxPosEmbed, int maxSeqLength, RopeParams *ropeParamsPtr) { - pp_size = Env::getPipeline(); - pp_rank = messenger.getColor(); - int splits = messenger.getSize(); - int splitIdx = messenger.getRank(); - tp_size = splits; - tp_rank = splitIdx; - printf("pp_size: %d, pp_rank: %d, tp_size: %d, tp_rank: %d\n", pp_size, pp_rank, tp_size, tp_rank); + int tpSize = messenger.getSize(); + int tpRank = messenger.getRank(); + int ppSize = Env::getPipeline(); + int ppRank = messenger.getColor(); + printf("ppSize: %d, ppRank: %d, tpSize: %d, tpRank: %d\n", ppSize, ppRank, tpSize, tpRank); if (context != nullptr) { if (context->hiddenSize == hiddenSize && context->attHeadNum == attHeadNum && context->kvHeadNum == kvHeadNum && context->intermediateSize == imSize - && context->splitIdx == splitIdx) { + && context->tpRank == tpRank) { return context.get(); } else { printf("Different context size not unsupported!\n"); @@ -592,7 +589,7 @@ class CommonDecoder : public AbstractDecoder { } } else { this->context.reset(new DecoderContext(layers, hiddenSize, attHeadNum, kvHeadNum, imSize, act, epsilon, - vocabSize, embeddingSize, maxPositions, maxPosEmbed, maxSeqLength, splitIdx, splits, ropeParamsPtr)); + vocabSize, embeddingSize, maxPositions, maxPosEmbed, maxSeqLength, tpRank, tpSize, ppSize, ppRank, ropeParamsPtr)); } return this->context.get(); @@ -788,12 +785,6 @@ class CommonDecoder : public AbstractDecoder { // For communication Messenger &messenger; - // For pipeline parallel and tensor parallel config - int pp_size = 1; // pipeline parallel stage size - int pp_rank = 0; // pipeline parallel stage rank - int tp_size = 1; // tensor parallel size - int tp_rank = 0; // tensor parallel rank - // Execution context std::shared_ptr context; diff --git a/src/models/hybrid_model.h b/src/models/hybrid_model.h index 5d026b8f..c9379573 100644 --- a/src/models/hybrid_model.h +++ b/src/models/hybrid_model.h @@ -77,10 +77,6 @@ class HybridModel : public AbstractDecoder { int getRank() { return firstModel->getRank(); } - int getPPSize() { return firstModel->getPPSize(); } - - int getTPSize() { return firstModel->getTPSize(); } - int getEndId() { return firstModel->getEndId(); } void setPrefix(int *ids, int seqLen) { firstModel->setPrefix(ids, seqLen); } diff --git a/src/searchers/greedy_search.cpp b/src/searchers/greedy_search.cpp index d3557326..715d5f9d 100644 --- a/src/searchers/greedy_search.cpp +++ b/src/searchers/greedy_search.cpp @@ -46,13 +46,20 @@ std::vector GreedySearch::getNextToken(int *ids, int batchSize, int seqLen) std::tuple result = decoder.forward(ids, dims, this->step++); - int predictor_world_rank = (decoder.getPPSize() - 1) * decoder.getTPSize(); - if (std::get<0>(result) == nullptr) { // The first pipeline parallel stage + DecoderContext *ctx = decoder.getContext(); + if (std::get<0>(result) == nullptr) { // The first embedding pipeline parallel stage this->nextTokens = std::vector(batchSize, 0); - MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - } else { // The last pipeline parallel stage 
+ if (ctx->ppRank == 0) { + int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; + MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } + } else { // The last predictor pipeline parallel stage this->nextTokens = search(result); - MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, 0, 0, MPI_COMM_WORLD); + if (ctx->ppRank == ctx->ppSize - 1) { + int embedding_world_rank = 0 * ctx->tpSize + ctx->tpRank; + int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; + MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, MPI_COMM_WORLD); + } } this->curLen++; @@ -69,12 +76,20 @@ std::vector GreedySearch::getNextToken() { int64_t dims[3] = {batchSize, 1, 1}; std::tuple result = decoder.forward(nextTokens.data(), dims, this->step++); - if (std::get<0>(result) == nullptr) { + DecoderContext *ctx = decoder.getContext(); + if (std::get<0>(result) == nullptr) { // The first embedding pipeline parallel stage this->nextTokens = std::vector(batchSize, 0); - MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, 1, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - } else { + if (ctx->ppRank == 0) { + int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; + MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } + } else { // The last predictor pipeline parallel stage this->nextTokens = search(result); - MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, 0, 1, MPI_COMM_WORLD); + if (ctx->ppRank == ctx->ppSize - 1) { + int embedding_world_rank = 0 * ctx->tpSize + ctx->tpRank; + int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; + MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, MPI_COMM_WORLD); + } } this->curLen++; From d3515cd17bd48f097b9162c0e6f2c9e4bd3ff344 Mon Sep 17 00:00:00 2001 From: changqi1 Date: Wed, 7 Feb 2024 21:54:04 +0800 Subject: [PATCH 04/19] format code --- src/models/common_decoder.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index f8667550..b23315b7 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -227,7 +227,6 @@ class CommonDecoder : public AbstractDecoder { // Predictor int workers = messenger.getSize(); int rank = messenger.getRank(); - int color = messenger.getColor(); this->predictor = new DistLinear(hiddenSize, vocabSize, rank, workers); this->setPredictorWeight(modelPath); @@ -334,7 +333,6 @@ class CommonDecoder : public AbstractDecoder { // Pls be noted: in attention, 'outBuf' is used as imtermediate buffer, 'tmpBuf' is used as output AttnOutT *attnOut = (AttnOutT *)(this->getContext()->tmpBuf.Data()); - this->decoders[i]->forwardAttention(getContext(), embBuf, outBuf, attnOut, attnMask, presentKey, // presentKey, presentValue, // presentValue, From 3a92d20fa99e6012355fe608e99553e0aa733b72 Mon Sep 17 00:00:00 2001 From: changqi1 Date: Wed, 7 Feb 2024 21:54:36 +0800 Subject: [PATCH 05/19] format code --- src/models/common_decoder.h | 20 ++++++++++++-------- src/searchers/greedy_search.cpp | 12 ++++++++---- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index b23315b7..e957ddf8 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ 
-15,11 +15,11 @@ #pragma once #include +#include #include +#include #include #include -#include -#include #include "INIReader.h" #include "abstract_decoder.h" @@ -213,7 +213,8 @@ class CommonDecoder : public AbstractDecoder { // Decoder if (layers % ctx->ppSize != 0) { - std::cerr << "Warning: layers cannot be evenly divided by pipeline parallel stage size(ppSize)." << std::endl; + std::cerr << "Warning: layers cannot be evenly divided by pipeline parallel stage size(ppSize)." + << std::endl; } int layers_per_pp_stage = layers / ctx->ppSize; @@ -299,8 +300,8 @@ class CommonDecoder : public AbstractDecoder { dbg.debugPrint("---- embedding.forward ----\n"); dbg.debugPrint("ids:\n"); dbg.dumpMatrix(ids, batchSize, inputSeqLen, inputSeqLen); - dbg.debugPrint("embBuf(rows: %d, cols: %d, stride: %d):\n", batchSize * inputSeqLen, ctx->hiddenSize, - ctx->hiddenSize); + dbg.debugPrint( + "embBuf(rows: %d, cols: %d, stride: %d):\n", batchSize * inputSeqLen, ctx->hiddenSize, ctx->hiddenSize); dbg.dumpMatrix(embBuf, batchSize * inputSeqLen, ctx->hiddenSize, ctx->hiddenSize); #endif @@ -316,7 +317,8 @@ class CommonDecoder : public AbstractDecoder { // [MPI] Recv data from world_rank 0 int curr_world_rank = ctx->ppRank * ctx->tpSize + ctx->tpRank; int prev_world_rank = (ctx->ppRank - 1) * ctx->tpSize + ctx->tpRank; - MPI_Recv(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, prev_world_rank, curr_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, prev_world_rank, curr_world_rank, + MPI_COMM_WORLD, MPI_STATUS_IGNORE); } // Decoder: forward @@ -376,7 +378,8 @@ class CommonDecoder : public AbstractDecoder { // If current pipeline stage isn't the end of stage, return nullptr // [MPI] Send data to next pipeline stage int next_world_rank = (ctx->ppRank + 1) * ctx->tpSize + ctx->tpRank; - MPI_Send(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, next_world_rank, next_world_rank, MPI_COMM_WORLD); + MPI_Send(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, next_world_rank, next_world_rank, + MPI_COMM_WORLD); return std::tuple(nullptr, 0, 0); } @@ -587,7 +590,8 @@ class CommonDecoder : public AbstractDecoder { } } else { this->context.reset(new DecoderContext(layers, hiddenSize, attHeadNum, kvHeadNum, imSize, act, epsilon, - vocabSize, embeddingSize, maxPositions, maxPosEmbed, maxSeqLength, tpRank, tpSize, ppSize, ppRank, ropeParamsPtr)); + vocabSize, embeddingSize, maxPositions, maxPosEmbed, maxSeqLength, tpRank, tpSize, ppSize, ppRank, + ropeParamsPtr)); } return this->context.get(); diff --git a/src/searchers/greedy_search.cpp b/src/searchers/greedy_search.cpp index 715d5f9d..764dafc2 100644 --- a/src/searchers/greedy_search.cpp +++ b/src/searchers/greedy_search.cpp @@ -51,14 +51,16 @@ std::vector GreedySearch::getNextToken(int *ids, int batchSize, int seqLen) this->nextTokens = std::vector(batchSize, 0); if (ctx->ppRank == 0) { int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; - MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, + MPI_COMM_WORLD, MPI_STATUS_IGNORE); } } else { // The last predictor pipeline parallel stage this->nextTokens = search(result); if (ctx->ppRank == ctx->ppSize - 1) { int embedding_world_rank = 0 * ctx->tpSize + ctx->tpRank; int predictor_world_rank = (ctx->ppSize - 
1) * ctx->tpSize + ctx->tpRank; - MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, MPI_COMM_WORLD); + MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, + MPI_COMM_WORLD); } } @@ -81,14 +83,16 @@ std::vector GreedySearch::getNextToken() { this->nextTokens = std::vector(batchSize, 0); if (ctx->ppRank == 0) { int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; - MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, + MPI_COMM_WORLD, MPI_STATUS_IGNORE); } } else { // The last predictor pipeline parallel stage this->nextTokens = search(result); if (ctx->ppRank == ctx->ppSize - 1) { int embedding_world_rank = 0 * ctx->tpSize + ctx->tpRank; int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; - MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, MPI_COMM_WORLD); + MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, + MPI_COMM_WORLD); } } From 10b83fb22b9563993015c566d2aa0f47015f4ff8 Mon Sep 17 00:00:00 2001 From: changqi1 Date: Wed, 7 Feb 2024 21:58:01 +0800 Subject: [PATCH 06/19] format code --- CMakeLists.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4b008ad8..8a91baa5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -55,7 +55,7 @@ if(GCC_VERSION VERSION_GREATER_EQUAL "10.1") message(FATAL_ERROR "Error: Your compiler has issues to support avx512bf16.\n We recommend GCC version >= 12.3.0") endif() endif() -set(CMAKE_BUILD_TYPE Debug) + if(CMAKE_BUILD_TYPE MATCHES "Debug") message(STATUS "Notice: Using Debug mode.") set(CMAKE_C_FLAGS "-O0 -g") @@ -143,8 +143,8 @@ add_definitions(-DAVX512_FP16_WEIGHT_ONLY_INT4=true) add_definitions(-DAVX512_FP32_WEIGHT_ONLY_NF4=true) # add_definitions(-DAVX512_FP16_WEIGHT_ONLY_NF4=true) -add_definitions(-DDEBUG=true) -add_definitions(-DSTEP_BY_STEP_ATTN=true) +# add_definitions(-DDEBUG=true) +# add_definitions(-DSTEP_BY_STEP_ATTN=true) # add_definitions(-DUSE_SHM=true) option(XFT_BUILD_TESTS "Build xfastertransformer unit tests" OFF) From 24a288c728b5f8ebe7506fb2616e3221a8c8298a Mon Sep 17 00:00:00 2001 From: changqi1 Date: Wed, 7 Feb 2024 22:02:56 +0800 Subject: [PATCH 07/19] format code --- src/models/common_decoder.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index e957ddf8..6cebfe42 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -15,12 +15,12 @@ #pragma once #include -#include #include -#include #include #include +#include + #include "INIReader.h" #include "abstract_decoder.h" #include "attention.h" @@ -577,7 +577,7 @@ class CommonDecoder : public AbstractDecoder { int tpRank = messenger.getRank(); int ppSize = Env::getPipeline(); int ppRank = messenger.getColor(); - printf("ppSize: %d, ppRank: %d, tpSize: %d, tpRank: %d\n", ppSize, ppRank, tpSize, tpRank); + // printf("ppSize: %d, ppRank: %d, tpSize: %d, tpRank: %d\n", ppSize, ppRank, tpSize, tpRank); if (context != nullptr) { if (context->hiddenSize == hiddenSize && context->attHeadNum == attHeadNum From cf3bce4b56bb92d0f05906bac46a821f738597c6 Mon Sep 17 00:00:00 2001 From: changqi1 Date: Wed, 
7 Feb 2024 22:18:03 +0800 Subject: [PATCH 08/19] format code --- src/models/common_decoder.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index 6cebfe42..e2f8e339 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -82,7 +82,7 @@ Pipeline parallel and tensor parallel introduction: │ ▼ Embedding(PP0) ◄────────────────────────────────────────────────────────────────────┐ - │ │ + │ │ PP0 ▼ │ ┌─────────────────────────────────────────────────────────────────────────────────┐ │ │ TP0 TP1 TP2 TP3 │ │ @@ -133,7 +133,7 @@ Pipeline parallel and tensor parallel introduction: ... ... │ │ ▼ │ - Predictor(PP3) │ + Predictor(PP3) │ │ │ ▼ │ Searchers │ From df1f96c86d2fcba5a7d2eb039de486a977f0cbec Mon Sep 17 00:00:00 2001 From: changqi1 Date: Thu, 8 Feb 2024 09:27:56 +0800 Subject: [PATCH 09/19] format code --- src/comm_helper/comm_helper.cpp | 13 ++++++------- src/models/common_decoder.h | 4 ++-- src/searchers/greedy_search.cpp | 8 ++++---- src/utils/verbose.h | 5 +++-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/comm_helper/comm_helper.cpp b/src/comm_helper/comm_helper.cpp index a3b6e660..e4abd44f 100644 --- a/src/comm_helper/comm_helper.cpp +++ b/src/comm_helper/comm_helper.cpp @@ -27,13 +27,12 @@ extern "C" int init(int *world_size, int *world_rank, int *world_color) { MPI_Comm_size(MPI_COMM_WORLD, world_size); MPI_Comm_rank(MPI_COMM_WORLD, world_rank); - // world_color = world_rank / tp_size = world_rank / (world_size / pp_size) - // like: world_rank = 0, 1, -> row_rank = 0, 1; - // 2, 3, 0, 1; - // 4, 5, 0, 1; - // 6, 7; 0, 1; - // pp = 4; tp = 2 - // color = 0, 0, 1, 1, 2, 2, 3, 3 + // world_color = world_rank / tpSize = world_rank / (world_size / ppSize) + // like: world_color = 0~7 / (8 / 4), XFT_PIPELINE_STAGES = ppSize = 4; tpSize = 2 + // world_rank = 0, 1, -> world_color = ppRank = 0, 0, -> tpRank = 0, 1; + // 2, 3, 1, 1, 0, 1; + // 4, 5, 2, 2, 0, 1; + // 6, 7; 3, 3; 0, 1; *world_color = *world_rank / (*world_size / *world_color); MPI_Comm row_comm; MPI_Comm_split(MPI_COMM_WORLD, *world_color, *world_rank, &row_comm); diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index e2f8e339..e8cbc916 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -313,7 +313,7 @@ class CommonDecoder : public AbstractDecoder { t1.release(); // if current pipeline parallel stage rank isn't the first stage, should receive previous stage data - if (ctx->ppRank > 0) { + if (ctx->ppSize > 1 && ctx->ppRank > 0) { // [MPI] Recv data from world_rank 0 int curr_world_rank = ctx->ppRank * ctx->tpSize + ctx->tpRank; int prev_world_rank = (ctx->ppRank - 1) * ctx->tpSize + ctx->tpRank; @@ -374,7 +374,7 @@ class CommonDecoder : public AbstractDecoder { } } - if (ctx->ppRank < ctx->ppSize - 1) { + if (ctx->ppSize > 1 && ctx->ppRank < ctx->ppSize - 1) { // If current pipeline stage isn't the end of stage, return nullptr // [MPI] Send data to next pipeline stage int next_world_rank = (ctx->ppRank + 1) * ctx->tpSize + ctx->tpRank; diff --git a/src/searchers/greedy_search.cpp b/src/searchers/greedy_search.cpp index 764dafc2..5f807811 100644 --- a/src/searchers/greedy_search.cpp +++ b/src/searchers/greedy_search.cpp @@ -49,14 +49,14 @@ std::vector GreedySearch::getNextToken(int *ids, int batchSize, int seqLen) DecoderContext *ctx = decoder.getContext(); if (std::get<0>(result) == nullptr) { // The first embedding pipeline parallel stage this->nextTokens = 
std::vector(batchSize, 0); - if (ctx->ppRank == 0) { + if (ctx->ppSize > 1 && ctx->ppRank == 0) { int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } } else { // The last predictor pipeline parallel stage this->nextTokens = search(result); - if (ctx->ppRank == ctx->ppSize - 1) { + if (ctx->ppSize > 1 && ctx->ppRank == ctx->ppSize - 1) { int embedding_world_rank = 0 * ctx->tpSize + ctx->tpRank; int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, @@ -81,14 +81,14 @@ std::vector GreedySearch::getNextToken() { DecoderContext *ctx = decoder.getContext(); if (std::get<0>(result) == nullptr) { // The first embedding pipeline parallel stage this->nextTokens = std::vector(batchSize, 0); - if (ctx->ppRank == 0) { + if (ctx->ppSize > 1 && ctx->ppRank == 0) { int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } } else { // The last predictor pipeline parallel stage this->nextTokens = search(result); - if (ctx->ppRank == ctx->ppSize - 1) { + if (ctx->ppSize > 1 && ctx->ppRank == ctx->ppSize - 1) { int embedding_world_rank = 0 * ctx->tpSize + ctx->tpRank; int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, diff --git a/src/utils/verbose.h b/src/utils/verbose.h index c31bdac8..8d2f3cfb 100644 --- a/src/utils/verbose.h +++ b/src/utils/verbose.h @@ -72,7 +72,7 @@ class Env { // Pipeline Parallel private: static int &pipelineValue() { - static int value = 0; + static int value = 1; return value; } @@ -81,7 +81,8 @@ class Env { char *xft_pipeline_value = getenv("XFT_PIPELINE_STAGES"); if (xft_pipeline_value != NULL) { int value = atoi(xft_pipeline_value); - pipelineValue() = value; + if (value >= 1) + pipelineValue() = value; } else { pipelineValue() = 1; } From 3614aec3bbe5008d62a7d0df9ea37759b6434b16 Mon Sep 17 00:00:00 2001 From: changqi1 Date: Thu, 8 Feb 2024 10:27:31 +0800 Subject: [PATCH 10/19] Add introduction --- src/models/common_decoder.h | 133 ++++++++++++++++++------------------ 1 file changed, 66 insertions(+), 67 deletions(-) diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index e8cbc916..5d3a0a8b 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -72,73 +72,72 @@ Pipeline parallel and tensor parallel introduction: 2) TP sync by oneCCL(row_comm) or shared_memory 3) PP sync by MPI MPI_COMM_WORLD - World Rank: => Row Rank: => tp0 tp1 tp2 tp3 - [ 0, 1, 2, 3, [ 0, 1, 2, 3]; pp0 [ 0, 1, 2, 3]; - 4, 5, 6, 7, [ 0, 1, 2, 3]; pp1 [ 0, 1, 2, 3]; - 8, 9, 10, 11, [ 0, 1, 2, 3]; pp2 [ 0, 1, 2, 3]; - 12, 13, 14, 15]; [ 0, 1, 2, 3]; pp3 [ 0, 1, 2, 3]; - - Prompts - │ - ▼ - Embedding(PP0) ◄────────────────────────────────────────────────────────────────────┐ - │ │ - PP0 ▼ │ - ┌─────────────────────────────────────────────────────────────────────────────────┐ │ - │ TP0 TP1 TP2 TP3 │ │ - │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │ - │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ │ - │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ - │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ 
▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ │ - │ └──┬─────────────┘ └─┬──────────────┘ └─┬──────────────┘ └─┬──────────────┘ ... │ │ - │ │◄────────────────┘◄─────────────────┘◄─────────────────┘ │ │ - │ ▼ layer0-7 │ │ - └────┬────────────────────────────────────────────────────────────────────────────┘ │ - │ │ - PP1 ▼ │ - ┌─────────────────────────────────────────────────────────────────────────────────┐ │ - │ TP0 TP1 TP2 TP3 │ │ - │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │ - │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ │ - │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ - │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ │ - │ └──┬─────────────┘ └─┬──────────────┘ └─┬──────────────┘ └─┬──────────────┘ ... │ │ - │ │◄────────────────┘◄─────────────────┘◄─────────────────┘ │ │ - │ ▼ layer8-15 │ │ - └────┬────────────────────────────────────────────────────────────────────────────┘ │ - │ │ - PP2 ▼ │ - ┌─────────────────────────────────────────────────────────────────────────────────┐ │ - │ TP0 TP1 TP2 TP3 │ │ - │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │ - │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ │ - │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ - │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ │ - │ └──┬─────────────┘ └─┬──────────────┘ └─┬──────────────┘ └─┬──────────────┘ ... │ │ - │ │◄────────────────┘◄─────────────────┘◄─────────────────┘ │ │ - │ ▼ layer16-23 │ │ - └────┬────────────────────────────────────────────────────────────────────────────┘ │ - │ │ - PP3 ▼ │ - ┌─────────────────────────────────────────────────────────────────────────────────┐ │ - │ TP0 TP1 TP2 TP3 │ │ - │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │ - │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ │ - │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ - │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ │ - │ └──┬─────────────┘ └─┬──────────────┘ └─┬──────────────┘ └─┬──────────────┘ ... │ │ - │ │◄────────────────┘◄─────────────────┘◄─────────────────┘ │ │ - │ ▼ layer24-31 │ │ - └────┬────────────────────────────────────────────────────────────────────────────┘ │ - ... ... 
- │ │ - ▼ │ - Predictor(PP3) │ - │ │ - ▼ │ - Searchers │ - │ │ - ▼ ───────────────────────────────────────────────────────────────────────────────┘ + World Rank: => Row Rank: => Rank: tp0 tp1 tp2 tp3 + [ 0, 1, 2, 3, [ 0, 1, 2, 3]; pp0 [ 0, 1, 2, 3]; + 4, 5, 6, 7, [ 0, 1, 2, 3]; pp1 [ 0, 1, 2, 3]; + 8, 9, 10, 11, [ 0, 1, 2, 3]; pp2 [ 0, 1, 2, 3]; + 12, 13, 14, 15]; [ 0, 1, 2, 3]; pp3 [ 0, 1, 2, 3]; + + Prompts + │ + ┌──────────────────┬─────────┴────────┬──────────────────┐ + │ │ │ │ + ▼ ▼ ▼ ▼ + Embedding(PP0) Embedding(PP0) Embedding(PP0) Embedding(PP0) + │ │ │ │ + PP0 │ │ │ │ + ┌─────────┼──────────────────┼──────────────────┼──────────────────┼──────────────┐ + │ TP0 │ TP1 │ TP2 │ TP3 │ layer0-7 │ + │ ┌───────▼────────┐ ┌───────▼────────┐ ┌───────▼────────┐ ┌───────▼────────┐ │ + │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ + │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ + │ └───────┬────────┘ └───────┬────────┘ └───────┬────────┘ └───────┬────────┘ │ + │ ┌───────┼──────────────────┼─────AllReduce────┼──────────────────┼────────┐ │ + │ └───────┼──────────────────┼──────────────────┼──────────────────┼────────┘ │ + └─────────┼──────────────────┼──────────────────┼──────────────────┼──────────────┘ + PP1 │ MPI Send/Recv │ │ │ + ┌─────────┼──────────────────┼──────────────────┼──────────────────┼──────────────┐ + │ TP0 │ TP1 │ TP2 │ TP3 │ layer8-15 │ + │ ┌───────▼────────┐ ┌───────▼────────┐ ┌───────▼────────┐ ┌───────▼────────┐ │ + │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ + │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ + │ └───────┬────────┘ └───────┬────────┘ └───────┬────────┘ └───────┬────────┘ │ + │ ┌───────┼──────────────────┼─────AllReduce────┼──────────────────┼────────┐ │ + │ └───────┼──────────────────┼──────────────────┼──────────────────┼────────┘ │ + └─────────┼──────────────────┼──────────────────┼──────────────────┼──────────────┘ + PP2 │ MPI Send/Recv │ │ │ + ┌─────────┼──────────────────┼──────────────────┼──────────────────┼──────────────┐ + │ TP0 │ TP1 │ TP2 │ TP3 │ layer16-23 │ + │ ┌───────▼────────┐ ┌───────▼────────┐ ┌───────▼────────┐ ┌───────▼────────┐ │ + │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ + │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ + │ └───────┬────────┘ └───────┬────────┘ └───────┬────────┘ └───────┬────────┘ │ + │ ┌───────┼──────────────────┼─────AllReduce────┼──────────────────┼────────┐ │ + │ └───────┼──────────────────┼──────────────────┼──────────────────┼────────┘ │ + └─────────┼──────────────────┼──────────────────┼──────────────────┼──────────────┘ + PP3 │ MPI Send/Recv │ │ │ + ┌─────────┼──────────────────┼──────────────────┼──────────────────┼──────────────┐ + │ TP0 │ TP1 │ TP2 │ TP3 │ layer24-31 │ + │ ┌───────▼────────┐ ┌───────▼────────┐ ┌───────▼────────┐ ┌───────▼────────┐ │ + │ │ OMP │ │ OMP │ │ OMP │ │ OMP │ │ + │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ + │ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ ▼ ▼ ▼ ▼ ▼ ▼ ...│ │ + │ └───────┬────────┘ └───────┬────────┘ └───────┬────────┘ └───────┬────────┘ │ + │ ┌───────┼──────────────────┼─────AllReduce────┼──────────────────┼────────┐ │ + │ └───────┼──────────────────┼──────────────────┼──────────────────┼────────┘ │ + 
└─────────┼──────────────────┼──────────────────┼──────────────────┼──────────────┘ + │ │ │ │ + ▼ ▼ ▼ ▼ + Predictor(PP3) Predictor(PP3) Predictor(PP3) Predictor(PP3) + │ MPI Send/Recv │ │ │ + ▼ ▼ ▼ ▼ + Searchers(PP0) Searchers(PP0) Searchers(PP0) Searchers(PP0) + │ + ▼ + Output */ // Template parameters: From 18f44cd38f8e2c2905a3e7360d0bdd2ee3ae07d7 Mon Sep 17 00:00:00 2001 From: changqi1 Date: Thu, 8 Feb 2024 10:44:55 +0800 Subject: [PATCH 11/19] rename pipeline stage --- src/comm_helper/comm_helper.cpp | 2 +- src/models/common_decoder.h | 2 +- src/models/models.cpp | 2 +- src/utils/messenger.h | 2 +- src/utils/verbose.h | 10 +++++----- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/comm_helper/comm_helper.cpp b/src/comm_helper/comm_helper.cpp index e4abd44f..fda32324 100644 --- a/src/comm_helper/comm_helper.cpp +++ b/src/comm_helper/comm_helper.cpp @@ -19,7 +19,7 @@ static ccl::communicator *pcomm; // world_color is initialized to pipeline_parallel_stages_num(pp_size) -// and will be re-assign to world_color of MPI +// and will be re-assign to world_color of MPI == ppRank extern "C" int init(int *world_size, int *world_rank, int *world_color) { ccl::init(); diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index 5d3a0a8b..f8d42149 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -574,7 +574,7 @@ class CommonDecoder : public AbstractDecoder { int maxPositions, int maxPosEmbed, int maxSeqLength, RopeParams *ropeParamsPtr) { int tpSize = messenger.getSize(); int tpRank = messenger.getRank(); - int ppSize = Env::getPipeline(); + int ppSize = Env::getPipelineStage(); int ppRank = messenger.getColor(); // printf("ppSize: %d, ppRank: %d, tpSize: %d, tpRank: %d\n", ppSize, ppRank, tpSize, tpRank); diff --git a/src/models/models.cpp b/src/models/models.cpp index 97585615..fcddb647 100644 --- a/src/models/models.cpp +++ b/src/models/models.cpp @@ -51,7 +51,7 @@ GenerationMode getGenerationMode(SearcherConfig &config_) { Model::Model() : decoder(nullptr), searcher(nullptr), isNewInput(true) { Env::initVerbose(); - Env::initPipeline(); + Env::initPipelineStage(); TimeLine::init(); } diff --git a/src/utils/messenger.h b/src/utils/messenger.h index e614a19d..70460a70 100644 --- a/src/utils/messenger.h +++ b/src/utils/messenger.h @@ -57,7 +57,7 @@ class Messenger { atexit(Messenger::mpi_finalize); - color = Env::getPipeline(); + color = Env::getPipelineStage(); int sameHostnames = (*helperInit)(&size, &rank, &color); #ifdef USE_SHM diff --git a/src/utils/verbose.h b/src/utils/verbose.h index 8d2f3cfb..3d11cff1 100644 --- a/src/utils/verbose.h +++ b/src/utils/verbose.h @@ -71,24 +71,24 @@ class Env { // Pipeline Parallel private: - static int &pipelineValue() { + static int &pipelineStageValue() { static int value = 1; return value; } public: - static void initPipeline() { + static void initPipelineStage() { char *xft_pipeline_value = getenv("XFT_PIPELINE_STAGES"); if (xft_pipeline_value != NULL) { int value = atoi(xft_pipeline_value); if (value >= 1) - pipelineValue() = value; + pipelineStageValue() = value; } else { - pipelineValue() = 1; + pipelineStageValue() = 1; } } - static int getPipeline() { return pipelineValue(); } + static int getPipelineStage() { return pipelineStageValue(); } }; #define GEMMVERBOSE(api_func, compute_func) \ From 8fa50223e1e73f1f478765ef76bbd9d585f3941d Mon Sep 17 00:00:00 2001 From: changqi1 Date: Thu, 8 Feb 2024 12:28:00 +0800 Subject: [PATCH 12/19] enable shm --- CMakeLists.txt | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8a91baa5..4e8ce36c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -145,7 +145,7 @@ add_definitions(-DAVX512_FP32_WEIGHT_ONLY_NF4=true) # add_definitions(-DDEBUG=true) # add_definitions(-DSTEP_BY_STEP_ATTN=true) -# add_definitions(-DUSE_SHM=true) +add_definitions(-DUSE_SHM=true) option(XFT_BUILD_TESTS "Build xfastertransformer unit tests" OFF) # timeline event From 155f85e86379c70d5ab1660d18ab9e9eb9686c63 Mon Sep 17 00:00:00 2001 From: changqi1 Date: Thu, 8 Feb 2024 14:03:07 +0800 Subject: [PATCH 13/19] Add message --- src/comm_helper/comm_helper.cpp | 16 ++++++++++++++++ src/models/common_decoder.h | 6 ++++-- src/searchers/greedy_search.cpp | 14 ++++++++++++++ src/utils/messenger.h | 24 ++++++++++++++++++++++++ 4 files changed, 58 insertions(+), 2 deletions(-) diff --git a/src/comm_helper/comm_helper.cpp b/src/comm_helper/comm_helper.cpp index fda32324..96b7e21f 100644 --- a/src/comm_helper/comm_helper.cpp +++ b/src/comm_helper/comm_helper.cpp @@ -105,4 +105,20 @@ extern "C" void broadcast(int *buf, size_t count) { extern "C" void allgatherv( const float *sendBuf, size_t count, float *recvBuf, const std::vector &recvCounts) { ccl::allgatherv(sendBuf, count, recvBuf, recvCounts, *pcomm).wait(); +} + +extern "C" void worldSendFP32(const float *buf, int count, int dest, int tag) { + MPI_Send((const void *)buf, count, MPI_FLOAT, dest, tag, MPI_COMM_WORLD); +} + +extern "C" void worldRecvFP32(float *buf, int count, int source, int tag) { + MPI_Recv((void *)buf, count, MPI_FLOAT, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); +} + +extern "C" void worldSendINT32(const int32_t *buf, int count, int dest, int tag) { + MPI_Send((const void *)buf, count, MPI_INT32_T, dest, tag, MPI_COMM_WORLD); +} + +extern "C" void worldRecvINT32(int32_t *buf, int count, int source, int tag) { + MPI_Recv((void *)buf, count, MPI_INT32_T, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } \ No newline at end of file diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index f8d42149..fa2bdef6 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -19,8 +19,6 @@ #include #include -#include - #include "INIReader.h" #include "abstract_decoder.h" #include "attention.h" @@ -316,6 +314,8 @@ class CommonDecoder : public AbstractDecoder { // [MPI] Recv data from world_rank 0 int curr_world_rank = ctx->ppRank * ctx->tpSize + ctx->tpRank; int prev_world_rank = (ctx->ppRank - 1) * ctx->tpSize + ctx->tpRank; + // TODO: Error: different scope when dynamic loading so file + // this->messenger.worldRecvFP32(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, prev_world_rank, curr_world_rank); MPI_Recv(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, prev_world_rank, curr_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } @@ -377,6 +377,8 @@ class CommonDecoder : public AbstractDecoder { // If current pipeline stage isn't the end of stage, return nullptr // [MPI] Send data to next pipeline stage int next_world_rank = (ctx->ppRank + 1) * ctx->tpSize + ctx->tpRank; + // TODO: Error: different scope when dynamic loading so file + // this->messenger.worldSendFP32(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, next_world_rank, next_world_rank); MPI_Send(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, next_world_rank, next_world_rank, MPI_COMM_WORLD); return std::tuple(nullptr, 0, 0); diff --git a/src/searchers/greedy_search.cpp b/src/searchers/greedy_search.cpp index 5f807811..73739024 
100644 --- a/src/searchers/greedy_search.cpp +++ b/src/searchers/greedy_search.cpp @@ -14,6 +14,7 @@ // ============================================================================ #include "greedy_search.h" #include "search_utils.h" +#include "messenger.h" GreedySearch::GreedySearch(AbstractDecoder &dec, const SearcherConfig &config) : decoder(dec), maxLen(config.maxLen), step(0), repetitionPenalty(config.repetitionPenalty) { @@ -47,18 +48,25 @@ std::vector GreedySearch::getNextToken(int *ids, int batchSize, int seqLen) std::tuple result = decoder.forward(ids, dims, this->step++); DecoderContext *ctx = decoder.getContext(); + // Messenger &messenger = decoder.getMessenger(); + if (std::get<0>(result) == nullptr) { // The first embedding pipeline parallel stage this->nextTokens = std::vector(batchSize, 0); if (ctx->ppSize > 1 && ctx->ppRank == 0) { int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; + // TODO: Error: different scope when dynamic loading so file + // messenger.worldRecvINT32(this->nextTokens.data(), batchSize, predictor_world_rank, predictor_world_rank); MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } } else { // The last predictor pipeline parallel stage this->nextTokens = search(result); if (ctx->ppSize > 1 && ctx->ppRank == ctx->ppSize - 1) { int embedding_world_rank = 0 * ctx->tpSize + ctx->tpRank; int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; + // TODO: Error: different scope when dynamic loading so file + // messenger.worldSendINT32(this->nextTokens.data(), batchSize, embedding_world_rank, predictor_world_rank); MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, MPI_COMM_WORLD); } @@ -79,10 +87,14 @@ std::vector GreedySearch::getNextToken() { std::tuple result = decoder.forward(nextTokens.data(), dims, this->step++); DecoderContext *ctx = decoder.getContext(); + // Messenger &messenger = decoder.getMessenger(); + if (std::get<0>(result) == nullptr) { // The first embedding pipeline parallel stage this->nextTokens = std::vector(batchSize, 0); if (ctx->ppSize > 1 && ctx->ppRank == 0) { int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; + // TODO: Error: different scope when dynamic loading so file + // messenger.worldRecvINT32(this->nextTokens.data(), batchSize, predictor_world_rank, predictor_world_rank); MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); } @@ -91,6 +103,8 @@ std::vector GreedySearch::getNextToken() { if (ctx->ppSize > 1 && ctx->ppRank == ctx->ppSize - 1) { int embedding_world_rank = 0 * ctx->tpSize + ctx->tpRank; int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; + // TODO: Error: different scope when dynamic loading so file + // messenger.worldSendINT32(this->nextTokens.data(), batchSize, embedding_world_rank, predictor_world_rank); MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, MPI_COMM_WORLD); } diff --git a/src/utils/messenger.h b/src/utils/messenger.h index 70460a70..3cd2cf15 100644 --- a/src/utils/messenger.h +++ b/src/utils/messenger.h @@ -54,6 +54,10 @@ class Messenger { helperBroadcast = (void (*)(int *, size_t))dlsym(commHelperHanlde, "broadcast"); helperAllgatherv = (void (*)(const float *, size_t, float *, const std::vector &))dlsym( commHelperHanlde, 
"allgatherv"); + helperWorldSendFP32 = (void (*)(const float *buf, int count, int dest, int tag))dlsym(commHelperHanlde, "worldSendFP32"); + helperWorldRecvFP32 = (void (*)(float *buf, int count, int source, int tag))dlsym(commHelperHanlde, "worldRecvFP32"); + helperWorldSendINT32 = (void (*)(const int32_t *buf, int count, int dest, int tag))dlsym(commHelperHanlde, "worldSendINT32"); + helperWorldRecvINT32 = (void (*)(int32_t *buf, int count, int source, int tag))dlsym(commHelperHanlde, "worldRecvINT32"); atexit(Messenger::mpi_finalize); @@ -148,6 +152,22 @@ class Messenger { if (check()) { (*helperAllgatherv)(send_buf, count, recv_buf, recv_counts); } } + void worldSendFP32(const float *buf, int count, int dest, int tag) { + if (check()) { (*helperWorldSendFP32)(buf, count, dest, tag); } + } + + void worldRecvFP32(float *buf, int count, int source, int tag) { + if (check()) { (*helperWorldRecvFP32)(buf, count, source, tag); } + } + + void worldSendINT32(const int32_t *buf, int count, int dest, int tag) { + if (check()) { (*helperWorldSendINT32)(buf, count, dest, tag); } + } + + void worldRecvINT32(int32_t *buf, int count, int source, int tag) { + if (check()) { (*helperWorldRecvINT32)(buf, count, source, tag); } + } + bool withMpirun() { return (std::getenv("MPI_LOCALRANKID") || std::getenv("MPI_LOCALNRANKS") || std::getenv("PMI_RANK") || std::getenv("PMI_SIZE") || std::getenv("PMIX_RANK")) @@ -193,4 +213,8 @@ class Messenger { void (*helperAllreduceBF16)(bfloat16_t *, bfloat16_t *, size_t); void (*helperBroadcast)(int *, size_t); void (*helperAllgatherv)(const float *, size_t, float *, const std::vector &); + void (*helperWorldSendFP32)(const float *buf, int count, int dest, int tag); + void (*helperWorldRecvFP32)(float *buf, int count, int source, int tag); + void (*helperWorldSendINT32)(const int32_t *buf, int count, int dest, int tag); + void (*helperWorldRecvINT32)(int32_t *buf, int count, int source, int tag); }; From 9d002e7f61b206a7a989577a67d365c416d2bb3f Mon Sep 17 00:00:00 2001 From: changqi1 Date: Thu, 8 Feb 2024 14:05:59 +0800 Subject: [PATCH 14/19] format code --- src/searchers/greedy_search.cpp | 3 +-- src/utils/messenger.h | 12 ++++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/searchers/greedy_search.cpp b/src/searchers/greedy_search.cpp index 73739024..f1439c38 100644 --- a/src/searchers/greedy_search.cpp +++ b/src/searchers/greedy_search.cpp @@ -13,8 +13,8 @@ // limitations under the License. 
// ============================================================================ #include "greedy_search.h" -#include "search_utils.h" #include "messenger.h" +#include "search_utils.h" GreedySearch::GreedySearch(AbstractDecoder &dec, const SearcherConfig &config) : decoder(dec), maxLen(config.maxLen), step(0), repetitionPenalty(config.repetitionPenalty) { @@ -58,7 +58,6 @@ std::vector GreedySearch::getNextToken(int *ids, int batchSize, int seqLen) // messenger.worldRecvINT32(this->nextTokens.data(), batchSize, predictor_world_rank, predictor_world_rank); MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - } } else { // The last predictor pipeline parallel stage this->nextTokens = search(result); diff --git a/src/utils/messenger.h b/src/utils/messenger.h index 3cd2cf15..d4e460b9 100644 --- a/src/utils/messenger.h +++ b/src/utils/messenger.h @@ -54,10 +54,14 @@ class Messenger { helperBroadcast = (void (*)(int *, size_t))dlsym(commHelperHanlde, "broadcast"); helperAllgatherv = (void (*)(const float *, size_t, float *, const std::vector &))dlsym( commHelperHanlde, "allgatherv"); - helperWorldSendFP32 = (void (*)(const float *buf, int count, int dest, int tag))dlsym(commHelperHanlde, "worldSendFP32"); - helperWorldRecvFP32 = (void (*)(float *buf, int count, int source, int tag))dlsym(commHelperHanlde, "worldRecvFP32"); - helperWorldSendINT32 = (void (*)(const int32_t *buf, int count, int dest, int tag))dlsym(commHelperHanlde, "worldSendINT32"); - helperWorldRecvINT32 = (void (*)(int32_t *buf, int count, int source, int tag))dlsym(commHelperHanlde, "worldRecvINT32"); + helperWorldSendFP32 + = (void (*)(const float *buf, int count, int dest, int tag))dlsym(commHelperHanlde, "worldSendFP32"); + helperWorldRecvFP32 + = (void (*)(float *buf, int count, int source, int tag))dlsym(commHelperHanlde, "worldRecvFP32"); + helperWorldSendINT32 + = (void (*)(const int32_t *buf, int count, int dest, int tag))dlsym(commHelperHanlde, "worldSendINT32"); + helperWorldRecvINT32 + = (void (*)(int32_t *buf, int count, int source, int tag))dlsym(commHelperHanlde, "worldRecvINT32"); atexit(Messenger::mpi_finalize); From 33d542726590b8f4a0ba9121f52a5500c93a1dde Mon Sep 17 00:00:00 2001 From: changqi1 Date: Sun, 18 Feb 2024 12:22:16 +0800 Subject: [PATCH 15/19] format code --- src/models/common_decoder.h | 16 +++++++--------- src/utils/messenger.h | 21 +++++++++------------ 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index fa2bdef6..a5fb6636 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -311,13 +311,12 @@ class CommonDecoder : public AbstractDecoder { // if current pipeline parallel stage rank isn't the first stage, should receive previous stage data if (ctx->ppSize > 1 && ctx->ppRank > 0) { - // [MPI] Recv data from world_rank 0 int curr_world_rank = ctx->ppRank * ctx->tpSize + ctx->tpRank; int prev_world_rank = (ctx->ppRank - 1) * ctx->tpSize + ctx->tpRank; + int count = batchSize * inputSeqLen * ctx->hiddenSize; + MPI_Recv(embBuf, count, MPI_FLOAT, prev_world_rank, curr_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); // TODO: Error: different scope when dynamic loading so file - // this->messenger.worldRecvFP32(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, prev_world_rank, curr_world_rank); - MPI_Recv(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, prev_world_rank, curr_world_rank, - MPI_COMM_WORLD, 
MPI_STATUS_IGNORE); + // this->messenger.worldRecvFP32(embBuf, count, prev_world_rank, curr_world_rank); } // Decoder: forward @@ -373,14 +372,13 @@ class CommonDecoder : public AbstractDecoder { } } + // If current pipeline stage isn't the end of stage, should send data to next stage and return nullptr if (ctx->ppSize > 1 && ctx->ppRank < ctx->ppSize - 1) { - // If current pipeline stage isn't the end of stage, return nullptr - // [MPI] Send data to next pipeline stage int next_world_rank = (ctx->ppRank + 1) * ctx->tpSize + ctx->tpRank; + int count = batchSize * inputSeqLen * ctx->hiddenSize; + MPI_Send(embBuf, count, MPI_FLOAT, next_world_rank, next_world_rank, MPI_COMM_WORLD); // TODO: Error: different scope when dynamic loading so file - // this->messenger.worldSendFP32(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, next_world_rank, next_world_rank); - MPI_Send(embBuf, batchSize * inputSeqLen * ctx->hiddenSize, MPI_FLOAT, next_world_rank, next_world_rank, - MPI_COMM_WORLD); + // this->messenger.worldSendFP32(embBuf, count, next_world_rank, next_world_rank); return std::tuple(nullptr, 0, 0); } diff --git a/src/utils/messenger.h b/src/utils/messenger.h index d4e460b9..1c37a65a 100644 --- a/src/utils/messenger.h +++ b/src/utils/messenger.h @@ -54,14 +54,11 @@ class Messenger { helperBroadcast = (void (*)(int *, size_t))dlsym(commHelperHanlde, "broadcast"); helperAllgatherv = (void (*)(const float *, size_t, float *, const std::vector &))dlsym( commHelperHanlde, "allgatherv"); - helperWorldSendFP32 - = (void (*)(const float *buf, int count, int dest, int tag))dlsym(commHelperHanlde, "worldSendFP32"); - helperWorldRecvFP32 - = (void (*)(float *buf, int count, int source, int tag))dlsym(commHelperHanlde, "worldRecvFP32"); - helperWorldSendINT32 - = (void (*)(const int32_t *buf, int count, int dest, int tag))dlsym(commHelperHanlde, "worldSendINT32"); - helperWorldRecvINT32 - = (void (*)(int32_t *buf, int count, int source, int tag))dlsym(commHelperHanlde, "worldRecvINT32"); + + helperWorldSendFP32 = (void (*)(const float *, int, int, int))dlsym(commHelperHanlde, "worldSendFP32"); + helperWorldRecvFP32 = (void (*)(float *, int, int, int))dlsym(commHelperHanlde, "worldRecvFP32"); + helperWorldSendINT32 = (void (*)(const int32_t *, int, int, int))dlsym(commHelperHanlde, "worldSendINT32"); + helperWorldRecvINT32 = (void (*)(int32_t *, int, int, int))dlsym(commHelperHanlde, "worldRecvINT32"); atexit(Messenger::mpi_finalize); @@ -217,8 +214,8 @@ class Messenger { void (*helperAllreduceBF16)(bfloat16_t *, bfloat16_t *, size_t); void (*helperBroadcast)(int *, size_t); void (*helperAllgatherv)(const float *, size_t, float *, const std::vector &); - void (*helperWorldSendFP32)(const float *buf, int count, int dest, int tag); - void (*helperWorldRecvFP32)(float *buf, int count, int source, int tag); - void (*helperWorldSendINT32)(const int32_t *buf, int count, int dest, int tag); - void (*helperWorldRecvINT32)(int32_t *buf, int count, int source, int tag); + void (*helperWorldSendFP32)(const float *, int, int, int); + void (*helperWorldRecvFP32)(float *, int, int, int); + void (*helperWorldSendINT32)(const int32_t *, int, int, int); + void (*helperWorldRecvINT32)(int32_t *, int, int, int); }; From 79bded3c9967d34c4417cb3cfb165051083a6643 Mon Sep 17 00:00:00 2001 From: changqi1 Date: Sun, 18 Feb 2024 12:35:32 +0800 Subject: [PATCH 16/19] clear code --- src/searchers/greedy_search.cpp | 86 ++++++++++++--------------------- src/searchers/greedy_search.h | 1 + 2 files changed, 32 insertions(+), 55 
deletions(-) diff --git a/src/searchers/greedy_search.cpp b/src/searchers/greedy_search.cpp index f1439c38..8d5b20e5 100644 --- a/src/searchers/greedy_search.cpp +++ b/src/searchers/greedy_search.cpp @@ -28,25 +28,7 @@ GreedySearch::GreedySearch(AbstractDecoder &dec, const SearcherConfig &config) stopWordsIndex = {}; } -// Get next tokens accoring to the prompt IDs -std::vector GreedySearch::getNextToken(int *ids, int batchSize, int seqLen) { - TimeLine t("1st Token"); - this->step = 0; - this->batchSize = batchSize; - this->curLen = seqLen; - this->doneBatch = std::vector(batchSize, 0); - - if (!this->stopWordsList.empty()) { - stopWordsIndex = std::vector>(stopWordsList.size(), std::vector(batchSize, 0)); - } - - this->output.resize(batchSize * seqLen); - std::copy(ids, ids + batchSize * seqLen, output.begin()); - - int64_t dims[3] = {batchSize, 1, seqLen}; - - std::tuple result = decoder.forward(ids, dims, this->step++); - +std::vector GreedySearch::syncToken(std::tuple &result) { DecoderContext *ctx = decoder.getContext(); // Messenger &messenger = decoder.getMessenger(); @@ -54,20 +36,20 @@ std::vector GreedySearch::getNextToken(int *ids, int batchSize, int seqLen) this->nextTokens = std::vector(batchSize, 0); if (ctx->ppSize > 1 && ctx->ppRank == 0) { int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; - // TODO: Error: different scope when dynamic loading so file - // messenger.worldRecvINT32(this->nextTokens.data(), batchSize, predictor_world_rank, predictor_world_rank); MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + // TODO: Error: different scope when dynamic loading so file + // messenger.worldRecvINT32(this->nextTokens.data(), batchSize, predictor_world_rank, predictor_world_rank); } } else { // The last predictor pipeline parallel stage - this->nextTokens = search(result); + this->nextTokens = this->search(result); if (ctx->ppSize > 1 && ctx->ppRank == ctx->ppSize - 1) { int embedding_world_rank = 0 * ctx->tpSize + ctx->tpRank; int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, MPI_COMM_WORLD); + // TODO: Error: different scope when dynamic loading so file + // messenger.worldSendINT32(this->nextTokens.data(), batchSize, embedding_world_rank, predictor_world_rank); } } @@ -79,42 +61,36 @@ std::vector GreedySearch::getNextToken(int *ids, int batchSize, int seqLen) return this->nextTokens; } -// Get next tokens according to previous predicted ID +// Get next tokens according to the prompt IDs for the first token +std::vector GreedySearch::getNextToken(int *ids, int batchSize, int seqLen) { + TimeLine t("1st Token"); + this->step = 0; + this->batchSize = batchSize; + this->curLen = seqLen; + this->doneBatch = std::vector(batchSize, 0); + + if (!this->stopWordsList.empty()) { + stopWordsIndex = std::vector>(stopWordsList.size(), std::vector(batchSize, 0)); + } + + this->output.resize(batchSize * seqLen); + std::copy(ids, ids + batchSize * seqLen, output.begin()); + + int64_t dims[3] = {batchSize, 1, seqLen}; + + std::tuple result = decoder.forward(ids, dims, this->step++); + + return this->syncToken(result); +} + +// Get next tokens according to previous predicted ID for next
tokens std::vector GreedySearch::getNextToken() { TimeLine t("Next Token"); int64_t dims[3] = {batchSize, 1, 1}; - std::tuple result = decoder.forward(nextTokens.data(), dims, this->step++); - - DecoderContext *ctx = decoder.getContext(); - // Messenger &messenger = decoder.getMessenger(); - - if (std::get<0>(result) == nullptr) { // The first embedding pipeline parallel stage - this->nextTokens = std::vector(batchSize, 0); - if (ctx->ppSize > 1 && ctx->ppRank == 0) { - int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; - // TODO: Error: different scope when dynamic loading so file - // messenger.worldRecvINT32(this->nextTokens.data(), batchSize, predictor_world_rank, predictor_world_rank); - MPI_Recv(this->nextTokens.data(), batchSize, MPI_INT32_T, predictor_world_rank, predictor_world_rank, - MPI_COMM_WORLD, MPI_STATUS_IGNORE); - } - } else { // The last predictor pipeline parallel stage - this->nextTokens = search(result); - if (ctx->ppSize > 1 && ctx->ppRank == ctx->ppSize - 1) { - int embedding_world_rank = 0 * ctx->tpSize + ctx->tpRank; - int predictor_world_rank = (ctx->ppSize - 1) * ctx->tpSize + ctx->tpRank; - // TODO: Error: different scope when dynamic loading so file - // messenger.worldSendINT32(this->nextTokens.data(), batchSize, embedding_world_rank, predictor_world_rank); - MPI_Send(this->nextTokens.data(), batchSize, MPI_INT32_T, embedding_world_rank, predictor_world_rank, - MPI_COMM_WORLD); - } - } - this->curLen++; - for (int batchId = 0; batchId < batchSize; ++batchId) { - output.insert(output.begin() + (batchId + 1) * curLen - 1, nextTokens[batchId]); - } + std::tuple result = decoder.forward(nextTokens.data(), dims, this->step++); - return this->nextTokens; + return this->syncToken(result); } bool GreedySearch::isDone() { diff --git a/src/searchers/greedy_search.h b/src/searchers/greedy_search.h index ee478875..af33e5a9 100644 --- a/src/searchers/greedy_search.h +++ b/src/searchers/greedy_search.h @@ -34,6 +34,7 @@ class GreedySearch : public AbstractSearcher { bool setStopWords(std::vector> stopWordsList); private: + std::vector syncToken(std::tuple &result); std::vector search(std::tuple &result); AbstractDecoder &decoder; From 841c3dbc537adfb7657d79678bb85c51d7f54ec3 Mon Sep 17 00:00:00 2001 From: changqi1 Date: Sun, 18 Feb 2024 13:04:12 +0800 Subject: [PATCH 17/19] Add build option --- CMakeLists.txt | 7 +++++++ src/models/common_decoder.h | 4 ++++ src/searchers/greedy_search.cpp | 5 +++++ src/utils/verbose.h | 4 ++++ 4 files changed, 20 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4e8ce36c..21783d16 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -132,6 +132,13 @@ else() list(APPEND 3RDPART_LIB_LIST "xdnn_static") endif() +# pipeline parallel feature +option(WITH_PIPELINE_PARALLEL "Build with pipeline parallel" OFF) +if(WITH_PIPELINE_PARALLEL) + message(STATUS "Notice: Building with pipeline parallel.") + add_definitions(-DPIPELINE_PARALLEL=true) +endif() + # Enable AVX512_FP16 optimization # add_definitions(-DAVX512_FP32_WEIGHT_ONLY_FP16=true) add_definitions(-DAVX512_FP16_WEIGHT_ONLY_FP16=true) diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index a5fb6636..02db06a9 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -309,6 +309,7 @@ class CommonDecoder : public AbstractDecoder { int *positionIds = this->getPositionIds(ids, batchSize, inputSeqLen, step + this->prefixSharing); t1.release(); +#ifdef PIPELINE_PARALLEL // if current pipeline parallel stage rank isn't the 
first stage, should receive previous stage data if (ctx->ppSize > 1 && ctx->ppRank > 0) { int curr_world_rank = ctx->ppRank * ctx->tpSize + ctx->tpRank; @@ -318,6 +319,7 @@ // TODO: Error: different scope when dynamic loading so file // this->messenger.worldRecvFP32(embBuf, count, prev_world_rank, curr_world_rank); } +#endif // Decoder: forward int hiddenSize = ctx->hiddenSize; @@ -372,6 +374,7 @@ } } +#ifdef PIPELINE_PARALLEL // If current pipeline stage isn't the end of stage, should send data to next stage and return nullptr if (ctx->ppSize > 1 && ctx->ppRank < ctx->ppSize - 1) { int next_world_rank = (ctx->ppRank + 1) * ctx->tpSize + ctx->tpRank; @@ -381,6 +384,7 @@ // this->messenger.worldSendFP32(embBuf, count, next_world_rank, next_world_rank); return std::tuple(nullptr, 0, 0); } +#endif // Prepare input for final Layer Norm (only care about the last row of the result) // Shape of embBuf: (bs, seqLen, hiddenSize) diff --git a/src/searchers/greedy_search.cpp b/src/searchers/greedy_search.cpp index 8d5b20e5..0e55648e 100644 --- a/src/searchers/greedy_search.cpp +++ b/src/searchers/greedy_search.cpp @@ -29,6 +29,8 @@ GreedySearch::GreedySearch(AbstractDecoder &dec, const SearcherConfig &config) } std::vector GreedySearch::syncToken(std::tuple &result) { + // send data from the last predictor stage to the first embedding stage in pipeline parallel +#ifdef PIPELINE_PARALLEL DecoderContext *ctx = decoder.getContext(); // Messenger &messenger = decoder.getMessenger(); @@ -52,6 +54,9 @@ std::vector GreedySearch::syncToken(std::tuple &result) // messenger.worldSendINT32(this->nextTokens.data(), batchSize, embedding_world_rank, predictor_world_rank); } } +#else + this->nextTokens = this->search(result); +#endif this->curLen++; for (int batchId = 0; batchId < batchSize; ++batchId) { diff --git a/src/utils/verbose.h b/src/utils/verbose.h index 3d11cff1..e19c9103 100644 --- a/src/utils/verbose.h +++ b/src/utils/verbose.h @@ -80,9 +80,13 @@ static void initPipelineStage() { char *xft_pipeline_value = getenv("XFT_PIPELINE_STAGES"); if (xft_pipeline_value != NULL) { +#ifdef PIPELINE_PARALLEL int value = atoi(xft_pipeline_value); if (value >= 1) pipelineStageValue() = value; +#else + printf("[WARNING] XFT_PIPELINE_STAGES requires a build with WITH_PIPELINE_PARALLEL=ON.\n"); +#endif } else { pipelineStageValue() = 1; } From d8a4f311a7a042962f61fba7f47e6e0c9944fada Mon Sep 17 00:00:00 2001 From: changqi1 Date: Sun, 18 Feb 2024 16:45:15 +0800 Subject: [PATCH 18/19] decouple mpi --- src/models/CMakeLists.txt | 12 +++++++----- src/utils/messenger.h | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/models/CMakeLists.txt b/src/models/CMakeLists.txt index 7ee1119d..8b5bc081 100644 --- a/src/models/CMakeLists.txt +++ b/src/models/CMakeLists.txt @@ -14,12 +14,14 @@ # ============================================================================ cmake_minimum_required(VERSION 3.15.1) -find_package(MPI REQUIRED) -include_directories(${MPI_INCLUDE_PATH}) -add_definitions(${MPI_CXX_COMPILE_FLAGS}) - aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} MODEL_SRCS) add_library(models OBJECT ${MODEL_SRCS}) add_dependencies(models utils) -target_link_libraries(models ${MPI_CXX_LIBRARIES}) \ No newline at end of file + +if(WITH_PIPELINE_PARALLEL) + find_package(MPI REQUIRED) + include_directories(${MPI_INCLUDE_PATH}) + add_definitions(${MPI_CXX_COMPILE_FLAGS}) +
target_link_libraries(models ${MPI_CXX_LIBRARIES}) +endif() \ No newline at end of file diff --git a/src/utils/messenger.h b/src/utils/messenger.h index 1c37a65a..f0325b21 100644 --- a/src/utils/messenger.h +++ b/src/utils/messenger.h @@ -201,7 +201,7 @@ class Messenger { private: int size; int rank; - int color; + int color; // Processes with the same color will be placed into the same sub-communicator bool localRanksFlag; #ifdef USE_SHM From 40bacdb29ba980f230515e25d78aa8a95837462b Mon Sep 17 00:00:00 2001 From: changqi1 Date: Mon, 19 Feb 2024 11:22:51 +0800 Subject: [PATCH 19/19] Add more TODO --- src/models/common_decoder.h | 2 ++ src/searchers/sample_search.cpp | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/models/common_decoder.h b/src/models/common_decoder.h index 02db06a9..5e4e86f0 100644 --- a/src/models/common_decoder.h +++ b/src/models/common_decoder.h @@ -212,6 +212,7 @@ class CommonDecoder : public AbstractDecoder { if (layers % ctx->ppSize != 0) { std::cerr << "Warning: layers cannot be evenly divided by pipeline parallel stage size(ppSize)." << std::endl; + std::exit(-1); } int layers_per_pp_stage = layers / ctx->ppSize; @@ -483,6 +484,7 @@ class CommonDecoder : public AbstractDecoder { t1.release(); // Decoder: forward + // TODO: Add PIPELINE_PARALLEL feature int hiddenSize = ctx->hiddenSize; for (int i = 0; i < this->decoders.size(); ++i) { int workers = this->messenger.getSize(); diff --git a/src/searchers/sample_search.cpp b/src/searchers/sample_search.cpp index 6ae6c2db..872f5dc8 100644 --- a/src/searchers/sample_search.cpp +++ b/src/searchers/sample_search.cpp @@ -59,6 +59,7 @@ std::vector SampleSearch::getNextToken(int *ids, int batchSize, int seqLen) std::tuple result = decoder.forward(ids, dims, this->step++); nextTokens.resize(batchSize); + // TODO: Add PIPELINE_PARALLEL feature sample(result); this->curLen++; @@ -75,6 +76,7 @@ std::vector SampleSearch::getNextToken() { int64_t dims[3] = {batchSize, 1, 1}; std::tuple result = decoder.forward(nextTokens.data(), dims, this->step++); + // TODO: Add PIPELINE_PARALLEL feature sample(result); this->curLen++;
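A note on the rank layout used throughout this series (patches 09 and 11): every process first gets a global rank from MPI_COMM_WORLD, world_color = world_rank / tpSize (== ppRank) then selects the tensor-parallel row that MPI_Comm_split carves out, and the rank inside that row becomes tpRank, on top of which the oneCCL communicator is created. The following is a minimal standalone sketch of that mapping, not code from the tree; the hard-coded ppSize stands in for the value the real code reads from XFT_PIPELINE_STAGES.

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char **argv) {
        MPI_Init(&argc, &argv);
        int worldSize, worldRank;
        MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
        MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);

        const int ppSize = 2;                  // assumed stage count for this sketch
        const int tpSize = worldSize / ppSize; // requires worldSize % ppSize == 0
        const int ppRank = worldRank / tpSize; // == world_color in the patches
        const int tpRank = worldRank % tpSize;

        // Ranks sharing a color land in the same row communicator, so
        // tensor-parallel collectives never cross a pipeline stage.
        MPI_Comm rowComm;
        MPI_Comm_split(MPI_COMM_WORLD, ppRank, worldRank, &rowComm);

        int rowRank;
        MPI_Comm_rank(rowComm, &rowRank); // equals tpRank
        printf("world %d -> pp %d / tp %d\n", worldRank, ppRank, rowRank);

        MPI_Comm_free(&rowComm);
        MPI_Finalize();
        return 0;
    }

With pp = 2 and tp = 1 as in the opening patch, ranks 0 and 1 get colors 0 and 1 and each row holds a single process, matching the comment tables in comm_helper.cpp.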
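The token feedback loop that patch 16 factors into GreedySearch::syncToken() reduces to the sketch below: the last pipeline stage owns the logits, runs search(), and sends the sampled ids back to stage 0 so the next forward step can start; the peer is always the process with the same tpRank, and by the series' convention the MPI tag is the sender's world rank. This is a simplified restatement under the assumption ppSize > 1, with StageInfo standing in for the DecoderContext fields the real code reads.

    #include <mpi.h>
    #include <vector>

    // Stand-in for the DecoderContext fields used by syncToken().
    struct StageInfo { int ppRank, ppSize, tpRank, tpSize; };

    void syncTokensSketch(const StageInfo &s, std::vector<int> &nextTokens, int batchSize) {
        int embeddingWorldRank = 0 * s.tpSize + s.tpRank;              // stage 0, same tpRank
        int predictorWorldRank = (s.ppSize - 1) * s.tpSize + s.tpRank; // last stage, same tpRank
        if (s.ppRank == s.ppSize - 1) {
            // Last stage: nextTokens was just filled by search(); feed it back.
            MPI_Send(nextTokens.data(), batchSize, MPI_INT32_T, embeddingWorldRank,
                    predictorWorldRank, MPI_COMM_WORLD);
        } else if (s.ppRank == 0) {
            // First stage: block until the sampled tokens arrive.
            MPI_Recv(nextTokens.data(), batchSize, MPI_INT32_T, predictorWorldRank,
                    predictorWorldRank, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
    }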
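The recurring TODO about "different scope when dynamic loading so file" refers to how messenger.h reaches the comm helper: every entry point, including the worldSend*/worldRecv* wrappers from patch 13, is resolved with dlsym out of a separately loaded shared object, so state initialized inside that .so is not directly shared with the host binary, which is presumably why the direct MPI_Send/MPI_Recv calls remain for now. Below is a minimal sketch of the binding pattern with error checks added for illustration; the symbol name and signature follow the patches, while the library name and surrounding code are assumptions:

    #include <dlfcn.h>
    #include <cstdio>
    #include <cstdint>

    int main() {
        // Library name assumed for illustration only.
        void *handle = dlopen("libxft_comm_helper.so", RTLD_NOW);
        if (handle == nullptr) { std::fprintf(stderr, "dlopen: %s\n", dlerror()); return 1; }

        // Same cast messenger.h performs for "worldSendINT32".
        auto worldSendINT32
                = (void (*)(const int32_t *, int, int, int))dlsym(handle, "worldSendINT32");
        if (worldSendINT32 == nullptr) { std::fprintf(stderr, "dlsym: %s\n", dlerror()); return 1; }

        // worldSendINT32(buf, count, dest, tag) would forward to MPI_Send inside the .so.
        dlclose(handle);
        return 0;
    }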
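Finally, the end-to-end switch added in patch 17: pipeline parallelism must be compiled in with -DWITH_PIPELINE_PARALLEL=ON (which defines PIPELINE_PARALLEL), and the stage count is then read from XFT_PIPELINE_STAGES at startup; without the build flag the environment variable only produces the warning. A condensed sketch of that run-time path, mirroring Env::initPipelineStage():

    #include <cstdio>
    #include <cstdlib>

    // Default to a single stage; accept the env var only when the build
    // flag is set and the value is at least 1, as the patch does.
    static int readPipelineStages() {
        int stages = 1;
        if (const char *env = std::getenv("XFT_PIPELINE_STAGES")) {
    #ifdef PIPELINE_PARALLEL
            int value = std::atoi(env);
            if (value >= 1) stages = value;
    #else
            std::printf("[WARNING] XFT_PIPELINE_STAGES requires a build with WITH_PIPELINE_PARALLEL=ON.\n");
    #endif
        }
        return stages;
    }

A two-stage run on two ranks would then presumably be launched under mpirun with XFT_PIPELINE_STAGES=2 exported, giving the pp=2/tp=1 layout the series starts from.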