-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathdocker_sim_manager.py
More file actions
315 lines (283 loc) · 12.6 KB
/
docker_sim_manager.py
File metadata and controls
315 lines (283 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
#!/usr/bin/env python
"""Provides a light-weight framework to scale up simulations tasks with docker containers
Disclaimer: This code is part of an example project. In order to reduce complexity,
I decided to use a simple method and class design. There is still a massive potential in
error handling and generalization of methods and classes. Feel free to use this code as a
starting point.
"""
import concurrent.futures
import datetime
import os
import platform
import shutil
import threading
import time
from getpass import getpass
from pathlib import Path

import dateutil
import dateutil.parser
import docker
from docker.errors import DockerException, NotFound, APIError
from docker.types import Mount, LogConfig
from halo import Halo
from loguru import logger
__author__ = "Michael Wittmann and Maximilian Speicher"
__copyright__ = "Copyright 2020, Michael Wittmann and Maximilian Speicher"
__license__ = "MIT"
__version__ = "1.0.0"
__maintainer__ = "Michael Wittmann"
__email__ = "[email protected]"
__status__ = "Example"
class SimJob():
    """A single, uniquely named simulation task to be executed in a Docker container."""

    def __init__(self, sim_Name, templates: Path, command=None) -> None:
        """
        Creates a distinct job.

        :param sim_Name: Simulation name (must be unique, used as the container name)
        :param templates: files to be copied from your host to the container
        :param command: command to be appended at the container's entry point, or None
        """
        self.templates = templates   # host-side template files for this job
        self.sim_name = sim_Name     # unique job identifier
        self.command = command       # optional container command override

    def __str__(self) -> str:
        return self.sim_name

    def __repr__(self) -> str:
        # Debug-friendly representation; mirrors the constructor arguments.
        return (f'{type(self).__name__}(sim_Name={self.sim_name!r}, '
                f'templates={self.templates!r}, command={self.command!r})')
class DockerSimManager():
    """Schedules SimJobs on a thread pool and runs each job in its own Docker container.

    A daemon monitoring thread watches the running containers and stops any
    container that has been up longer than a minimum runtime but produced no
    log output within the inactivity window.
    """

    def __init__(self,
                 docker_container_url: str,
                 max_workers: int,
                 data_directory: Path,
                 docker_repo_tag='latest'
                 ) -> None:
        """
        Authenticate at the container registry and pull the simulation image.

        :param docker_container_url: Simulation container URL at container registry
        :param max_workers: number of parallel workers
        :param data_directory: path to the output directory on your host
        :param docker_repo_tag: container tag, Default: latest
        """
        self._data_directory = data_directory
        self._max_workers = max_workers
        self._thread_pool_executor = concurrent.futures.ThreadPoolExecutor(max_workers=self._max_workers)
        self._container_prefix = 'DockerSim'
        self._docker_client = docker.from_env()
        self._authenticate_at_container_registry()
        with Halo(text='Pulling latest docker_sim image', spinner='dots'):
            self._docker_image = self._docker_client.images.pull(
                repository=docker_container_url,
                tag=docker_repo_tag
            )
        # Serializes host-side file-system work done by the worker threads.
        self._io_lock = threading.Lock()
        # Monitoring parameters, all in seconds.
        self._monitoring_frequency = 300
        self._minimum_runtime = 300
        self._maximum_inactivity_time = 30 * 60
        self.job_list = []

    def add_sim_job(self, job: "SimJob") -> None:
        """
        Adds a simulation job into the queue.

        :param job: simulation job to be added
        """
        self.job_list.append(job)

    def start_computation(self):
        """
        Starts computation of all jobs inside the queue.

        Jobs must be added before calling this function. Blocks until all
        jobs have finished; the thread pool is shut down afterwards.
        """
        self.start_monitoring_thread()
        with self._thread_pool_executor as executor:
            futures = {executor.submit(self._process_sim_job, sim_job): sim_job
                       for sim_job in self.job_list}
            for future in concurrent.futures.as_completed(futures):
                logger.info(f'Run {futures[future]} did finish')

    def _process_sim_job(self, sim_job: "SimJob") -> bool:
        """
        Triggers processing steps for a single job.

        1. _init_simulation
        2. _run_docker_container
        3. cleanup_sim_objects

        :param sim_job: SimJob to be processed
        :return: True if processing succeeded, False otherwise.
        """
        sim_paths = self._init_simulation(sim_job=sim_job)
        if sim_paths is None:
            logger.error(f'Error during initialization for simulation {sim_job}')
            return False
        # _init_simulation may return either a bare working directory or a
        # sequence (working_dir, *extra_file_objects). The original bare-except
        # fallback left `file_objects` undefined for the bare-Path case.
        if isinstance(sim_paths, (tuple, list)):
            working_dir, *file_objects = sim_paths
        else:
            working_dir = sim_paths
            file_objects = []
        self._run_docker_container(container_name=sim_job.sim_name, working_dir=working_dir, command=sim_job.command)
        self.cleanup_sim_objects(sim_job=sim_job, file_objects=file_objects)
        return True

    def _init_simulation(self, sim_job):
        """
        Initialize simulation. May be overridden with custom function.

        - Create output folders
        - Copy file templates
        - ...

        :param sim_job: SimJob to be processed
        :return: Path to working directory on your host's filesystem for this
                 SimJob, or None if initialization failed.
        """
        # prepare your data for your scenario here
        output_folder_name = f'job_{sim_job.sim_name}'
        working_dir = self._data_directory.joinpath(output_folder_name)
        try:
            with self._io_lock:
                # exist_ok=False: a pre-existing folder means a duplicate job name.
                working_dir.mkdir(exist_ok=False, parents=True)
            # if you need additional files in your simulation e.g. config files, data, add them here example_monte_carlo_pi here
            return working_dir
        except Exception as e:
            logger.warning(e)
            return None

    def _run_docker_container(self, container_name, working_dir, command):
        """
        Triggers the simulation run in a separate Docker container.

        Blocks until the container exits; logs are saved and the container is
        removed afterwards, even if the run failed.

        :param container_name: the container's name
        :param working_dir: working directory on your host's file system
        :param command: container command (eg. name of script, cli arguments ...) Must match to your docker entry point.
        """
        try:
            system_platform = platform.system()
            if system_platform == "Windows":
                # Windows has no os.getuid(); run with the image's default user.
                self._docker_client.containers.run(
                    image=self._docker_image,
                    command=command,
                    mounts=[Mount(
                        target='/mnt/data',
                        source=str(working_dir.resolve()),
                        type='bind'
                    )],
                    #working_dir='/simulation',
                    name=container_name,
                    environment={
                        # If you need add your environment variables here
                    },
                    log_config=LogConfig(type=LogConfig.types.JSON, config={
                        'max-size': '500m',
                        'max-file': '3'
                    })
                )
            else:
                # Run as the host user so output files are not owned by root.
                user_id = os.getuid()
                self._docker_client.containers.run(
                    image=self._docker_image,
                    command=command,
                    mounts=[Mount(
                        target='/mnt/data',
                        source=str(working_dir.resolve()),
                        type='bind'
                    )],
                    working_dir='/mnt/data',
                    name=container_name,
                    environment={
                        # If you need add your environment variables here
                    },
                    log_config=LogConfig(type=LogConfig.types.JSON, config={
                        'max-size': '500m',
                        'max-file': '3'
                    }),
                    user=user_id
                )
        except DockerException as e:
            # Original message was missing the f-prefix and printed the
            # placeholders literally.
            logger.warning(f'Error in run {container_name}: {e}.')
        finally:
            try:
                self.write_container_logs_and_remove_it(
                    container_name=container_name,
                    working_dir=working_dir
                )
            except NotFound:
                logger.warning(f'Can not save logs for {container_name}, because container does not exist')

    def start_monitoring_thread(self):
        """
        Start a monitoring thread, which observes running docker containers.

        The thread is a daemon, so it does not keep the process alive.
        """
        monitoring_thread = threading.Thread(target=self._monitor_containers,
                                             args=(self._container_prefix,),
                                             daemon=True,
                                             name='monitoring')
        monitoring_thread.start()

    def write_container_logs_and_remove_it(self, container_name, working_dir):
        """
        Write container logs and remove the container from your docker server.

        :param container_name: The container's name, which shall be removed
        :param working_dir: path, where logfiles shall be written to
        :raises docker.errors.NotFound: if no container with that name exists
        """
        container = self._docker_client.containers.get(container_name)
        with open(working_dir.joinpath('log.txt'), 'w') as f:
            f.write(container.logs().decode('utf-8'))
        container.remove()

    def _authenticate_at_container_registry(self):
        """
        Authenticate at container registry. NOTE: GitHub container registry is used in this example.
        If you want to user other container registries, like DockerHub or GitLab, feel free to adapt this method.

        Function either uses Environment variables for authentication or asks for login credentials in the command line.
        Note: GitHub container registry does not accept your personal password. You need to generate a personal access token (PAT)

        :raises RuntimeError: if the registry does not report a successful login
        """
        username = os.environ.get('GITHUB_USERNAME')
        password = os.environ.get('GITHUB_PAT')
        if username is None:
            username = input('Enter username for container registry: ')
        if password is None:
            # getpass keeps the PAT out of the terminal echo.
            password = getpass('Enter password for container registry: ')
        login_result = self._docker_client.login(
            registry='ghcr.io',
            username=username,
            password=password,
            reauth=True
        )
        if login_result['Status'] != 'Login Succeeded':
            raise RuntimeError("Could not authenticate at GitHub container registry")
        else:
            logger.info("Successfully authenticated at GitHub container registry.")

    def _monitor_containers(self, container_prefix):
        """
        Monitors all running docker containers. Inactive containers get stopped
        after self._maximum_inactivity_time. Runs forever; intended to be
        executed in a daemon thread (see start_monitoring_thread).

        :param container_prefix: The containers prefix used for all containers in this simulation
        """
        while True:
            containers = self._docker_client.containers.list()
            for container in containers:
                try:
                    if container_prefix in container.name:
                        container_start = dateutil.parser.isoparse(container.attrs['State']['StartedAt'])
                        now = datetime.datetime.now(datetime.timezone.utc)
                        uptime = (now - container_start).total_seconds()
                        # Any log output within the inactivity window counts as activity.
                        logs = container.logs(since=int(time.time() - self._maximum_inactivity_time))
                        if uptime > self._minimum_runtime and not logs:
                            logger.warning(f'Container {container.name} ran for more than '
                                           f'{self._minimum_runtime} seconds and showed no log activity for '
                                           f'{self._maximum_inactivity_time} seconds. '
                                           f'It will be stopped.')
                            container.stop()
                except APIError as e:
                    logger.warning(f'Error during thread monitoring: {str(e)}')
            time.sleep(self._monitoring_frequency)

    @staticmethod
    def cleanup_sim_objects(sim_job: "SimJob", file_objects):
        """
        Clean up function for finished simulations. Removes all files given in file objects.

        Failures are logged as warnings; cleanup continues with the next object.

        :param sim_job: SimJob to be processed
        :param file_objects: files/directories (Path objects) to be removed after simulation
        """
        file_object: Path
        for file_object in file_objects:
            if file_object.is_file():
                try:
                    file_object.unlink()
                except Exception as e:
                    logger.warning(e)
                    logger.warning(f'Error during cleanup for simulation {sim_job.sim_name}')
            elif file_object.is_dir():
                try:
                    shutil.rmtree(file_object)
                except Exception as e:
                    logger.warning(e)
                    logger.warning(f'Error during cleanup for simulation {sim_job.sim_name}')
            else:
                logger.warning(f"{file_object} is not a file or directory.")