Skip to content
This repository was archived by the owner on Mar 27, 2025. It is now read-only.

Commit 466e91c

Browse files
committed
wip queue
1 parent 56b36ae commit 466e91c

File tree

2 files changed

+157
-76
lines changed

2 files changed

+157
-76
lines changed

salmon/queue.py

Lines changed: 93 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,6 @@ def _create_tmp(self):
4242
raise mailbox.ExternalClashError('Name clash prevented file creation: %s' % path)
4343

4444

45-
class QueueError(Exception):
46-
47-
def __init__(self, msg, data):
48-
Exception.__init__(self, msg)
49-
self._message = msg
50-
self.data = data
51-
52-
5345
class Queue:
5446
"""
5547
Provides a simplified API for dealing with 'queues' in Salmon.
@@ -104,6 +96,16 @@ def push(self, message):
10496
message = str(message)
10597
return self.mbox.add(message)
10698

99+
def _move_oversize(self, name):
100+
if self.oversize_dir:
101+
logging.info("Message key %s over size limit %d, moving to %s.",
102+
key, self.pop_limit, self.oversize_dir)
103+
os.rename(name, os.path.join(self.oversize_dir, key))
104+
else:
105+
logging.info("Message key %s over size limit %d, DELETING (set oversize_dir).",
106+
key, self.pop_limit)
107+
os.unlink(name)
108+
107109
def pop(self):
108110
"""
109111
Pops a message off the queue, order is not really maintained
@@ -115,21 +117,10 @@ def pop(self):
115117
over, over_name = self.oversize(key)
116118

117119
if over:
118-
if self.oversize_dir:
119-
logging.info("Message key %s over size limit %d, moving to %s.",
120-
key, self.pop_limit, self.oversize_dir)
121-
os.rename(over_name, os.path.join(self.oversize_dir, key))
122-
else:
123-
logging.info("Message key %s over size limit %d, DELETING (set oversize_dir).",
124-
key, self.pop_limit)
125-
os.unlink(over_name)
120+
self._move_oversize(over_name)
126121
else:
127-
try:
128-
msg = self.get(key)
129-
except QueueError as exc:
130-
raise exc
131-
finally:
132-
self.remove(key)
122+
msg = self.get(key)
123+
self.remove(key)
133124
return key, msg
134125

135126
return None, None
@@ -149,11 +140,11 @@ def get(self, key):
149140
try:
150141
return mail.MailRequest(self.dir, None, None, msg_data)
151142
except Exception as exc:
152-
logging.exception("Failed to decode message: %s; msg_data: %r", exc, msg_data)
143+
logging.exception("Failed to decode message: %s; msg_data: %r", exc, msg_data)
153144
return None
154145

155146
def remove(self, key):
156-
"""Removes the queue, but not returned."""
147+
"""Removes key the queue."""
157148
self.mbox.remove(key)
158149

159150
def __len__(self):
@@ -166,15 +157,8 @@ def __len__(self):
166157
def clear(self):
167158
"""
168159
Clears out the contents of the entire queue.
169-
170-
Warning: This could be horribly inefficient since it pops messages
171-
until the queue is empty. It could also cause an infinite loop if
172-
another process is writing to messages to the Queue faster than we can
173-
pop.
174160
"""
175-
# man this is probably a really bad idea
176-
while len(self) > 0:
177-
self.pop()
161+
self.mbox.clear()
178162

179163
def keys(self):
180164
"""
@@ -188,3 +172,80 @@ def oversize(self, key):
188172
return os.path.getsize(file_name) > self.pop_limit, file_name
189173
else:
190174
return False, None
175+
176+
177+
class Metadata:
178+
def __init__(self, path):
179+
# mkdir dir+metadata
180+
self.path = os.path.join(path, "metadata")
181+
os.mkdir(self.path)
182+
183+
def get(self, key):
184+
return json.load(open(os.path.join(self.path, key), "r"))
185+
186+
def set(self, key, data):
187+
json.dump(open(os.path.join(self.path, key), "w"), data)
188+
189+
def remove(self, key):
190+
os.unlink(open(os.path.join(self.path, key))
191+
192+
@contextlib.contextmanager
193+
def lock(self, key):
194+
i = 0
195+
meta_file = open(os.path.join(self.path, key), "rw")
196+
while True:
197+
# try for a lock using exponential backoff
198+
try:
199+
fcntl.flock(meta_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
200+
except BlockingIOError:
201+
if i > 5:
202+
# 2**5 is 30 seconds which is far too long
203+
raise
204+
time.sleep(2**i)
205+
i += 1
206+
else:
207+
break
208+
209+
try:
210+
yield meta_file
211+
finally:
212+
fcntl.flock(meta_file, fcntl.LOCK_UN)
213+
meta_file.close()
214+
215+
216+
class QueueWithMetadata(Queue):
217+
"""Just like Queue, except it stores envolope data"""
218+
def __init__(self, *args, **kwargs):
219+
super().__init__(self, *args, **kwargs)
220+
self.metadata = Metadata(self.dir)
221+
222+
def push(self, message, Peer, From, To):
223+
if not isinstance(To, list):
224+
To = [To]
225+
key = super().push(message)
226+
with self.metadata.lock(key):
227+
self.metadata.set(key, {"Peer": Peer, "From": From, "To": To})
228+
return key
229+
230+
def get(self, key):
231+
with self.metadata.lock(key):
232+
msg = super().get(key)
233+
metadata = self.metadata.get(key)
234+
# move data from metadata to msg obj
235+
for k, v in metadata.items():
236+
setattr(msg, k, v)
237+
metadata["To"].remove(msg.To)
238+
self.metadata.set(key, metadata)
239+
return msg
240+
241+
def remove(self, key):
242+
with self.metadata.lock(key) as meta_file:
243+
metadata = self.metadata.get(key)
244+
# if there's still a To to be processed, leave the message on disk
245+
if not metadata.get("To"):
246+
super().remove(key)
247+
self.metadata.remove(key)
248+
249+
def clear(self):
250+
self.metadata.clear()
251+
super().clear()

salmon/server.py

Lines changed: 64 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -292,40 +292,46 @@ def process_message(self, Peer, From, To, Data, **kwargs):
292292
return _deliver(self, Peer, From, To, Data, **kwargs)
293293

294294

295-
class SMTPOnlyOneRcpt(SMTP):
296-
async def smtp_RCPT(self, arg):
297-
if self.envelope.rcpt_tos:
298-
await self.push(SMTP_MULTIPLE_RCPTS_ERROR)
299-
else:
300-
await super().smtp_RCPT(arg)
301-
302-
303295
class SMTPHandler:
304296
def __init__(self, executor=None):
305297
self.executor = executor
306298

307299
async def handle_DATA(self, server, session, envelope):
308-
status = await server.loop.run_in_executor(self.executor, partial(
309-
_deliver,
310-
self,
311-
session.peer,
312-
envelope.mail_from,
313-
envelope.rcpt_tos[0],
314-
envelope.content,
315-
))
316-
return status or "250 Ok"
300+
try:
301+
status = await server.loop.run_in_executor(self.executor, partial(
302+
self.queue.queue.push,
303+
envolope.content,
304+
session.peer,
305+
envolope.mail_from,
306+
envolope.rcpt_tos,
307+
))
308+
status = "250 Ok"
309+
except Exception:
310+
logging.exception("Raised exception while trying to push to Queue: %r, Peer: %r, From: %r, To: %r")
311+
status = "550 Server error"
312+
return status
317313

318314

319315
class AsyncSMTPReceiver(Controller):
320316
"""Receives emails and hands it to the Router for further processing."""
321-
def __init__(self, handler=None, **kwargs):
317+
def __init__(self, handler=None, queue=None, **kwargs):
322318
if handler is None:
323319
handler = SMTPHandler()
320+
if queue is None:
321+
queue = QueueReceiver(queue.QueueWithMetadata(IN_QUEUE))
322+
self.queue = queue
324323
super().__init__(handler, **kwargs)
325324

326325
def factory(self):
327-
# TODO implement a queue
328-
return SMTPOnlyOneRcpt(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING)
326+
return SMTP(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8, ident=ROUTER_VERSION_STRING)
327+
328+
def start(self):
329+
super().start()
330+
self.queue.start()
331+
332+
def stop(self):
333+
super().stop()
334+
self.queue.stop()
329335

330336

331337
class LMTPHandler:
@@ -340,7 +346,8 @@ async def handle_DATA(self, server, session, envelope):
340346
self,
341347
session.peer,
342348
envelope.mail_from,
343-
rcpt, envelope.content,
349+
rcpt,
350+
envelope.content,
344351
))
345352
statuses.append(status or "250 Ok")
346353
return "\r\n".join(statuses)
@@ -389,20 +396,25 @@ class QueueReceiver:
389396
same way otherwise.
390397
"""
391398

392-
def __init__(self, queue_dir, sleep=10, size_limit=0, oversize_dir=None, workers=10):
399+
def __init__(self, queue, sleep=10, size_limit=0, oversize_dir=None, workers=10):
393400
"""
394401
The router should be fully configured and ready to work, the queue_dir
395402
can be a fully qualified path or relative. The option workers dictates
396403
how many threads are started to process messages. Consider adding
397404
``@nolocking`` to your handlers if you are able to.
398405
"""
399-
self.queue = queue.Queue(queue_dir, pop_limit=size_limit,
400-
oversize_dir=oversize_dir)
406+
if isinstance(queue, str):
407+
self.queue = queue.Queue(queue, pop_limit=size_limit,
408+
oversize_dir=oversize_dir)
409+
else:
410+
self.queue = queue
401411
self.sleep = sleep
402412

403413
# Pool is from multiprocess.dummy which uses threads rather than processes
404414
self.workers = Pool(workers)
405415

416+
self._running = True
417+
406418
def start(self, one_shot=False):
407419
"""
408420
Start simply loops indefinitely sleeping and pulling messages
@@ -412,25 +424,32 @@ def start(self, one_shot=False):
412424
"""
413425

414426
logging.info("Queue receiver started on queue dir %s", self.queue.dir)
415-
logging.debug("Sleeping for %d seconds...", self.sleep)
416-
417-
# if there are no messages left in the maildir and this a one-shot, the
418-
# while loop terminates
419-
while not (len(self.queue) == 0 and one_shot):
420-
# if there's nothing in the queue, take a break
421-
if len(self.queue) == 0:
422-
time.sleep(self.sleep)
423-
continue
424427

425-
try:
426-
key, msg = self.queue.pop()
427-
except KeyError:
428-
logging.debug("Could not find message in Queue")
429-
continue
430-
431-
logging.debug("Pulled message with key: %r off", key)
432-
self.workers.apply_async(self.process_message, args=(msg,))
428+
def _run()
429+
while self._running:
430+
# if there's nothing in the queue, take a break
431+
if len(self.queue) == 0:
432+
if one_shot:
433+
break
434+
else:
435+
logging.debug("Sleeping for %d seconds...", self.sleep)
436+
time.sleep(self.sleep)
437+
continue
438+
439+
try:
440+
key, msg = self.queue.pop()
441+
except KeyError:
442+
logging.debug("Could not find message in Queue")
443+
continue
444+
445+
logging.debug("Pulled message with key: %r off", key)
446+
self.workers.apply_async(self.process_message, args=(msg,))
447+
self.main_thread = threading.Thread(target=_run)
448+
self.main_thread.start()
433449

450+
def stop(self):
451+
self._running = False
452+
self.main_thread.join()
434453
self.workers.close()
435454
self.workers.join()
436455

@@ -441,12 +460,13 @@ def process_message(self, msg):
441460
"""
442461

443462
try:
444-
logging.debug("Message received from Peer: %r, From: %r, to To %r.", msg.Peer, msg.From, msg.To)
463+
logging.debug("Message received from Queue: %r, Peer: %r, From: %r, to To %r.",
464+
self.queue, msg.Peer, msg.From, msg.To)
445465
routing.Router.deliver(msg)
446466
except SMTPError as err:
447467
logging.exception("Raising SMTPError when running in a QueueReceiver is unsupported.")
448468
undeliverable_message(msg.Data, err.message)
449469
except Exception:
450-
logging.exception("Exception while processing message from Peer: "
451-
"%r, From: %r, to To %r.", msg.Peer, msg.From, msg.To)
470+
logging.exception("Exception while processing message from Queue: %r, Peer: "
471+
"%r, From: %r, to To %r.", self.queue, msg.Peer, msg.From, msg.To)
452472
undeliverable_message(msg.Data, "Router failed to catch exception.")

0 commit comments

Comments
 (0)