Source code for nti.mailer.queue

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Processors for :mod:`repoze.sendmail`, intended as a drop-in replacement
for the ``qp`` command line, using Amazon SES.

"""

from __future__ import print_function, absolute_import, division
__docformat__ = "restructuredtext en"


import os
import argparse
import sys
import logging
from email.message import Message

import gevent

from zope import interface
from zope import deprecation
from zope.cachedescriptors.property import Lazy

import boto3

from botocore.config import Config

from repoze.sendmail.encoding import encode_message

from repoze.sendmail.interfaces import IMailer

from repoze.sendmail.maildir import Maildir

from repoze.sendmail.queue import ConsoleApp as _ConsoleApp
from repoze.sendmail.queue import QueueProcessor


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

@interface.implementer(IMailer)
class SESMailer(object):
    """
    A mailer that hands messages to Amazon SES through a boto3 client.

    Throttling and quota handling are *not* done here; see also
    :mod:`nti.app.bulkemail.process`.
    """

    def __init__(self, region='us-east-1'):
        # Name of the SES region used when the client is built.
        self.region = region

    @property
    def _ses_config(self):
        # Rebuilt from the current ``region`` on every access.
        return Config(region_name=self.region)

    @Lazy
    def client(self):
        # The boto3 SES client for ``self.region``.
        ses = boto3.client('ses', config=self._ses_config)
        assert ses
        return ses

    def close(self): # pragma: no cover
        # Nothing to release.
        pass

    def send(self, fromaddr, toaddrs, message):
        """
        Encode *message* and submit it with ``send_raw_email``.

        :param fromaddr: Passed to SES as ``Source``.
        :param toaddrs: Passed to SES as ``Destinations``. Boto will
            accept either a string, a list of strings, or None.
        :param message: The :class:`email.message.Message` to send.
        :raises ValueError: If *message* is not a ``Message`` instance.
        """
        if not isinstance(message, Message): # pragma: no cover
            raise ValueError('Message must be instance of email.message.Message')

        raw_data = encode_message(message)

        # NOTE(review): an earlier comment here described transforming
        # SES errors into something the SMTP-based queue processor
        # understands. Nothing in this method catches or translates
        # exceptions; whatever the client raises propagates as-is.
        #
        # NOTE: At this point there is an opportunity to de-VERP the
        # fromaddr found in the message while still passing the VERP
        # form to SES as the source, so bounces can be handled with the
        # recipient none-the-wiser. See also
        # :mod:`nti.app.bulkemail.process`
        #
        # NOTE: Each recipient (To, CC, BCC) counts as a distinct
        # message for purposes of the quota limits. There are a
        # maximum of 50 dests per address.
        # (http://docs.aws.amazon.com/ses/latest/APIReference/API_SendRawEmail.html)
        #
        # NOTE: It is recommended to send an email to individuals:
        # http://docs.aws.amazon.com/ses/latest/DeveloperGuide/sending-email.html
        # "When you send an email to multiple recipients (recipients
        # are "To", "CC", and "BCC" addresses) and the call to Amazon
        # SES fails, the entire email is rejected and none of the
        # recipients will receive the intended email. We therefore
        # recommend that you send an email to one recipient at a time."
        #
        # QQQ: The docs for SendRawEmail say that destinations is not
        # required, so how does that interact with what's in the
        # message body?

        # pylint:disable=no-member
        self.client.send_raw_email(Source=fromaddr,
                                   Destinations=toaddrs,
                                   RawMessage={'Data': raw_data})
class ConsoleApp(_ConsoleApp):
    """
    Console queue processor that sends with :class:`SESMailer` rather
    than an SMTP mailer.
    """

    def __init__(self, argv=None): # pylint: disable=super-init-not-called
        # The superclass constructor is skipped on purpose so that no
        # SMTP mailer gets constructed; the pieces needed from it are
        # set up by hand below.
        if not argv:
            argv = sys.argv

        self.script_name = argv[0]
        # ``_process_args`` is not defined in this class; presumably it
        # is inherited from the repoze.sendmail base class.
        self._process_args(argv[1:])

        self.mailer = SESMailer()
        # Touch the lazy ``client`` attribute now so the SES client is
        # created during construction instead of on the first send.
        _ = self.mailer.client
class _AbstractMailerProcess(object):
    """
    Shared machinery for long-running processes that drain a maildir
    queue through a mailer.

    Subclasses must implement :meth:`close`.
    """

    # Stop flag; ``LoopingMailerProcess`` sets it in ``close`` and
    # checks it in ``run``.
    _exit = False

    def __init__(self, mailer_factory, queue_path, sleep_seconds=120): # pylint: disable=unused-argument
        """
        :param mailer_factory: Zero-argument callable returning the
            mailer to use for one pass over the queue.
        :param queue_path: Path of the maildir; it is opened with
            ``create=True``.
        :param sleep_seconds: Stored as ``self.sleep_seconds``.
        """
        self.mailer_factory = mailer_factory
        self.sleep_seconds = sleep_seconds
        self.queue_path = queue_path
        self.mail_dir = Maildir(self.queue_path, create=True)

    def _maildir_factory(self, *_args, **_kwargs):
        # Stand-in for the ``Maildir`` constructor: whatever arguments
        # the queue processor passes, it gets the one maildir we
        # already opened.
        return self.mail_dir

    def _do_process_queue(self):
        """
        Make one pass over the queue with a freshly created mailer,
        closing the mailer afterwards even if sending fails.
        """
        mailer = self.mailer_factory()
        assert mailer
        try:
            processor = QueueProcessor(mailer,
                                       # Note this gets ignored by the Maildir factory we send
                                       self.queue_path,
                                       Maildir=self._maildir_factory)
            # Lazy logging arguments, matching the rest of this module.
            logger.info('Processing messages %s', processor.maildir.path)
            processor.send_messages()
        finally:
            # Best effort: not every mailer necessarily has ``close``.
            try:
                mailer.close()
            except AttributeError:
                pass

    def close(self):
        """Stop the process. Must be provided by subclasses."""
        raise NotImplementedError
class LoopingMailerProcess(_AbstractMailerProcess):
    """
    A mailer processor that dumps the queue on a provided interval.

    :meth:`run` alternates between processing the queue and sleeping
    for ``sleep_seconds`` until :meth:`close` is called.
    """

    # Hook for testing.
    _sleep_after_run = staticmethod(gevent.sleep)

    def run(self):
        """Process the queue and sleep, repeatedly, until closed."""
        while not self._exit:
            self._do_process_queue()
            # Lazy logging arguments: nothing is formatted when DEBUG
            # is disabled, and the style matches the rest of the module.
            logger.debug('Going to sleep for %i seconds', self.sleep_seconds)
            self._sleep_after_run(self.sleep_seconds)

    def close(self):
        """Ask :meth:`run` to stop; checked at the top of each loop pass."""
        self._exit = True
def _stat_modified_time(attrs):
    """
    Extract a comparable "last modified" value from a stat structure.

    libev and libuv expose different attributes: the modified time is
    ``st_mtime`` or ``st_mtim`` respectively. For libuv both
    ``st_mtim.tv_sec`` *and* ``st_mtim.tv_nsec`` are returned (as a
    tuple) so that modifications within the same second still compare
    as different.

    :raises NotImplementedError: If neither attribute is available and
        we are not running on PyPy.
    """
    try:
        return attrs.st_mtime
    except AttributeError:
        # Or we could scale tv_sec to nanoseconds...
        try:
            mtim = attrs.st_mtim
            return (mtim.tv_sec, mtim.tv_nsec)
        except AttributeError: # pragma: no cover
            # XXX: Bug on gevent on PyPy/Darwin/libev:
            #   AttributeError: cdata 'struct stat' has no field 'st_mtim'
            # In fact, it only has the `st_nlink` field.
            running_on_pypy = hasattr(sys, 'pypy_version_info')
            if running_on_pypy:
                # The best we can do is return something that should
                # compare unique so we always look like we're modified...
                return id(attrs)
            # pylint:disable=raise-missing-from
            raise NotImplementedError("Unsupported stat implementation")


def _stat_watcher_modified(watcher):
    """
    Report whether the modified time differs between the stat
    watcher's previous attrs (``prev``) and current attrs (``attr``).
    """
    # XXX: This can fail (false negative) if modifications are coming
    # in faster than the resolution of the mtime, which depends on the
    # gevent loop in use, as well as the filesystem and possibly
    # the configuration. This is easily observed on GitHub Actions
    # with both watchers; our writing process has to sleep to allow
    # the modification times to appear to change.

    # XXX: Moreover, the very act of processing the queue will probably
    # cause the watcher to fire. We should probably stop the watcher
    # while processing the queue.
    before = _stat_modified_time(watcher.prev)
    after = _stat_modified_time(watcher.attr)
    return before != after


# Lower bound, in seconds, on the gap between two queue runs.
_MINIMUM_DEBOUNCE_INTERVAL_SECONDS = 10
class MailerWatcher(_AbstractMailerProcess):
    """
    A Mailer processor that watches for changes in the mail directory
    using gevent stat watchers.

    Queue runs are debounced with a loop timer so the queue is
    processed at most once every ``max_process_frequency_seconds``.
    """

    # Stat watcher on the maildir's ``new`` directory.
    watcher = None
    # Timer that is active while queue processing is paused, else None.
    debouncer = None
    # Number of change notifications deferred while the timer ran.
    debouncer_count = 0
    max_process_frequency_seconds = _MINIMUM_DEBOUNCE_INTERVAL_SECONDS

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        loop = gevent.get_hub().loop
        # TODO: Do we need to get abspath() on the watched path? I (JAM)
        # suspect watchers and symlinks don't play well
        new_mail_dir = os.path.join(self.queue_path, 'new')
        self.watcher = loop.stat(new_mail_dir)

    def close(self):
        self._stop_watching()
        # It's critical to close() watchers before we destroy them
        # (let them be GC'd). Otherwise, gevent can crash (mostly
        # under libuv). See
        # https://github.com/gevent/gevent/issues/1805
        self.watcher.close()
        if self.debouncer is not None:
            self._release_debouncer()
        self.debouncer_count = 0

    def _release_debouncer(self):
        # Stop and close the current timer, then forget it. Callers
        # must make sure a timer exists.
        timer = self.debouncer
        timer.stop()
        timer.close()
        self.debouncer = None

    def _start_watching(self):
        assert self.watcher
        logger.debug('Starting watcher for MailDir %s', self.watcher.path)
        self.watcher.start(self._stat_change_observed)

    def _stop_watching(self):
        assert self.watcher
        logger.debug('Stopping watcher for MailDir %s', self.watcher.path)
        self.watcher.stop()

    def _youve_got_mail(self):
        # Mail was detected. Debounce so the queue is processed at most
        # every self.max_process_frequency_seconds, using a gevent
        # timer to track the pause.
        # XXX: This logic seems complex. Can it be simplified to
        # accomplish the same thing?
        hub = gevent.get_hub()
        if self.debouncer is None:
            self.debouncer = hub.loop.timer(self.max_process_frequency_seconds)

        # XXX: the identity comparison against False is kept exactly
        # as it was. Why not just 'not'?
        if self.debouncer.active is not False:
            # A run happened recently; remember that more mail showed
            # up so the timer callback can run the queue again.
            self.debouncer_count += 1
            logger.debug('Deferring queue processing because it was run recently')
            return

        self.debouncer_count = 0
        self.debouncer.start(self._timer_fired)
        logger.info('Processing mail queue. Queue processing paused for %i seconds',
                    self.max_process_frequency_seconds)
        self._do_process_queue()

    def _stat_change_observed(self):
        # On certain file systems we will see stat changes
        # for access times which we don't care about. We really
        # only care about modified times.
        if not _stat_watcher_modified(self.watcher):
            return
        logger.debug('Maildir watcher detected MailDir modification')
        self._youve_got_mail()

    def _timer_fired(self):
        # The pause is over: drop the timer, and if anything was
        # deferred while it ran, go through the debounce path again.
        self._release_debouncer()
        if self.debouncer_count > 0:
            self.debouncer_count = 0
            self._youve_got_mail()

    def run(self, seconds=None): # pylint:disable=arguments-differ
        """
        Process the queue once, start the watcher, then join the hub.

        :param seconds: Passed straight through to the hub's ``join``.
        """
        # Process once initially in case we have things in the queue already
        self._do_process_queue()
        self._start_watching()
        gevent.get_hub().join(seconds)
# Log levels indexed by verbosity, quietest first.
_LOG_LEVELS = [
    logging.ERROR,
    logging.WARN,
    logging.INFO,
    logging.DEBUG,
]

# Format shared by both command-line entry points.
_LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'


def _log_level_for_verbosity(verbosity=0):
    """
    Map a verbosity count onto an entry of ``_LOG_LEVELS``.

    The count is clamped to the valid index range: negative values do
    not wrap around, and values past the end select the last level.
    """
    last_index = len(_LOG_LEVELS) - 1
    index = min(max(verbosity, 0), last_index)
    return _LOG_LEVELS[index]


def run_process(): # pragma: no cover
    """
    Command-line entry point: watch a maildir and send new mail
    through SES until the process exits.
    """
    parser = argparse.ArgumentParser(
        description='Spawn a process that stays alive, periodically polling the mail queue '
                    'and sending new mail using SES.')
    parser.add_argument('queue_path',
                        help='The path to the maildir',
                        action='store')
    parser.add_argument('-r', '--sesregion',
                        help='The SES region to connect to.')
    parser.add_argument('-v', '--verbose',
                        help='How verbose to log.',
                        action='count',
                        default=1)
    parser.add_argument('-w', '--debounce-interval',
                        dest='interval',
                        help=('The number of seconds to wait between dumping the queue '
                              '(default: %(default)s)'),
                        action='store',
                        default=MailerWatcher.max_process_frequency_seconds,
                        type=int)
    options = parser.parse_args()

    logging.basicConfig(stream=sys.stderr,
                        format=_LOG_FORMAT,
                        level=_log_level_for_verbosity(options.verbose))

    region = options.sesregion
    if region:
        def _mailer_factory():
            # Build the mailer for the region given on the command line.
            return SESMailer(region)
    else:
        _mailer_factory = SESMailer

    watcher = MailerWatcher(_mailer_factory, options.queue_path)
    if options.interval:
        # Never go below the module-wide minimum debounce interval.
        watcher.max_process_frequency_seconds = max(_MINIMUM_DEBOUNCE_INTERVAL_SECONDS,
                                                    options.interval)
    logger.info('Using debounce interval of %i', watcher.max_process_frequency_seconds)

    watcher.run()


def run_console(): # pragma: no cover
    """
    Command-line entry point: process the mail queue one time using
    :class:`ConsoleApp`.
    """
    if '--help' in sys.argv:
        # Override the help message we get from repoze.sendmail, we
        # ignore all the SMTP stuff it wants to do.
        parser = argparse.ArgumentParser(
            description='Process the mail queue one time and exit. '
                        'Mail found in the mail queue will be sent using default SES settings.'
        )
        parser.add_argument('queue_path',
                            help='The path to the maildir',
                            action='store')
        parser.parse_args()
        return

    logging.basicConfig(stream=sys.stderr,
                        format=_LOG_FORMAT,
                        level=logging.WARN)
    ConsoleApp().main()


### Deprecated names

#: Backwards compatibility alias
#:
#: .. deprecated:: 0.0.1
#:    Use `LoopingMailerProcess`
MailerProcess = LoopingMailerProcess # BWC

deprecation.deprecated('MailerProcess', 'Use LoopingMailerProcess')


if __name__ == "__main__": # pragma NO COVERAGE
    run_console()