-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathorchestrator.py
More file actions
114 lines (94 loc) · 3.52 KB
/
orchestrator.py
File metadata and controls
114 lines (94 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import multiprocessing as mp
import random
import time
import signal
import sys
from contextlib import contextmanager
"""
Orchestrator for concurrent GPU usage.
This is used to ensure that we don't have multiple processes
accessing the same GPU.
"""
class GPUOrchestrator:
    """Coordinates exclusive GPU access across multiple worker processes.

    A counting semaphore bounds how many workers may hold a GPU at once,
    and a shared status array records which specific GPU indices are taken
    so each concurrent worker receives a distinct device id.
    """

    def __init__(self, num_gpus=8):
        self.num_gpus = num_gpus
        # Counting semaphore: at most num_gpus holders at any time.
        self.gpu_semaphore = mp.Semaphore(num_gpus)
        # Shared int array: 1 = GPU in use, 0 = free.
        self.gpu_status = mp.Array("i", [0] * num_gpus)
        # Guards all reads/writes of gpu_status across processes.
        self.status_lock = mp.Lock()
        print(f"[Orchestration] GPU Orchestrator initialized with {num_gpus} GPUs")
        # Install handlers so Ctrl-C / SIGTERM shut down cleanly.
        # NOTE(review): these handlers are registered in whichever process
        # constructs the orchestrator, not in the workers.
        signal.signal(signal.SIGINT, self.cleanup)
        signal.signal(signal.SIGTERM, self.cleanup)

    def cleanup(self, *args):
        """Signal handler: announce shutdown and exit the process."""
        print("\n[Orchestration] Cleaning up GPU Orchestrator...")
        sys.exit(0)

    def get_available_gpu(self):
        """Find and reserve an available GPU.

        Returns:
            The reserved GPU index, or None if every GPU is currently busy.
        """
        with self.status_lock:
            for i in range(self.num_gpus):
                if self.gpu_status[i] == 0:
                    self.gpu_status[i] = 1
                    return i
            return None

    def release_gpu(self, gpu_id):
        """Release a GPU back to the pool. A None gpu_id is a no-op."""
        # Guard: reserve_gpu may hand us None if no GPU was ever marked
        # in use (original code would raise TypeError indexing the array).
        if gpu_id is None:
            return
        with self.status_lock:
            self.gpu_status[gpu_id] = 0

    @contextmanager
    def reserve_gpu(self):
        """Context manager for GPU reservation.

        Blocks until the semaphore admits this caller, then yields a
        reserved GPU index. Both the GPU slot and the semaphore permit are
        always released on exit — even if the body, or the reservation
        step itself, raises (the original leaked the semaphore permit if
        get_available_gpu() failed before the try block).
        """
        # Wait for a GPU slot to become available.
        self.gpu_semaphore.acquire()
        gpu_id = None
        try:
            gpu_id = self.get_available_gpu()
            yield gpu_id
        finally:
            # Release in reverse-acquire order.
            self.release_gpu(gpu_id)
            self.gpu_semaphore.release()
def worker_process(process_id, orchestrator):
    """Endless simulated worker: alternates CPU work with reserved GPU work."""
    while True:
        # Pretend to do some CPU-side work before asking for a device.
        time.sleep(random.uniform(0.1, 2))
        print(f"[Orchestration] Process {process_id} requesting GPU...")
        with orchestrator.reserve_gpu() as gpu_id:
            print(f"[Orchestration] Process {process_id} acquired GPU {gpu_id}")
            try:
                # device = torch.device(f"cuda:{gpu_id}")
                # Stand in for real GPU computation with a random-length sleep.
                duration = random.uniform(1, 5)
                banner = (
                    f"[Orchestration] Process {process_id} working on "
                    f"GPU {gpu_id} for {duration:.2f} seconds"
                )
                print(banner)
                time.sleep(duration)
            except Exception as e:
                print(f"Process {process_id} encountered error: {e}")
            print(f"[Orchestration] Process {process_id} releasing GPU {gpu_id}")
def main():
    """Spawn more workers than GPUs to demonstrate queued GPU access."""
    orchestrator = GPUOrchestrator(num_gpus=8)
    num_workers = 12  # More workers than GPUs to demonstrate queuing
    processes = []
    try:
        # Launch every worker before waiting on any of them.
        for worker_id in range(num_workers):
            proc = mp.Process(target=worker_process, args=(worker_id, orchestrator))
            proc.start()
            processes.append(proc)
        # Workers loop forever, so this blocks until a manual interrupt.
        for proc in processes:
            proc.join()
    except KeyboardInterrupt:
        print("\nShutting down...")
        # Tear down every child, then reap it.
        for proc in processes:
            proc.terminate()
            proc.join()
if __name__ == "__main__":
    # freeze_support() is a no-op everywhere except frozen Windows
    # executables, where it must run before spawning child processes.
    mp.freeze_support()
    main()