Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 64 additions & 7 deletions benchmarks/benchmark_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class BenchmarkConfig:
shuffle_ids: bool = False
warmup_round: int = 1
benchmark_round: int = 10
bidirectional: bool = False

def make_configs(args: dict) -> Tuple[ModelConfig, CacheConfig, BenchmarkConfig]:
config_file = args.config
Expand All @@ -44,6 +45,7 @@ def make_configs(args: dict) -> Tuple[ModelConfig, CacheConfig, BenchmarkConfig]
bench_config.shuffle_ids = args.shuffle_ids
bench_config.warmup_round = args.warmup_round
bench_config.benchmark_round = args.benchmark_round
bench_config.bidirectional = args.bi
return model_config, cache_config, bench_config
except Exception as e:
raise ValueError(f"Failed to load config file {config_file}: {e}") from None
Expand Down Expand Up @@ -86,7 +88,7 @@ def create_cpu_gpu_worker(
# max_op_num=4, max_block_num should be larger than num_blocks_to_transfer
max_block_num = max(1024, cache_config.num_cpu_blocks)
op_buffer_tensor = torch.empty((4, max_block_num), dtype=torch.int64).share_memory_()

if model_config.tp_size == 1:
worker_handle = GPUCPUTransferWorker.create_worker(
mp_ctx=mp.get_context('spawn'),
Expand Down Expand Up @@ -161,7 +163,7 @@ def create_cpu_ssd_worker(
# max_op_num=4, max_block_num should be larger than num_blocks_to_transfer
max_block_num = max(1024, cache_config.num_cpu_blocks)
op_buffer_tensor = torch.empty((4, max_block_num), dtype=torch.int64).share_memory_()

worker_handle = CPUSSDDiskTransferWorker.create_worker(
mp_ctx=mp.get_context('spawn'),
finished_ops_queue=finished_ops_queue,
Expand All @@ -182,11 +184,18 @@ def create_cpu_ssd_worker(
def launch_transfer(worker_handle: WorkerHandle,
                    finished_ops_queue: mp.Queue,
                    transfer_op: TransferOp):
    """Submit ``transfer_op`` to the worker without waiting for it to finish.

    The completion is intentionally NOT consumed here. Callers in this file
    submit a batch of ops and afterwards drain ``finished_ops_queue`` with
    ``sync_all``; taking the completion here as well would leave ``sync_all``
    waiting for entries that were already removed, and would serialize the
    forward and reverse submissions in bidirectional mode.

    Args:
        worker_handle: Handle whose ``submit_transfer`` receives the op.
        finished_ops_queue: Unused; kept so existing call sites stay valid.
        transfer_op: The transfer operation to submit.

    Returns:
        Always ``True`` (retained for backward compatibility).
    """
    worker_handle.submit_transfer(transfer_op)
    return True

def sync_all(finished_ops_queue: mp.Queue, num_ops: int):
    """Block until ``num_ops`` entries have been taken off the queue.

    Each ``get()`` blocks until an entry is available; the retrieved values
    are discarded.
    """
    remaining = num_ops
    while remaining > 0:
        finished_ops_queue.get()
        remaining -= 1

# Lookup table pairing each supported transfer direction with its opposite;
# a direction missing from this table has no reverse defined.
REVERSE_TYPE_MAP = dict(
    (
        (TransferType.D2H, TransferType.H2D),
        (TransferType.H2D, TransferType.D2H),
        (TransferType.DISK2H, TransferType.H2DISK),
        (TransferType.H2DISK, TransferType.DISK2H),
    )
)

def bench_worker(args):
model_config, cache_config, bench_config = make_configs(args)
Expand All @@ -204,6 +213,7 @@ def bench_worker(args):
num_layers_to_transfer = model_config.num_layers
num_blocks_to_transfer = bench_config.num_blocks_to_transfer
shuffle_ids = bench_config.shuffle_ids
bidirectional = bench_config.bidirectional

if transfer_type == TransferType.H2D or transfer_type == TransferType.D2H:
worker_handle, finished_ops_queue = create_cpu_gpu_worker(model_config, cache_config)
Expand All @@ -213,6 +223,15 @@ def bench_worker(args):
raise ValueError(f"Unsupported transfer type: {transfer_type} for benchmark, "
f"currently only support {TransferType.H2D.name}, {TransferType.D2H.name}, "
f"{TransferType.H2DISK.name}, {TransferType.DISK2H.name}")
reverse_worker_handle = None
reverse_finished_ops_queue = None
if bidirectional:
if transfer_type == TransferType.H2D or transfer_type == TransferType.D2H:
reverse_worker_handle, reverse_finished_ops_queue = \
create_cpu_gpu_worker(model_config, cache_config)
elif transfer_type == TransferType.H2DISK or transfer_type == TransferType.DISK2H:
reverse_worker_handle, reverse_finished_ops_queue = \
create_cpu_ssd_worker(model_config, cache_config)

if shuffle_ids:
block_ids = torch.randperm(num_blocks_to_transfer).numpy()
Expand All @@ -230,21 +249,54 @@ def bench_worker(args):
successors=[],
predecessors=[],
)
if transfer_type == TransferType.DISK2H:

reverse_transfer_op = None
if bidirectional:
reverse_type = REVERSE_TYPE_MAP.get(transfer_type)
if reverse_type is None:
raise ValueError(f"Bidirectional test not supported for transfer type: {transfer_type}")

reverse_block_ids = torch.randperm(num_blocks_to_transfer).numpy()

reverse_transfer_op = TransferOp(
transfer_type=reverse_type,
layer_id=0,
layer_granularity=num_layers_to_transfer,
src_block_ids=reverse_block_ids,
dst_block_ids=reverse_block_ids,
graph_id=1,
dp_id=0,
successors=[],
predecessors=[],
)
if transfer_type == TransferType.DISK2H or transfer_type == TransferType.H2DISK:
tmp_op = copy.deepcopy(transfer_op)
tmp_op.transfer_type = TransferType.H2DISK
tmp_op.src_block_ids = transfer_op.dst_block_ids
tmp_op.dst_block_ids = transfer_op.src_block_ids
launch_transfer(worker_handle, finished_ops_queue, tmp_op)
sync_all(finished_ops_queue, 1)

for _ in range(warmup_round):
if bidirectional:
launch_transfer(reverse_worker_handle, reverse_finished_ops_queue, reverse_transfer_op)
launch_transfer(worker_handle, finished_ops_queue, transfer_op)
sync_all(finished_ops_queue, warmup_round)
if bidirectional:
sync_all(reverse_finished_ops_queue, warmup_round)

pbar = tqdm(total=benchmark_round, desc="Benchmarking")
start_time = time.time()
for _ in range(benchmark_round):
if bidirectional:
launch_transfer(reverse_worker_handle, reverse_finished_ops_queue, reverse_transfer_op)
launch_transfer(worker_handle, finished_ops_queue, transfer_op)
pbar.update(1)
pbar.close()
sync_all(finished_ops_queue, benchmark_round)
end_time = time.time()
if bidirectional:
sync_all(reverse_finished_ops_queue, benchmark_round)
total_data_size_GB = (
num_blocks_to_transfer *
cache_config.tokens_per_block *
Expand All @@ -257,6 +309,8 @@ def bench_worker(args):
print(f"Avg Time taken: {avg_time} seconds")
print(f"Avg Bandwidth: {total_data_size_GB / avg_time} GB/s")
worker_handle.shutdown()
if bidirectional:
reverse_worker_handle.shutdown()

def parse_args():
parser = ArgumentParser()
Expand All @@ -280,6 +334,9 @@ def parse_args():
parser.add_argument("--benchmark-round",
type=int,
default=10)
parser.add_argument("--bi",
action="store_true",
help="benchmark bidirectional bandwidth")
return parser.parse_args()

if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/example_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
"transfer_sms_d2h": 8,
"max_blocks_per_file": 32000,
"ssd_cache_dir": "./ssd_cache1/",
"ssd_cache_iouring_entries": 32,
"ssd_cache_iouring_flags": 0,
"ssd_cache_iouring_entries": 512,
"ssd_cache_iouring_flags": 1,
"remote_cache_size_mode": "file_size",
"remote_file_size": null,
"remote_file_num": null,
Expand Down
82 changes: 61 additions & 21 deletions csrc/transfer_ssd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ static void _transfer_iouring_impl(
int64_t cpu_layer_stride_in_bytes, int64_t ssd_layer_stride_in_bytes,
int64_t cpu_kv_stride_in_bytes, int64_t ssd_kv_stride_in_bytes,
int64_t chunk_size_in_bytes, int64_t block_stride_in_bytes,
int num_files_per_device, bool is_read, bool is_mla) {
int num_files_per_device, bool is_read, bool is_mla,
bool enable_block_first_transfer) {
int num_blocks = end_block - start_block;
int rc;

Expand All @@ -53,6 +54,39 @@ static void _transfer_iouring_impl(
int fd = fd_list[ssd_block_id % num_files_per_device];
ssd_block_id /= num_files_per_device; // block id in single file

if (enable_block_first_transfer) {
int layers_chunk_size_in_bytes =
cpu_layer_stride_in_bytes * (end_layer - start_layer);
int cpu_layers_chunk_offset = start_layer * cpu_layer_stride_in_bytes;
int ssd_layers_chunk_offset = start_layer * ssd_layer_stride_in_bytes;
void *cpu_block_ptr = reinterpret_cast<char *>(cpu_tensor_ptr) +
block_stride_in_bytes * cpu_block_id +
cpu_layers_chunk_offset;
int ssd_block_offset =
ssd_block_id * block_stride_in_bytes + ssd_layers_chunk_offset;

ssize_t bytes_transfer = 0;
if (is_read) {
rc = iouring.prep_read(fd, cpu_block_ptr, layers_chunk_size_in_bytes,
ssd_block_offset);
if (rc < 0) {
bytes_transfer = pread(fd, cpu_block_ptr, layers_chunk_size_in_bytes,
ssd_block_offset);
}
} else {
rc = iouring.prep_write(fd, cpu_block_ptr, layers_chunk_size_in_bytes,
ssd_block_offset);
if (rc < 0) {
bytes_transfer = pwrite(fd, cpu_block_ptr, layers_chunk_size_in_bytes,
ssd_block_offset);
}
}
if (bytes_transfer && (bytes_transfer != layers_chunk_size_in_bytes)) {
throw std::runtime_error("Failed to transfer block");
}
continue;
}

for (int lid = start_layer; lid < end_layer; lid++) {
int64_t ssd_k_block_offset = ssd_block_id * block_stride_in_bytes +
lid * ssd_layer_stride_in_bytes;
Expand All @@ -71,15 +105,15 @@ static void _transfer_iouring_impl(
rc = iouring.prep_read(fd, cpu_k_block_ptr, chunk_size_in_bytes,
ssd_k_block_offset);
if (rc < 0) {
bytes_transfer = pread(fd, cpu_k_block_ptr, chunk_size_in_bytes,
ssd_k_block_offset);
bytes_transfer = pread(fd, cpu_k_block_ptr, chunk_size_in_bytes,
ssd_k_block_offset);
}
} else {
rc = iouring.prep_write(fd, cpu_k_block_ptr, chunk_size_in_bytes,
ssd_k_block_offset);
if (rc < 0) {
bytes_transfer = pwrite(fd, cpu_k_block_ptr, chunk_size_in_bytes,
ssd_k_block_offset);
bytes_transfer = pwrite(fd, cpu_k_block_ptr, chunk_size_in_bytes,
ssd_k_block_offset);
}
}

Expand All @@ -96,20 +130,20 @@ static void _transfer_iouring_impl(
rc = iouring.prep_read(fd, cpu_v_block_ptr, chunk_size_in_bytes,
ssd_v_block_offset);
if (rc < 0) {
bytes_transfer = pread(fd, cpu_v_block_ptr, chunk_size_in_bytes,
ssd_v_block_offset);
bytes_transfer = pread(fd, cpu_v_block_ptr, chunk_size_in_bytes,
ssd_v_block_offset);
}
} else {
rc = iouring.prep_write(fd, cpu_v_block_ptr, chunk_size_in_bytes,
ssd_v_block_offset);
if (rc < 0) {
bytes_transfer = pwrite(fd, cpu_v_block_ptr, chunk_size_in_bytes,
ssd_v_block_offset);
bytes_transfer = pwrite(fd, cpu_v_block_ptr, chunk_size_in_bytes,
ssd_v_block_offset);
}
}

if (bytes_transfer && (bytes_transfer != chunk_size_in_bytes)) {
throw std::runtime_error("Failed to transfer K block");
throw std::runtime_error("Failed to transfer K block");
}
} // end layer loop
} // end block loop
Expand Down Expand Up @@ -181,10 +215,10 @@ static void _transfer_single_thread_impl(
// NOTE that we may also use other techniques such as
// AIO, O_DIRECT, and etc to improve the performance
void transfer_kv_blocks_ssd(
SSDIOCTX &ioctx,
const torch::Tensor &cpu_layer_id_list, int64_t cpu_tensor_ptr,
const torch::Tensor &ssd_block_ids, const torch::Tensor &cpu_block_ids,
int64_t cpu_layer_stride_in_bytes, int64_t cpu_kv_stride_in_bytes,
SSDIOCTX &ioctx, const torch::Tensor &cpu_layer_id_list,
int64_t cpu_tensor_ptr, const torch::Tensor &ssd_block_ids,
const torch::Tensor &cpu_block_ids, int64_t cpu_layer_stride_in_bytes,
int64_t cpu_kv_stride_in_bytes,
int64_t ssd_layer_stride_in_bytes, // in single file
int64_t ssd_kv_stride_in_bytes, // in single file
int64_t chunk_size_in_bytes, int64_t block_stride_in_bytes, bool is_read,
Expand Down Expand Up @@ -212,10 +246,17 @@ void transfer_kv_blocks_ssd(
cpu_block_id_ptr, ssd_block_id_ptr, num_blocks, num_devices, round_robin,
cpu_blocks_partition, ssd_blocks_partition);

const bool cpu_is_block_first =
block_stride_in_bytes > cpu_layer_stride_in_bytes;
const bool ssd_is_block_first =
block_stride_in_bytes > ssd_layer_stride_in_bytes;
const bool enable_block_first_transfer =
cpu_is_block_first && ssd_is_block_first;

std::vector<std::thread> threads;
std::vector<std::future<std::exception_ptr>> futures;
for (int d = 0; d < num_devices; d++) {
for (int t = 0; t < num_threads_per_device; t++) {
for (int t = 0; t < num_threads_per_device; t++) {
for (int d = 0; d < num_devices; d++) {
int start_layer = cpu_layer_id_list_ptr[0];
int end_layer = cpu_layer_id_list_ptr[0] + num_layers;
int num_transfer_blocks = cpu_blocks_partition[d].size();
Expand All @@ -228,13 +269,12 @@ void transfer_kv_blocks_ssd(
if (start_block < end_block) {
if (iouring.enabled()) {
_transfer_iouring_impl(
iouring, fds[d],
cpu_blocks_partition[d], ssd_blocks_partition[d],
iouring, fds[d], cpu_blocks_partition[d], ssd_blocks_partition[d],
start_layer, end_layer, start_block, end_block, cpu_tensor_ptr,
cpu_layer_stride_in_bytes, ssd_layer_stride_in_bytes,
cpu_kv_stride_in_bytes, ssd_kv_stride_in_bytes,
chunk_size_in_bytes, block_stride_in_bytes, num_files_per_device,
is_read, is_mla);
is_read, is_mla, enable_block_first_transfer);
continue;
}

Expand Down Expand Up @@ -262,8 +302,8 @@ void transfer_kv_blocks_ssd(
}
});
}
} // end thread loop
} // end device loop
} // end device loop
} // end thread loop

if (iouring.enabled()) {
if (iouring.wait_completion()) {
Expand Down
Loading