In: Computer Science
Search the Internet and find a Python app you think is interesting. Your choice. 1. Tell me where you found the code you start with (yes, feel free to start with existing code). You can put this in the testing document. 2. Add some functionality. Significant functionality. 3. Add comments - I know how much you all enjoy comments, so there will be a lot of points here. Tell me how to use the program, and what you added. 4. Include a testing document.
Application were doing here is to test how many mails can be sent in a particular time period .The code here is done in python :
Application Code:
from __future__ import print_function
import asyncore, collections, email, json, smtpd, sys, threading,
time
from wsgiref.simple_server import make_server, WSGIRequestHandler,
WSGIServer
import bottle
__version__ = '1.1.3'
#importing required packages
try:
import sendgrid
except ImportError:
sendgrid = None
#initiating SMTP servers
class _SMTPServer(smtpd.SMTPServer):
def process_message(self, peer, mailfrom, rcpttos, data):
#defining processes like where the email TO
b = email.message_from_string(data)
body_parts = []
if b.is_multipart(): #if the Mail has Multiple attatchments or
multiple contents
for part in b.walk():
ctype = part.get_content_type()
cdispo = str(part.get('Content-Disposition'))
if ctype in ('text/plain','text/html') and 'attachment' not in
cdispo: #Checking the formats of the attachments
body_parts.append(part.get_payload(decode=True).decode())
else:
body_parts = [b.get_payload(decode=True).decode()]
e = Email(frm=mailfrom, to=rcpttos, raw=data,
msg='\n'.join(body_parts)) #Joing the parts of the mail
for callback in self.callbacks.values():
callback(e)
_smtp_servers = {}
class _SendgridServer(object):
def __init__(self, port): #defining the ports
self.port = port
self.app = app = bottle.Bottle()
@app.get('/v3/templates')
def templates(): #defining the templates(if we want to send mails
to large number of audience pre defined templates are used)
return {'templates': []}
@app.post('/v3/mail/send')
def send(): #Defining send in the format JSON format
d = bottle.request.json
for p in d['personalizations']:
msg = json.dumps(p, indent=2) #defining indentations
email = Email(frm=d['from']['email'], to=[addr['email'] for addr in
p['to']], msg=msg, raw=msg)
for callback in self.callbacks.values():
callback(email)
return {}
def start(self):
def f(): #defining function for self sending emails
self._sendgrid_server = make_server('localhost', self.port,
self.app, WSGIServer, WSGIRequestHandler)
self._sendgrid_server.serve_forever()
t = threading.Thread(target=f)
t.daemon = True
t.start() #sarting the daemons
time.sleep(.1) #waiting time
_sendgrid_servers = {}
class Server(object):
def __init__(self, smtp_port=1025, sendgrid_port=None):
#initaializing sending ports
self._smtp_port = smtp_port
self._sendgrid_port = sendgrid_port
self._sendgrid_server = None
self.emails = []
def _callback(self, email): #defining call back functions
self.emails.append(email)
def _start_smtp_server(self):
_smtp_server = _smtp_servers.get(self._smtp_port)
if not _smtp_server: #if it is otherthan smtp server and checking
version of system
if sys.version_info[0] < 3:
_smtp_server = _SMTPServer(('localhost', self._smtp_port),
None)
else:
_smtp_server = _SMTPServer(('localhost', self._smtp_port), None,
decode_data=True)
_smtp_servers[self._smtp_port] = _smtp_server
_smtp_server.callbacks = {}
t = threading.Thread(target=asyncore.loop)
t.daemon = True #setting the daemons to True
t.start() #starting the proc
_smtp_server.callbacks[id(self)] = self._callback
def _start_sendgrid_server(self): #defining the function for
grids
server = _sendgrid_servers.get(self._sendgrid_port)
if not server:
_sendgrid_servers[self._sendgrid_port] = server =
_SendgridServer(self._sendgrid_port)
server.callbacks = {}
server.start()
server.callbacks[id(self)] = self._callback
def __enter__(self):
if self._smtp_port: self._start_smtp_server()
if self._sendgrid_port: self._start_sendgrid_server()
return self
def __exit__(self, type, value, tb): #defining Exit function for
both smtp servers and Send grid functions
if self._smtp_port:
del _smtp_servers[self._smtp_port].callbacks[id(self)]
if self._sendgrid_port:
del _sendgrid_servers[self._sendgrid_port].callbacks[id(self)]
Email = collections.namedtuple('Email',
['frm','to','msg','raw'])
Testing for the above application
import smtplib, unittest #importing the necessary librarys
import mailtest
try: #try catch methods to there is any exceptions this will
handle without stopping the entire code running
import sendgrid
except ImportError:
sendgrid = None
class TestMailTest(unittest.TestCase):
def test_smtp(self): #defining the test functionality
with mailtest.Server(smtp_port=1025) as s:
sender = smtplib.SMTP('localhost', 1025) #setting localhost
sender.sendmail('[email protected]', ['[email protected]'],
'hi') #sending mail
sender.close()
self.assertEqual(len(s.emails), 1)
self.assertEqual(s.emails[0].frm, '[email protected]')
self.assertEqual(s.emails[0].to, ['[email protected]'])
self.assertEqual(s.emails[0].msg, 'hi')
def test_mass_email(self): #defining the test functionality for
self mails
with mailtest.Server(smtp_port=1025) as s:
sender = smtplib.SMTP('localhost', 1025)
c = 1000 # ~0.240s
for i in range(c):
sender.sendmail('[email protected]', ['[email protected]'],
'msg #%i' % i)
sender.close()
self.assertEqual(len(s.emails), c)
if sendgrid: #defining the sendgrid for self
def test_sendgrid(self):
with mailtest.Server(sendgrid_port=1080) as s:
sg = sendgrid.SendGridAPIClient(apikey='any',
host='http://localhost:1080')
sg.client.templates.get() #getting the templates
mail = sendgrid.helpers.mail.Mail()
mail.from_email = sendgrid.helpers.mail.Email('[email protected]',
'Author Name')
mail.subject = 'test email' #sending a test mail
mail.template_id = '12345'
for i in range(5):
person = sendgrid.helpers.mail.Personalization()
person.add_to(sendgrid.helpers.mail.Email('recipient%[email protected]'
% i, 'Recipient #%i' % i))
#person.add_substitution(Substitution(unsubscribe_path,
user.unsubscribe_path()))
#person.add_custom_arg(CustomArg("user_id", str(user.id)))
mail.add_personalization(person) #adding the above
personilization
body = mail.get()
sg.client.mail.send.post(request_body=body)
self.assertEqual(len(s.emails), 5)
self.assertEqual(s.emails[0].frm, '[email protected]')
self.assertEqual(s.emails[0].to, ['[email protected]'])
# self.assertEqual(s.emails[0].msg, 'hi')
if __name__ == '__main__':
unittest.main()
description:
The application is about to test the speed or how many mails will be sent in the certain time period and I got this code from github and i have added comments in the code and included the test code below and can state that using the i-5 7th gen processor we can send upto 4000 mails per second