"""Test SMTP server for unit testing."""
import asynchat
import asyncore
import errno
import logging
import logging.handlers
import smtpd
import socket
import sys
import threading
# Port on localhost that main() binds the test server to and that the
# SMTPHandler under test is pointed at.
TEST_SMTP_PORT = 9025
class TestSMTPChannel(smtpd.SMTPChannel):
    """SMTP channel that registers itself in a caller-supplied socket map.

    The constructor deliberately does not chain to
    ``smtpd.SMTPChannel.__init__``; it initialises ``asynchat.async_chat``
    directly with ``sockmap`` and then sets the per-connection state
    attributes itself.
    """

    def __init__(self, server, conn, addr, sockmap):
        """Initialise the channel state and send the 220 greeting.

        Args:
            server: Object stored as ``smtp_server`` for this channel.
            conn: Connected client socket.
            addr: Address of the client, stored as ``addr``.
            sockmap: Socket map handed to ``asynchat.async_chat.__init__``.

        Raises:
            socket.error: If ``conn.getpeername()`` fails with an error
                other than ``ENOTCONN``.
        """
        asynchat.async_chat.__init__(self, conn, sockmap)

        # Connection bookkeeping.
        self.smtp_server = server
        self.conn = conn
        self.addr = addr
        self.fqdn = socket.getfqdn()

        # Per-session SMTP protocol state.
        self.smtp_state = self.COMMAND
        self.seen_greeting = ''
        self.mailfrom = None
        self.rcpttos = []
        self.received_lines = []
        self.received_data = ''
        self.num_bytes = 0

        try:
            self.peer = conn.getpeername()
        except socket.error as exc:
            # The other end may already be closing before the peer name
            # can be fetched; in that case close and give up quietly.
            self.close()
            if exc.args[0] == errno.ENOTCONN:
                return
            raise

        greeting = '220 %s %s' % (self.fqdn, smtpd.__version__)
        self.push(greeting)
        self.set_terminator(b'\r\n')
class TestSMTPServer(smtpd.SMTPServer):
    """SMTP server for tests that runs its asyncore loop in a thread.

    Each received message is passed to the ``handler`` callable given to
    the constructor. The server and the channels it creates are registered
    in the caller-supplied ``sockmap``, and ``serve_forever`` polls only
    that map.
    """

    channel_class = TestSMTPChannel

    def __init__(self, addr, handler, poll_interval, sockmap):
        """Create the listening socket and bind it to ``addr``.

        Args:
            addr: ``(host, port)`` tuple to listen on.
            handler: Callable invoked as
                ``handler(peer, mailfrom, rcpttos, data)`` for each message.
            poll_interval: Timeout passed to ``asyncore.loop`` by ``start``.
            sockmap: Socket map in which the server and its channels are
                registered.

        Raises:
            Whatever socket creation, ``bind`` or ``listen`` raises; the
            dispatcher is closed before the exception propagates.
        """
        self._localaddr = addr
        self._remoteaddr = None
        self.sockmap = sockmap
        asyncore.dispatcher.__init__(self, map=sockmap)
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setblocking(0)
            self.set_socket(sock, map=sockmap)
            # try to re-use a server port if possible
            self.set_reuse_addr()
            self.bind(addr)
            self.listen(5)
        except BaseException:
            # Clean up on any failure (including KeyboardInterrupt), then
            # let the original exception propagate unchanged.
            self.close()
            raise
        self._handler = handler
        self._thread = None
        self.poll_interval = poll_interval

    def handle_accepted(self, conn, addr):
        """Wrap a newly accepted connection in a ``channel_class`` instance."""
        print('Incoming connection from %s' % repr(addr), file=smtpd.DEBUGSTREAM)
        # The channel is constructed with self.sockmap, so no local
        # reference to it is kept here.
        self.channel_class(self, conn, addr, self.sockmap)

    def process_message(self, peer, mailfrom, rcpttos, data):
        """Forward a received message to the handler given at construction."""
        self._handler(peer, mailfrom, rcpttos, data)

    def start(self):
        """Start ``serve_forever`` in a daemon thread."""
        self._thread = t = threading.Thread(target=self.serve_forever,
                                            args=(self.poll_interval,))
        # Use the ``daemon`` attribute; ``setDaemon()`` is deprecated.
        t.daemon = True
        t.start()

    def serve_forever(self, poll_interval):
        """Run ``asyncore.loop`` over this server's socket map.

        Args:
            poll_interval: Timeout passed to ``asyncore.loop``.
        """
        asyncore.loop(poll_interval, map=self.sockmap)

    def stop(self):
        """Close the server and wait for the thread started by ``start``.

        Safe to call when ``start`` was never called or when ``stop`` has
        already run: in that case only the server is closed.
        """
        self.close()
        if self._thread is not None:
            self._thread.join()
            self._thread = None
def process(peer, mailfrom, rcpttos, data):
    """Print the four message fields to stdout as the repr of one tuple."""
    message = (peer, mailfrom, rcpttos, data)
    print('%r' % (message,))
def main():
    """Send one log record through SMTPHandler to a local test server.

    Starts a TestSMTPServer on ``('localhost', TEST_SMTP_PORT)`` whose
    handler is ``process``, emits a single 'Hello' record via
    ``logging.handlers.SMTPHandler``, then shuts everything down. The
    server is stopped and the handler closed even if sending fails.
    """
    addr = ('localhost', TEST_SMTP_PORT)
    # Uncomment to send smtpd's debug output to stderr:
    # smtpd.DEBUGSTREAM = sys.stderr
    handler = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log')
    try:
        server = TestSMTPServer(addr, process, 0.001, {})
        server.start()
        try:
            record = logging.makeLogRecord({'msg': 'Hello'})
            handler.handle(record)
        finally:
            # Always stop the server thread so the listening socket is
            # released, even if handling the record raised.
            server.stop()
    finally:
        handler.close()
# Run the demonstration only when executed as a script, not on import.
if __name__ == '__main__':
    main()