Test SMTP server for unit testing (original) (raw)

import asynchat

import asyncore

import errno

import logging

import logging.handlers

import smtpd

import socket

import sys

import threading

TEST_SMTP_PORT = 9025

class TestSMTPChannel(smtpd.SMTPChannel):

def __init__(self, server, conn, addr, sockmap):

asynchat.async_chat.__init__(self, conn, sockmap)

self.smtp_server = server

self.conn = conn

self.addr = addr

self.received_lines = []

self.smtp_state = self.COMMAND

self.seen_greeting = ''

self.mailfrom = None

self.rcpttos = []

self.received_data = ''

self.fqdn = socket.getfqdn()

self.num_bytes = 0

try:

self.peer = conn.getpeername()

except socket.error as err:

# a race condition may occur if the other end is closing

# before we can get the peername

self.close()

if err.args[0] != errno.ENOTCONN:

raise

return

self.push('220 %s %s' % (self.fqdn, smtpd.__version__))

self.set_terminator(b'\r\n')

class TestSMTPServer(smtpd.SMTPServer):

channel_class = TestSMTPChannel

def __init__(self, addr, handler, poll_interval, sockmap):

self._localaddr = addr

self._remoteaddr = None

self.sockmap = sockmap

asyncore.dispatcher.__init__(self, map=sockmap)

try:

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

sock.setblocking(0)

self.set_socket(sock, map=sockmap)

# try to re-use a server port if possible

self.set_reuse_addr()

self.bind(addr)

self.listen(5)

except:

self.close()

raise

self._handler = handler

self._thread = None

self.poll_interval = poll_interval

def handle_accepted(self, conn, addr):

print('Incoming connection from %s' % repr(addr), file=smtpd.DEBUGSTREAM)

channel = self.channel_class(self, conn, addr, self.sockmap)

def process_message(self, peer, mailfrom, rcpttos, data):

self._handler(peer, mailfrom, rcpttos, data)

def start(self):

self._thread = t = threading.Thread(target=self.serve_forever,

args=(self.poll_interval,))

t.setDaemon(True)

t.start()

def serve_forever(self, poll_interval):

asyncore.loop(poll_interval, map=self.sockmap)

def stop(self):

self.close()

self._thread.join()

self._thread = None

def process(peer, mailfrom, rcpttos, data):

print('%r' % ((peer, mailfrom, rcpttos, data),))

def main():

addr = ('localhost', TEST_SMTP_PORT)

#smtpd.DEBUGSTREAM = sys.stderr

h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log')

my_map = {}

server = TestSMTPServer(addr, process, 0.001, my_map)

server.start()

r = logging.makeLogRecord({'msg': 'Hello'})

h.handle(r)

server.stop()

h.close()

if __name__ == '__main__':

main()