In [1]:
import configparser
import contextlib
import datetime
import email
import gzip
import imaplib
import io
import re
import ssl
import xml.etree.ElementTree
import zipfile

import psycopg2

In [2]:
def fetch_msg(num, connection=None):
    """Fetch the raw RFC 822 bytes of the message with IMAP UID ``num``.

    Args:
        num: message UID as returned by ``UID SEARCH`` (bytes or str).
        connection: IMAP4 connection to use; when omitted, the notebook-global
            ``mailbox`` is looked up at call time.

    Returns:
        The full message as bytes.

    Raises:
        imaplib.IMAP4.error: the FETCH did not succeed or returned no message
            (e.g. the UID no longer exists), instead of an opaque TypeError
            from indexing into ``[None]``.
    """
    imap = mailbox if connection is None else connection
    status, data = imap.uid('FETCH', num, '(RFC822)')
    # A successful fetch yields data[0] == (envelope_line, message_bytes);
    # a missing UID yields ('OK', [None]).
    if status != 'OK' or not data or not isinstance(data[0], tuple):
        raise imaplib.IMAP4.error(
            'FETCH of UID {!r} failed: {} {!r}'.format(num, status, data))
    return data[0][1]

In [3]:
def xml_of_part(part):
    """Parse the DMARC aggregate report carried by a compressed MIME part.

    The decoded payload may be either a zip archive, whose first member is
    read, or a gzip stream. Both must be accepted because ``xml_of`` sends
    every part whose content type contains ``'zip'`` here, and that includes
    ``application/gzip``.

    Args:
        part: an ``email.message.Message`` whose payload is the compressed report.

    Returns:
        The root ``xml.etree.ElementTree.Element`` of the report.

    Raises:
        ValueError: the zip archive has no members.
        zipfile.BadZipFile: the payload is neither gzip nor a valid zip.
        xml.etree.ElementTree.ParseError: the decompressed report is not valid XML.
    """
    payload = part.get_payload(decode=True)
    if payload[:2] == b'\x1f\x8b':  # gzip magic number
        contents = gzip.decompress(payload)
    else:
        with zipfile.ZipFile(io.BytesIO(payload)) as zf:
            members = zf.infolist()
            if not members:
                raise ValueError('zip attachment contains no files')
            contents = zf.read(members[0])
    # Give the parser raw bytes so it honours the document's own encoding
    # declaration rather than assuming UTF-8.
    return xml.etree.ElementTree.fromstring(contents)

In [4]:
def xml_of(message):
    """Return the parsed DMARC reports contained in an email message.

    For a multipart message, every leaf part at any nesting depth whose
    content type contains ``'zip'`` is parsed with ``xml_of_part``. That
    covers attachments inside nested multiparts or attached messages, not
    only top-level parts. A single-part message is parsed as a whole.

    Args:
        message: an ``email.message.Message``.

    Returns:
        A list of root ``Element`` objects, in the order the parts appear.
    """
    if not message.is_multipart():
        return [xml_of_part(message)]
    # walk() yields container parts too; only leaves carry a decodable payload.
    return [xml_of_part(part)
            for part in message.walk()
            if not part.is_multipart() and 'zip' in part.get_content_type()]

In [5]:
def extract_report(msg):
    """Turn raw RFC 822 message bytes into a list of DMARC report XML roots.

    The bytes are parsed into an email message, which is then handed to
    ``xml_of``.
    """
    return xml_of(email.message_from_bytes(msg))

In [6]:
def maybe_strip(text):
    """Return ``text`` with surrounding whitespace removed.

    Falsy input (``None`` or ``''``) yields ``''``.
    """
    return text.strip() if text else ''

In [7]:
# Each row describes one extracted value:
# (XPath into the aggregate report, PostgreSQL column, destination table, PostgreSQL type).
# Paths containing '{}' are templates that build_insert_command fills with a
# 1-based <record> index.
_FIELD_SPECS = (
    ('./policy_published/adkim', 'policy_published_adkim', 'reports', 'varchar'),
    ('./policy_published/aspf', 'policy_published_aspf', 'reports', 'varchar'),
    ('./policy_published/domain', 'policy_published_domain', 'reports', 'varchar'),
    ('./policy_published/p', 'policy_published_p', 'reports', 'varchar'),
    ('./policy_published/pct', 'policy_published_pct', 'reports', 'int'),
    ('./record[{}]/auth_results/dkim/domain', 'auth_results_dkim_domain', 'report_items', 'varchar'),
    ('./record[{}]/auth_results/dkim/result', 'auth_results_dkim_result', 'report_items', 'varchar'),
    ('./record[{}]/auth_results/spf/domain', 'auth_results_spf_domain', 'report_items', 'varchar'),
    ('./record[{}]/auth_results/spf/result', 'auth_results_spf_result', 'report_items', 'varchar'),
    ('./record[{}]/identifiers/header_from', 'identifiers_header_from', 'report_items', 'varchar'),
    ('./record[{}]/row/count', 'count', 'report_items', 'int'),
    ('./record[{}]/row/policy_evaluated/disposition', 'policy_evaluated_disposition', 'report_items', 'varchar'),
    ('./record[{}]/row/policy_evaluated/dkim', 'policy_evaluated_dkim', 'report_items', 'varchar'),
    ('./record[{}]/row/policy_evaluated/spf', 'policy_evaluated_spf', 'report_items', 'varchar'),
    ('./record[{}]/row/source_ip', 'source_ip', 'report_items', 'inet'),
    ('./report_metadata/date_range/begin', 'report_metadata_date_range_begin', 'reports', 'timestamp'),
    ('./report_metadata/date_range/end', 'report_metadata_date_range_end', 'reports', 'timestamp'),
    ('./report_metadata/email', 'report_metadata_email', 'reports', 'varchar'),
    ('./report_metadata/org_name', 'report_metadata_org_name', 'reports', 'varchar'),
    ('./report_metadata/report_id', 'report_metadata_report_id', 'reports', 'varchar'),
)

# XPath -> {'pg_field_name', 'pg_table', 'pg_type'}, in the row order above.
field_maps = {
    xpath: {'pg_field_name': column, 'pg_table': table, 'pg_type': pg_type}
    for xpath, column, table, pg_type in _FIELD_SPECS
}

In [8]:
def _pg_value(element, pg_type):
    """Convert an XML element's text into the value bound for a column of ``pg_type``.

    Text is whitespace-stripped. A missing element, or one with no text,
    becomes ``None`` (SQL NULL) for ``int``, ``timestamp`` and ``inet``
    columns, because those types cannot hold an empty string. For every other
    type it becomes ``''``.
    """
    text = element.text if element is not None else None
    text = text.strip() if text else ''
    if pg_type == 'int':
        return int(text) if text else None
    if pg_type == 'timestamp':
        if not text:
            return None
        # The value is Unix epoch seconds; store it as a naive UTC datetime
        # (non-deprecated equivalent of datetime.utcfromtimestamp).
        return datetime.datetime.fromtimestamp(
            int(text), datetime.timezone.utc).replace(tzinfo=None)
    if pg_type == 'inet':
        return text or None
    return text


def build_insert_command(table_name, report, preamble_values=None, i=None,
                         field_spec=None):
    """Build a parameterised INSERT for ``table_name`` from a DMARC report.

    Args:
        table_name: target table; only field specs whose ``pg_table`` matches
            are used.
        report: root ``Element`` of the aggregate report.
        preamble_values: optional extra column -> value pairs (e.g.
            ``{'report_id': 5}``). They are emitted first, in sorted column
            order. The dict is copied, not mutated.
        i: 1-based ``<record>`` index substituted into ``'{}'`` XPath
            templates. Leave as ``None`` for report-level fields.
        field_spec: XPath -> ``{'pg_field_name', 'pg_table', 'pg_type'}``
            mapping. Defaults to the notebook-level ``field_maps``.

    Returns:
        ``(sql, values)``: an ``insert into ... values (%(col)s, ...);`` string
        using pyformat placeholders, and the dict of values to bind. Elements
        absent from the report produce NULL/'' (see ``_pg_value``) instead of
        raising AttributeError.
    """
    if field_spec is None:
        field_spec = field_maps
    values = dict(preamble_values) if preamble_values else {}
    preamble_names = sorted(preamble_values) if preamble_values else []

    field_names = []
    for xpath, spec in field_spec.items():
        if spec['pg_table'] != table_name:
            continue
        path = xpath.format(i) if i is not None else xpath
        column = spec['pg_field_name']
        field_names.append(column)
        values[column] = _pg_value(report.find(path), spec['pg_type'])

    # Column names come from the trusted field spec; data goes through placeholders.
    columns = preamble_names + field_names
    insert_string = 'insert into {} ({}) values ({});'.format(
        table_name,
        ', '.join(columns),
        ', '.join('%({})s'.format(column) for column in columns))
    return insert_string, values

In [9]:
def write_report(connection, cursor, report):
    """Insert one parsed DMARC aggregate report and all of its records, then commit.

    The ``reports`` row is inserted first. Its generated ``id`` is read back
    with ``RETURNING`` and used as ``report_id`` for every ``report_items``
    row; there is one such row per ``<record>`` element, addressed by 1-based
    XPath index.

    Args:
        connection: open DB-API connection; committed once at the end.
        cursor: cursor on ``connection`` used for every statement.
        report: root ``Element`` of the aggregate report XML.
    """
    insert_string, values = build_insert_command('reports', report)
    # Reading the id back from the INSERT itself, instead of re-selecting by
    # report_metadata_report_id, ties the items to exactly this row. It still
    # works if the same report_id string is already stored, or if the stored
    # value was whitespace-stripped, and it needs one query instead of one per
    # record.
    cursor.execute(insert_string.rstrip().rstrip(';') + ' returning id;', values)
    report_id = cursor.fetchone()[0]

    record_count = len(report.findall('./record'))
    for i in range(1, record_count + 1):
        item_insert, item_values = build_insert_command(
            'report_items', report, i=i, preamble_values={'report_id': report_id})
        cursor.execute(item_insert, item_values)
    connection.commit()

In [10]:
def write_report_old(connection, cursor, report):
    """Deprecated alias of ``write_report``; kept so existing callers keep working.

    This function previously had its own hand-expanded copy of the insert
    logic, which ``build_insert_command`` already generates. That copy also
    used a bare ``raise`` outside an ``except`` block. It now simply
    delegates.

    Args:
        connection: open DB-API connection.
        cursor: cursor on ``connection``.
        report: root ``Element`` of the aggregate report XML.
    """
    write_report(connection, cursor, report)

In [11]:
config = configparser.ConfigParser()
# ConfigParser.read silently skips missing files and returns the list it did
# read. Fail here rather than with an obscure KeyError in the [database] or
# [imap] lookups below.
config_files_read = config.read('dmarc.ini')
if not config_files_read:
    raise FileNotFoundError(
        'dmarc.ini not found or unreadable; it must provide [database] and [imap] sections')
config_files_read

['dmarc.ini']

In [12]:
# psycopg2's connection context manager only ends the transaction; it does not
# close the connection, so wrap it in contextlib.closing to avoid leaking it.
with contextlib.closing(psycopg2.connect(host=config['database']['server'],
                                         database=config['database']['database'],
                                         user=config['database']['username'],
                                         password=config['database']['password'])) as conn:
    with conn, conn.cursor() as cur:
        cur.execute('select max(report_metadata_date_range_end) from reports')
        results = cur.fetchall()
# Latest stored report end time, or None when the reports table is empty.
most_recent_date = results[0][0]
most_recent_date

datetime.datetime(2016, 3, 29, 23, 59, 59)

In [13]:
mailbox = imaplib.IMAP4(host=config['imap']['server'],
                        port=int(config['imap']['port']))
# Pass an explicit certificate-verifying SSL context rather than relying on
# imaplib's default. The server's certificate and hostname are then checked
# before the password below is sent.
mailbox.starttls(ssl_context=ssl.create_default_context())
mailbox.login(config['imap']['username'], config['imap']['password'])
# Read-only so that fetching does not change message flags.
select_reply = mailbox.select('INBOX', readonly=True)
if select_reply[0] != 'OK':
    raise imaplib.IMAP4.error('could not select INBOX: {!r}'.format(select_reply))
select_reply

('OK', [b'178'])

In [14]:
# IMAP SEARCH dates must use English month abbreviations (RFC 3501 date-text).
# strftime('%b') follows the process locale, so spell the month out explicitly.
IMAP_MONTHS = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
               'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')
if most_recent_date:
    # Re-scan from two days before the newest stored report end. Reports that
    # are already stored are skipped later by the report_id check before writing.
    since = most_recent_date - datetime.timedelta(days=2)
    mails_from = 'SINCE {:02d}-{}-{:04d}'.format(
        since.day, IMAP_MONTHS[since.month - 1], since.year)
else:
    mails_from = "ALL"
resp, nums = mailbox.uid('SEARCH', None, mails_from)
if resp != 'OK':
    raise imaplib.IMAP4.error('UID SEARCH {!r} failed: {!r}'.format(mails_from, nums))
mails_from, resp, nums

('SINCE 27-Mar-2016', 'OK', [b'169 170 171 172 173 174 175 176 177 178 179'])

In [15]:
# Fetch every matching message in UID order and flatten the reports each one
# carries (one message may hold several attachments).
message_uids = nums[0].split()
dmarc_reports = []
for uid in message_uids:
    dmarc_reports.extend(extract_report(fetch_msg(uid)))
[r.find('./report_metadata/report_id').text for r in dmarc_reports]

['1458957186.548175',
 '2150510829392606201',
 '68aad5080a774e2c997d159b546569b9@hotmail.com',
 '1459129809.695034',
 '16143280651570354241',
 '8c177254c3cb41869dc3afab59f74c76@hotmail.com',
 '15410706527896810898',
 '1459216304.582931',
 '15497495941279624940',
 '1459302353.261157',
 '7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com']

In [16]:
# Leave the selected INBOX, then end the IMAP session. INBOX was selected
# read-only, so close() has no deleted messages to expunge. The cell displays
# the server's logout reply.
mailbox.close()
mailbox.logout()

('BYE', [b'Logging out'])

In [18]:
# Insert only the reports whose report_id is not stored yet.
# contextlib.closing is needed because psycopg2's own context manager does not
# close the connection.
with contextlib.closing(psycopg2.connect(host=config['database']['server'],
                                         database=config['database']['database'],
                                         user=config['database']['username'],
                                         password=config['database']['password'])) as conn:
    with conn, conn.cursor() as cur:
        for report in dmarc_reports:
            # Compare against the same stripped form that build_insert_command
            # stores. Otherwise a report_id with surrounding whitespace never
            # matches and is re-inserted on every run.
            report_id = maybe_strip(report.find('./report_metadata/report_id').text)
            cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
                        [report_id])
            results = cur.fetchall()
            if not results:
                print('write', report_id)
                write_report(conn, cur, report)

write 1459302353.261157
write 7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com
