This repository was archived by the owner on Mar 7, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathresource.py
More file actions
executable file
·339 lines (258 loc) · 12.9 KB
/
resource.py
File metadata and controls
executable file
·339 lines (258 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
#!/usr/bin/env python
# encoding: utf-8
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# author: Paco Nathan
# https://github.com/ceteri/exelixi
from json import dumps, loads
from service import Framework, Worker, WorkerInfo
from threading import Thread
from util import get_telemetry
from uuid import uuid1
import logging
import mesos
import mesos_pb2
import os
import subprocess
import sys
import time
######################################################################
## class definitions
class MesosScheduler (mesos.Scheduler):
    """
    Mesos scheduler for Exelixi.

    Accepts resource offers until `n_workers` tasks have been launched
    (at most one per offer hostname), asks each executor to start the
    worker service once its initial task finishes, and runs the
    UnitOfWork orchestration after every executor has replied.
    """
    # https://github.com/apache/mesos/blob/master/src/python/src/mesos.py

    def __init__ (self, executor, exe_path, n_workers, uow_name, prefix, cpu_alloc, mem_alloc):
        """
        executor -- ExecutorInfo merged into every task that gets launched
        exe_path -- first element of the command line sent to each executor
        n_workers -- number of tasks (one per hostname) to launch
        uow_name, prefix -- passed through unchanged to Framework
        cpu_alloc, mem_alloc -- scalar "cpus" and "mem" requested per task
        """
        self.executor = executor
        self.taskData = {}
        self.tasksLaunched = 0
        self.tasksFinished = 0
        self.messagesSent = 0
        self.messagesReceived = 0

        # resource requirements
        self._cpu_alloc = cpu_alloc
        self._mem_alloc = mem_alloc

        # protected members to customize for Exelixi needs
        self._executors = {}
        self._exe_path = exe_path
        self._n_workers = n_workers
        self._uow_name = uow_name
        self._prefix = prefix


    def registered (self, driver, frameworkId, masterInfo):
        """
        Invoked when the scheduler successfully registers with a Mesos
        master. It is called with the frameworkId, a unique ID
        generated by the master, and the masterInfo which is
        information about the master itself.
        """
        logging.info("registered with framework ID %s", frameworkId.value)


    def resourceOffers (self, driver, offers):
        """
        Invoked when resources have been offered to this framework. A
        single offer will only contain resources from a single slave.
        Resources associated with an offer will not be re-offered to
        _this_ framework until either (a) this framework has rejected
        those resources (see SchedulerDriver.launchTasks) or (b) those
        resources have been rescinded (see Scheduler.offerRescinded).
        Note that resources may be concurrently offered to more than
        one framework at a time (depending on the allocator being
        used). In that case, the first framework to launch tasks
        using those resources will be able to use them while the other
        frameworks will have those resources rescinded (or if a
        framework has already launched tasks with those resources then
        those tasks will fail with a TASK_LOST status and a message
        saying as much).
        """
        logging.debug("Mesos Scheduler: received %d resource offers", len(offers))

        for offer in offers:
            tasks = []
            logging.debug("Mesos Scheduler: received resource offer %s", offer.id.value)

            ## NB: currently we force 'offer.hostname' to be unique per Executor...
            ## could be changed, but we'd need to juggle the service port numbers

            if self.tasksLaunched < self._n_workers and offer.hostname not in self._executors:
                tid = self.tasksLaunched
                self.tasksLaunched += 1
                logging.debug("Mesos Scheduler: accepting offer on slave %s to start task %d", offer.hostname, tid)

                task = mesos_pb2.TaskInfo()
                task.task_id.value = str(tid)
                task.slave_id.value = offer.slave_id.value
                task.name = "task %d" % tid
                task.executor.MergeFrom(self.executor)

                cpus = task.resources.add()
                cpus.name = "cpus"
                cpus.type = mesos_pb2.Value.SCALAR
                cpus.scalar.value = self._cpu_alloc

                mem = task.resources.add()
                mem.name = "mem"
                mem.type = mesos_pb2.Value.SCALAR
                mem.scalar.value = self._mem_alloc

                tasks.append(task)
                # remembered so that statusUpdate can address the executor later
                self.taskData[task.task_id.value] = (offer.slave_id, task.executor.executor_id)

                # record and report the Mesos slave node's telemetry and state
                self._executors[offer.hostname] = WorkerInfo(offer, task)

                for exe in self._executors.values():
                    logging.debug(exe.report())

            # request the driver to launch the task; 'tasks' is empty when
            # the offer was not used (presumably that declines the offer --
            # see SchedulerDriver.launchTasks)
            driver.launchTasks(offer.id, tasks)


    def statusUpdate (self, driver, update):
        """
        Invoked when the status of a task has changed (e.g., a slave
        is lost and so the task is lost, a task finishes and an
        executor sends a status update saying so, etc.) Note that
        returning from this callback acknowledges receipt of this
        status update. If for whatever reason the scheduler aborts
        during this callback (or the process exits) another status
        update will be delivered. Note, however, that this is
        currently not true if the slave sending the status update is
        lost or fails during that time.

        Raises LookupError when a finished task belongs to a slave for
        which no executor was recorded in resourceOffers.
        """
        logging.debug("Mesos Scheduler: task %s is in state %d", update.task_id.value, update.state)

        # only a finished init task carries the telemetry handled below
        if update.state != mesos_pb2.TASK_FINISHED:
            return

        self.tasksFinished += 1
        slave_id, executor_id = self.taskData[update.task_id.value]

        # update WorkerInfo with telemetry from initial discovery task
        telemetry = loads(str(update.data))
        logging.info("telemetry from slave %s, executor %s\n%s", slave_id.value, executor_id.value, str(update.data))

        exe = self.lookup_executor(slave_id.value, executor_id.value)

        if exe is None:
            # fail with a descriptive error instead of an AttributeError on None
            logging.critical("Mesos Scheduler: no executor recorded for slave %s", slave_id.value)
            raise LookupError("no executor recorded for slave %s" % slave_id.value)

        exe.ip_addr = telemetry["ip_addr"]

        ## NB: TODO make the service port a parameter
        exe.port = Worker.DEFAULT_PORT

        if self.tasksFinished == self._n_workers:
            logging.info("Mesos Scheduler: %d init tasks completed", self._n_workers)

        # request to launch service as a child process; the executor hands
        # this list to subprocess.Popen, which needs string arguments, so
        # the port is coerced in case it is numeric
        self.messagesSent += 1
        message = str(dumps([ self._exe_path, "-p", str(exe.port) ]))
        driver.sendFrameworkMessage(executor_id, slave_id, message)


    def frameworkMessage (self, driver, executorId, slaveId, message):
        """
        Invoked when an executor sends a message. These messages are
        best effort; do not expect a framework message to be
        retransmitted in any reliable fashion.
        """
        self.messagesReceived += 1
        logging.info("Mesos Scheduler: slave %s executor %s", slaveId.value, executorId.value)
        logging.info("message %d received: %s", self.messagesReceived, str(message))

        # keep waiting until one reply per worker has arrived
        if self.messagesReceived != self._n_workers:
            return

        if self.messagesReceived != self.messagesSent:
            logging.critical("Mesos Scheduler: framework messages lost! sent %d received %d", self.messagesSent, self.messagesReceived)
            sys.exit(1)

        # materialize once so the same sequence is logged and handed to Framework
        exe_info = list(self._executors.values())

        for exe in exe_info:
            logging.debug(exe.report())

        logging.info("all worker services launched and init tasks completed")
        worker_list = [ exe.get_shard_uri() for exe in exe_info ]

        # run UnitOfWork orchestration via REST endpoints on the workers
        fra = Framework(self._uow_name, self._prefix)
        fra.set_worker_list(worker_list, exe_info)

        time.sleep(1)
        fra.orchestrate_uow()

        # shutdown the Executors after the end of an algorithm run
        driver.stop()


    def lookup_executor (self, slave_id, executor_id):
        """
        lookup the Executor based on IDs

        Only `slave_id` is compared; `executor_id` is accepted but not
        used. Returns the first recorded executor whose `slave_id`
        matches, or None when there is no match.
        """
        for exe in self._executors.values():
            if exe.slave_id == slave_id:
                return exe

        return None


    @staticmethod
    def start_framework (master_uri, exe_path, n_workers, uow_name, prefix, cpu_alloc, mem_alloc):
        """
        build the ExecutorInfo, FrameworkInfo and MesosScheduler, then
        return a MesosSchedulerDriver for `master_uri` (not yet run)

        Environment variables read: MESOS_CHECKPOINT enables framework
        checkpointing; MESOS_AUTHENTICATE enables authentication, which
        then requires DEFAULT_PRINCIPAL and DEFAULT_SECRET -- the
        process exits with status 1 if either one is missing.
        """
        # initialize an executor
        executor = mesos_pb2.ExecutorInfo()
        executor.executor_id.value = uuid1().hex
        executor.command.value = exe_path
        executor.name = "Exelixi Executor"
        executor.source = "per-job build"

        ## NB: TODO download tarball/container from HDFS
        #uri = executor.command.uris.add()
        #uri.executable = false
        #uri.value = "hdfs://namenode/exelixi/exelixi.tgz"

        # initialize the framework
        framework = mesos_pb2.FrameworkInfo()
        framework.user = "" # have Mesos fill in the current user
        framework.name = "Exelixi Framework"

        if os.getenv("MESOS_CHECKPOINT"):
            logging.debug("Mesos Scheduler: enabling checkpoint for the framework")
            framework.checkpoint = True

        # create a scheduler and capture the command line options
        sched = MesosScheduler(executor, exe_path, n_workers, uow_name, prefix, cpu_alloc, mem_alloc)

        # initialize a driver
        if os.getenv("MESOS_AUTHENTICATE"):
            logging.debug("Mesos Scheduler: enabling authentication for the framework")

            # read each credential once, then validate in the original order
            principal = os.getenv("DEFAULT_PRINCIPAL")
            secret = os.getenv("DEFAULT_SECRET")

            if not principal:
                logging.critical("Mesos Scheduler: expecting authentication principal in the environment")
                sys.exit(1)

            if not secret:
                logging.critical("Mesos Scheduler: expecting authentication secret in the environment")
                sys.exit(1)

            credential = mesos_pb2.Credential()
            credential.principal = principal
            credential.secret = secret

            driver = mesos.MesosSchedulerDriver(sched, framework, master_uri, credential)
        else:
            driver = mesos.MesosSchedulerDriver(sched, framework, master_uri)

        return driver


    @staticmethod
    def stop_framework (driver):
        """
        ensure that the driver process terminates

        Calls driver.run() and, once it returns, stops the driver and
        exits the process: status 0 if run() returned DRIVER_STOPPED,
        otherwise 1.
        """
        status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1
        driver.stop()
        sys.exit(status)
class MesosExecutor (mesos.Executor):
    """
    Mesos executor for Exelixi: reports telemetry for each launched
    task and starts a child process when the framework sends a command.
    """
    # https://github.com/apache/mesos/blob/master/src/python/src/mesos.py

    def launchTask (self, driver, task):
        """
        Invoked when a task has been launched on this executor
        (initiated via Scheduler.launchTasks). Note that this task
        can be realized with a thread, a process, or some simple
        computation, however, no other callbacks will be invoked on
        this executor until this callback has returned.
        """
        ## NB: the following code runs on the Mesos slave (source of the resource offer)

        def report (state, payload):
            # build, log and send one status update for this task
            status = mesos_pb2.TaskStatus()
            status.task_id.value = task.task_id.value
            status.state = state
            status.data = payload
            logging.debug(status.data)
            driver.sendStatusUpdate(status)

        def discover ():
            logging.debug("Mesos Executor: requested task %s", task.task_id.value)
            report(mesos_pb2.TASK_RUNNING, str("running discovery task"))

            ## NB: TODO test port availability...
            ## NB: TODO download tarball/container for service launch

            # notify scheduler: ready to launch service
            report(mesos_pb2.TASK_FINISHED, str(dumps(get_telemetry(), indent=4)))

        # now create a thread to run the requested task: run tasks in
        # new threads or processes, rather than inside launchTask...
        # NB: gevent/coroutines/Greenlets conflict here... must run
        # those in a child shell process
        worker = Thread(target=discover)
        worker.start()


    def frameworkMessage (self, driver, message):
        """
        Invoked when a framework message has arrived for this
        executor. These messages are best effort; do not expect a
        framework message to be retransmitted in any reliable fashion.
        """
        # launch service: the message is a JSON-encoded argument list
        logging.info("Mesos Executor: service launched: %s", message)
        argv = loads(message)
        subprocess.Popen(argv)

        # notify scheduler: service was successfully launched
        reply = str("service launched")
        driver.sendFrameworkMessage(reply)


    @staticmethod
    def run_executor ():
        """run the executor until it is stopped externally by the framework"""
        driver = mesos.MesosExecutorDriver(MesosExecutor())
        outcome = driver.run()

        if outcome == mesos_pb2.DRIVER_STOPPED:
            sys.exit(0)
        else:
            sys.exit(1)
if __name__ == '__main__':
    # script entry point: announce startup, then run the executor driver;
    # the parenthesized single-argument print emits the same text whether
    # it is parsed as a print statement or as a function call
    print("Starting executor...")
    MesosExecutor.run_executor()