@@ -15,173 +15,151 @@ Status CompressorAction::Setup(const Config& config)
1515{
1616 backend_ = config.storeBackend ;
1717 shardSize_ = config.shardSize ;
18+ decompressThreadNum = config.decompressThreadNum ;
19+
1820 switch (config.compressRatio ) {
1921 case 32 : ratio = R1; break ;
20- case 24 : ratio = R133; break ;
21- case 23 : ratio = R139; break ;
22- case 22 : ratio = R145; break ;
23- case 21 : ratio = R152; break ;
2422 case 16 : ratio = R200; break ;
25- default : return Status::InvalidParam (" invalid compressRatio({})" , config.compressRatio );
23+ default : return Status::InvalidParam (" Invalid compressRatio({})" , config.compressRatio );
2624 }
2725
2826 switch (config.dataType ) {
2927 case 0 : dataType = DT_BF16; break ;
30- default : return Status::InvalidParam (" invalid compress dataType({})" , config.dataType );
28+ default : return Status::InvalidParam (" Invalid compress dataType({})" , config.dataType );
3129 }
3230
3331 // init thread pool
34- dump_pool_.SetNWorker (config.streamNumber / 2 )
32+ dump_pool_.SetNWorker (config.streamNumber >> 1 )
3533 .SetWorkerFn ([this ](auto & ct, auto &) { Compress_Dump (ct); })
3634 .Run ();
35+
36+ load_pool_.SetNWorker (decompressThreadNum)
37+ .SetWorkerFn ([this ](auto & ct, auto &) { Compress_Load (ct); })
38+ .Run ();
39+
40+ UC_DEBUG (" Compress Storge Load thread num: {}." , decompressThreadNum);
3741
3842 threadBuf_ = std::make_unique<uint8_t []>(shardSize_);
3943 return Status::OK ();
4044}
4145
4246void CompressorAction::Push (TaskPtr task, WaiterPtr waiter)
4347{
44- UC_DEBUG (" task {}, push size is {}" , task->id , task->desc .size ());
45- // waiter->Set(1);
48+ UC_DEBUG (" task {}, push size: {}" , task->id , task->desc .size ());
49+ waiter->Set (1 );
4650 if (task->type == TransTask::Type::DUMP) {
4751 dump_pool_.Push (CompressTask {
4852 task,
4953 waiter
5054 });
51- } else {
52- // load_pool_.Push(CompressTask {
53- // task,
54- // waiter
55- // });
56- }
55+ } else if (task-> type == TransTask::Type::LOAD) {
56+ load_pool_.Push (CompressTask {
57+ task,
58+ waiter
59+ });
60+ }
5761}
5862
59- void CompressorAction::Compress_Load (TaskPtr t, WaiterPtr w )
63+ void CompressorAction::Compress_Load (CompressTask& ct )
6064{
61- #ifdef USE_C_COMPRESS
62- auto t0 = std::chrono::high_resolution_clock::now ();
63- auto result = backend_->Load (t->desc );
64- if (result.Value () > 0 ) {
65+ UC_DEBUG (" COMPRESS LOAD START." );
66+ if (ratio == R1) {
67+ auto result = backend_->Load (std::move (ct.task ->desc ));
6568 backend_->Wait (result.Value ());
66- }
67- auto t1 = std::chrono::high_resolution_clock::now ();
68- UC_INFO (" task id {} backend load and wait time: {} us" , t->id , t1 - t0);
69-
70- auto t3 = std::chrono::high_resolution_clock::now ();
71- const auto & shards = t->desc ;
72- const size_t sz = shards.size ();
73- const size_t numThreads = 4 ; // 线程数量
74- std::vector<std::thread> threads;
75- const size_t batch_sz = (sz + numThreads - 1 ) / numThreads;
76- for (size_t t_id = 0 ; t_id < numThreads; ++t_id) {
77- size_t start = t_id * batch_sz;
78- size_t end = std::min (start + batch_sz, sz);
79- if (start < end) {
80- // 直接创建线程并执行逻辑
81- threads.emplace_back ([&, start, end]() {
82- // 每个线程私有的缓冲区,防止竞争
83- std::unique_ptr<uint8_t []> localBuf (new uint8_t [shardSize_]);
84- for (size_t i = start; i < end; ++i) {
85- auto & s = shards[i];
86- uint8_t * src = static_cast <uint8_t *>(s.addrs [0 ]);
87-
88- if (ratio == R1) {
89- continue ;
90- } else {
91- size_t n_bf16 = shardSize_ >> 1 ;
92- TunstallDecompressBF16 ((uint16_t *)localBuf.get (), src, n_bf16);
93- memcpy (src, localBuf.get (), shardSize_);
94- }
69+ } else {
70+ auto result = backend_->Load (ct.task ->desc );
71+ if (result.Value () > 0 ) {
72+ backend_->Wait (result.Value ());
73+ }
74+
75+ const auto & shards = ct.task ->desc ;
76+ const size_t sz = shards.size ();
77+ UC_DEBUG (" shards.size: {}" , sz);
78+
79+ for (size_t i = 0 ; i < sz; ++i) {
80+ if (ratio == R200) {
81+ size_t n_bf16 = shardSize_ >> 1 ;
82+ int err = TunstallDecompressBF16Inplace (static_cast <uint8_t *>(shards[i].addrs [0 ]), n_bf16);
83+ if (err != 0 ) {
84+ UC_ERROR (" Failed to decompress BF16 data for shard {}" , shards[i].index );
9585 }
96- });
86+ }
9787 }
9888 }
99- auto t4 = std::chrono::high_resolution_clock::now ();
100- UC_INFO (" create threads time: {}" , t4 - t3);
101-
102- auto t5 = std::chrono::high_resolution_clock::now ();
103- for (auto & th : threads) th.join ();
104- auto t6 = std::chrono::high_resolution_clock::now ();
105- UC_INFO (" threads finish time: {} thread num({})" , t6- t5, numThreads);
106- #else
107- // to posix load
108- /* 原路径:直接调用 PosixStore */
109- auto result = backend_->Load (std::move (t->desc ));
110- backend_->Wait (result.Value ());
111- UC_INFO (" COMPRESS LOAD END." );
112- #endif
113- // UC_INFO("COMPRESS LOAD END, task: {}", ct.task->id);
114- w->Done ();
89+
90+ ct.waiter ->Done ();
91+ UC_DEBUG (" COMPRESS LOAD END." );
11592}
11693
11794void CompressorAction::Compress_Dump (CompressTask& ct)
11895{
119- #ifdef USE_C_COMPRESS
120- UC_INFO (" COMPRESS DUMP STARTING..." );
121- const auto & desc = ct.task ->desc ;
122- if (desc.empty ()) {
123- UC_INFO (" COMPRESS DUMP desc is empty..." );
124- return ;
125- }
96+ UC_DEBUG (" COMPRESS DUMP STARTING." );
12697
127- size_t srcSize = shardSize_;
128- size_t compBufSize = srcSize + 4096 ; // 压缩后缓冲区的可用大小
129-
130- Detail::TaskDesc backendDesc;
131- backendDesc.brief = ct.task ->desc .brief ;
132- std::vector<void *> blockToFree;
133- std::unique_ptr<MemoryPool> dump_memoryPool_ = std::make_unique<MemoryPool>(compBufSize, ct.task ->desc .size ());
134- if (!dump_memoryPool_) {
135- UC_INFO (" Out of memory: failed to allocate {} B" , shardSize_ * ct.task ->desc .size ());
136- Status::NoSpace ();
137- }
138-
139- for (const UC::Detail::Shard& s : desc) {
140- UC_INFO (" Task id: {} Shard index: {} Compress start..." , ct.task ->id , s.index );
141-
142- uint8_t * compBuf = static_cast <uint8_t *>(dump_memoryPool_->allocate ());
143- uint16_t * src = static_cast <uint16_t *>(s.addrs [0 ]);
144-
145- size_t compBytes = 0 ;
146- if (ratio == R1) {
147- memcpy (compBuf, src, srcSize);
148- compBytes = srcSize;
149- } else {
150- size_t n_bf16 = shardSize_ >> 1 ;
151- compBytes = TunstallCompressBF16 (compBuf, (uint16_t *)src, n_bf16);
98+ if (ratio == R1) {
99+ const auto n = ct.task ->desc .size ();
100+ if (n > 0 ) {
101+ backend_->Dump (std::move (ct.task ->desc ));
152102 }
103+ } else {
104+ const auto & desc = ct.task ->desc ;
105+ if (desc.empty ()) {
106+ UC_ERROR (" COMPRESS DUMP desc is empty..." );
107+ return ;
108+ }
109+
110+ size_t srcSize = shardSize_;
111+ size_t compBufSize = srcSize + 4096 ;
153112
154- std::vector<void *> _addrs{static_cast <void *>(compBuf)};
113+ Detail::TaskDesc backendDesc;
114+ backendDesc.brief = ct.task ->desc .brief ;
115+ std::vector<void *> blockToFree;
116+ std::unique_ptr<MemoryPool> dump_memoryPool_ = std::make_unique<MemoryPool>(compBufSize, ct.task ->desc .size ());
117+ if (!dump_memoryPool_) {
118+ UC_ERROR (" Out of memory: failed to allocate {} B" , shardSize_ * ct.task ->desc .size ());
119+ Status::NoSpace ();
120+ }
155121
156- backendDesc.push_back (Detail::Shard {
157- s.owner ,
158- s.index ,
159- _addrs
160- });
122+ for (const UC::Detail::Shard& s : desc) {
123+ UC_DEBUG (" Task id: {} Shard index: {} Compress start..." , ct.task ->id , s.index );
161124
162- UC_INFO (" Shard index: {} compress end... compBytes is {}" , s.index , compBytes);
163- blockToFree.push_back (static_cast <void *>(compBuf));
164- }
125+ uint8_t * compBuf = static_cast <uint8_t *>(dump_memoryPool_->allocate ());
126+ uint16_t * src = static_cast <uint16_t *>(s.addrs [0 ]);
165127
166- auto res = backend_->Dump (std::move (backendDesc));
128+ size_t compBytes = 0 ;
129+ if (ratio == R200) {
130+ size_t n_bf16 = shardSize_ >> 1 ;
131+ int err = TunstallCompressBF16 (compBuf, (uint16_t *)src, n_bf16);
132+ if (err != 0 ) {
133+ UC_ERROR (" Failed to compress BF16 data for shard {}" , s.index );
134+ }
135+ compBytes = n_bf16;
136+ } else {
137+ UC_ERROR (" Unsupported ratio: {}" , static_cast <int >(ratio));
138+ return ;
139+ }
140+
141+ std::vector<void *> _addrs{static_cast <void *>(compBuf)};
142+
143+ backendDesc.push_back (Detail::Shard {
144+ s.owner ,
145+ s.index ,
146+ _addrs
147+ });
167148
168- if (!blockToFree.empty () && res.Value () > 0 ) {
169- backend_->Wait (res.Value ());
170- dump_memoryPool_->deallocate (blockToFree);
171- }
149+ UC_DEBUG (" Shard index: {} compress end... compBytes is {}" , s.index , compBytes);
150+ blockToFree.push_back (static_cast <void *>(compBuf));
151+ }
172152
173- UC_INFO (" COMPRESS DUMP END." );
174- #else
175- // to posix dump
176- const auto n = ct.task ->desc .size ();
177- if (n > 0 )
178- {
179- backend_->Dump (std::move (ct.task ->desc ));
180- }
153+ auto res = backend_->Dump (std::move (backendDesc));
181154
182- UC_INFO (" COMPRESS DUMP END." );
183- #endif
155+ if (!blockToFree.empty () && res.Value () > 0 ) {
156+ backend_->Wait (res.Value ());
157+ dump_memoryPool_->deallocate (blockToFree);
158+ }
159+ }
160+
184161 ct.waiter ->Done ();
162+ UC_DEBUG (" COMPRESS DUMP END." );
185163}
186164
187165}
0 commit comments