From fa101ebf71cd46c51df29eac35a001f60dd6434e Mon Sep 17 00:00:00 2001 From: Neil Smith Date: Fri, 1 Apr 2016 12:12:43 +0100 Subject: [PATCH] Added timezone to timestamps --- complete.ipynb | 73 ++++++++-------- dmarc_to_database | 201 +++++++++++++++++++++++++++++++++++++++++++ dmarc_to_database.py | 36 ++++---- queries.ipynb | 48 ++++++----- 4 files changed, 278 insertions(+), 80 deletions(-) create mode 100755 dmarc_to_database diff --git a/complete.ipynb b/complete.ipynb index bd272eb..735b78f 100644 --- a/complete.ipynb +++ b/complete.ipynb @@ -315,23 +315,34 @@ "metadata": { "collapsed": false }, + "outputs": [], + "source": [ + "conn = psycopg2.connect(host=config['database']['server'],\n", + " database=config['database']['database'], \n", + " user=config['database']['username'], \n", + " password=config['database']['password'])" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": { + "collapsed": false + }, "outputs": [ { "data": { "text/plain": [ - "datetime.datetime(2016, 3, 29, 23, 59, 59)" + "datetime.datetime(2016, 3, 30, 23, 59, 59)" ] }, - "execution_count": 12, + "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "with psycopg2.connect(host=config['database']['server'],\n", - " database=config['database']['database'], \n", - " user=config['database']['username'], \n", - " password=config['database']['password']) as conn:\n", + "with conn:\n", " with conn.cursor() as cur:\n", " cur.execute('select max(report_metadata_date_range_end) from reports')\n", " results = cur.fetchall()\n", @@ -341,7 +352,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 14, "metadata": { "collapsed": false }, @@ -349,10 +360,10 @@ { "data": { "text/plain": [ - "('OK', [b'178'])" + "('OK', [b'179'])" ] }, - "execution_count": 13, + "execution_count": 14, "metadata": {}, "output_type": "execute_result" } @@ -367,7 +378,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 15, "metadata": { "collapsed": false }, @@ -375,10 +386,10 @@ { "data": { "text/plain": [ - "('SINCE 27-Mar-2016', 'OK', [b'169 170 171 172 173 174 175 176 177 178 179'])" + "('SINCE 28-Mar-2016', 'OK', [b'172 173 174 175 176 177 178 179 180'])" ] }, - "execution_count": 14, + "execution_count": 15, "metadata": {}, "output_type": "execute_result" } @@ -394,7 +405,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 16, "metadata": { "collapsed": false }, @@ -402,20 +413,18 @@ { "data": { "text/plain": [ - "['1458957186.548175',\n", - " '2150510829392606201',\n", - " '68aad5080a774e2c997d159b546569b9@hotmail.com',\n", - " '1459129809.695034',\n", + "['1459129809.695034',\n", " '16143280651570354241',\n", " '8c177254c3cb41869dc3afab59f74c76@hotmail.com',\n", " '15410706527896810898',\n", " '1459216304.582931',\n", " '15497495941279624940',\n", " '1459302353.261157',\n", - " '7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com']" + " '7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com',\n", + " '15185964531645951164']" ] }, - "execution_count": 15, + "execution_count": 16, "metadata": {}, "output_type": "execute_result" } @@ -428,7 +437,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 17, "metadata": { "collapsed": false }, @@ -439,7 +448,7 @@ "('BYE', [b'Logging out'])" ] }, - "execution_count": 16, + "execution_count": 17, "metadata": {}, "output_type": "execute_result" } @@ -455,21 +464,9 @@ "metadata": { "collapsed": false }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "write 1459302353.261157\n", - "write 7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com\n" - ] - } - ], + "outputs": [], "source": [ - "with psycopg2.connect(host=config['database']['server'],\n", - " database=config['database']['database'], \n", - " user=config['database']['username'], \n", - " password=config['database']['password']) as conn:\n", + "with conn:\n", " with conn.cursor() as cur:\n", " for report in dmarc_reports:\n", " cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;', \n", @@ -482,12 +479,14 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 19, "metadata": { "collapsed": true }, "outputs": [], - "source": [] + "source": [ + "conn.close()" + ] } ], "metadata": { diff --git a/dmarc_to_database b/dmarc_to_database new file mode 100755 index 0000000..a8141fd --- /dev/null +++ b/dmarc_to_database @@ -0,0 +1,201 @@ +#!/usr/bin/python3 + +import configparser +import imaplib +import email +import io +import zipfile +import xml.etree.ElementTree +import psycopg2 +import re +import datetime + +def fetch_msg(num): + return mailbox.uid('FETCH', num, '(RFC822)')[1][0][1] + +def xml_of_part(part): + with zipfile.ZipFile(io.BytesIO(part.get_payload(decode=True))) as zf: + fn = zf.infolist()[0].filename + contents = zf.read(fn).decode('utf-8') + return xml.etree.ElementTree.fromstring(contents) + + +def xml_of(message): + reports = [] + if message.is_multipart(): + for p in message.get_payload(): + if 'zip' in p.get_content_type(): + reports += [xml_of_part(p)] + else: + reports = [xml_of_part(message)] + return reports + +def extract_report(msg): + pmsg = email.message_from_bytes(msg) + return xml_of(pmsg) + +def maybe_strip(text): + if text: + return text.strip() + else: + return '' + +field_maps = {'./policy_published/adkim': {'pg_field_name': 'policy_published_adkim', + 'pg_table': 'reports', + 'pg_type': 'varchar'}, + './policy_published/aspf': {'pg_field_name': 'policy_published_aspf', + 'pg_table': 'reports', + 'pg_type': 'varchar'}, + './policy_published/domain': {'pg_field_name': 'policy_published_domain', + 'pg_table': 'reports', + 'pg_type': 'varchar'}, + './policy_published/p': {'pg_field_name': 'policy_published_p', + 'pg_table': 'reports', + 'pg_type': 'varchar'}, + './policy_published/pct': {'pg_field_name': 'policy_published_pct', + 'pg_table': 'reports', + 'pg_type': 'int'}, + './record[{}]/auth_results/dkim/domain': {'pg_field_name': 'auth_results_dkim_domain', + 'pg_table': 'report_items', + 'pg_type': 'varchar'}, + './record[{}]/auth_results/dkim/result': {'pg_field_name': 'auth_results_dkim_result', + 'pg_table': 'report_items', + 'pg_type': 'varchar'}, + './record[{}]/auth_results/spf/domain': {'pg_field_name': 'auth_results_spf_domain', + 'pg_table': 'report_items', + 'pg_type': 'varchar'}, + './record[{}]/auth_results/spf/result': {'pg_field_name': 'auth_results_spf_result', + 'pg_table': 'report_items', + 'pg_type': 'varchar'}, + './record[{}]/identifiers/header_from': {'pg_field_name': 'identifiers_header_from', + 'pg_table': 'report_items', + 'pg_type': 'varchar'}, + './record[{}]/row/count': {'pg_field_name': 'count', + 'pg_table': 'report_items', + 'pg_type': 'int'}, + './record[{}]/row/policy_evaluated/disposition': {'pg_field_name': 'policy_evaluated_disposition', + 'pg_table': 'report_items', + 'pg_type': 'varchar'}, + './record[{}]/row/policy_evaluated/dkim': {'pg_field_name': 'policy_evaluated_dkim', + 'pg_table': 'report_items', + 'pg_type': 'varchar'}, + './record[{}]/row/policy_evaluated/spf': {'pg_field_name': 'policy_evaluated_spf', + 'pg_table': 'report_items', + 'pg_type': 'varchar'}, + './record[{}]/row/source_ip': {'pg_field_name': 'source_ip', + 'pg_table': 'report_items', + 'pg_type': 'inet'}, + './report_metadata/date_range/begin': {'pg_field_name': 'report_metadata_date_range_begin', + 'pg_table': 'reports', + 'pg_type': 'timestamp'}, + './report_metadata/date_range/end': {'pg_field_name': 'report_metadata_date_range_end', + 'pg_table': 'reports', + 'pg_type': 'timestamp'}, + './report_metadata/email': {'pg_field_name': 'report_metadata_email', + 'pg_table': 'reports', + 'pg_type': 'varchar'}, + './report_metadata/org_name': {'pg_field_name': 'report_metadata_org_name', + 'pg_table': 'reports', + 'pg_type': 'varchar'}, + './report_metadata/report_id': {'pg_field_name': 'report_metadata_report_id', + 'pg_table': 'reports', + 'pg_type': 'varchar'}} + + + +def build_insert_command(table_name, report, preamble_values=None, i=None): + field_names = [] + if preamble_values: + values = preamble_values.copy() + else: + values = {} + for f in [f for f in field_maps if field_maps[f]['pg_table'] == table_name]: + if i: + fp = f.format(i) + else: + fp = f + field_names += [field_maps[f]['pg_field_name']] + if field_maps[f]['pg_type'] == 'int': + values[field_maps[f]['pg_field_name']] = int(report.find(fp).text) + elif field_maps[f]['pg_type'] == 'timestamp': + # values[field_maps[f]['pg_field_name']] = datetime.datetime.utcfromtimestamp(int(report.find(fp).text)) + values[field_maps[f]['pg_field_name']] = \ + datetime.datetime.fromtimestamp(int(report.find(fp).text), + tz=datetime.timezone.utc) + elif field_maps[f]['pg_type'] == 'inet': + values[field_maps[f]['pg_field_name']] = maybe_strip(report.find(fp).text) + else: + values[field_maps[f]['pg_field_name']] = maybe_strip(report.find(fp).text) + insert_string = 'insert into {} ('.format(table_name) + if preamble_values: + insert_string += ', '.join(sorted(preamble_values.keys())) + ', ' + insert_string += ', '.join(field_names) + ') ' + insert_string += 'values (' + if preamble_values: + insert_string += ', '.join('%({})s'.format(fn) for fn in sorted(preamble_values.keys())) + ', ' + insert_string += ', '.join('%({})s'.format(f) for f in field_names) + ');' + return insert_string, values + + +def write_report(connection, cursor, report): + insert_string, values = build_insert_command('reports', report) + # print(insert_string, values) + cursor.execute(insert_string, values) + + for i in range(1, len(report.findall('./record'))+1): + field_names = [] + cursor.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;', + [report.find('./report_metadata/report_id').text]) + results = cursor.fetchall() + if len(results) != 1: + raise RuntimeError('Could not find report record for report item') + else: + report_id = results[0][0] + insert_string, values = build_insert_command('report_items', report, i=i, + preamble_values={'report_id': report_id}) + # print(insert_string, values) + cursor.execute(insert_string, values) + connection.commit() + +config = configparser.ConfigParser() +config.read('dmarc.ini') + +conn = psycopg2.connect(host=config['database']['server'], + database=config['database']['database'], + user=config['database']['username'], + password=config['database']['password']) + +cur = conn.cursor() +cur.execute('select max(report_metadata_date_range_end) from reports') +results = cur.fetchall() +most_recent_date = results[0][0] + +mailbox = imaplib.IMAP4(host=config['imap']['server'], + port=config['imap']['port']) +mailbox.starttls() +mailbox.login(config['imap']['username'], config['imap']['password']) +mailbox.select('INBOX', readonly=True) + + +if most_recent_date: + mails_from = "SINCE " + (most_recent_date - datetime.timedelta(days=2)).strftime("%d-%b-%Y") +else: + mails_from = "ALL" +resp, nums = mailbox.uid('SEARCH', None, mails_from) + + +dmarc_reports = [report for report_set in [extract_report(fetch_msg(n)) for n in nums[0].split()] + for report in report_set] + +mailbox.close() +mailbox.logout() + +for report in dmarc_reports: + cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;', + [report.find('./report_metadata/report_id').text]) + results = cur.fetchall() + if not results: + print('write', report.find('./report_metadata/report_id').text) + write_report(conn, cur, report) + +conn.close() diff --git a/dmarc_to_database.py b/dmarc_to_database.py index 2fd2375..4624964 100644 --- a/dmarc_to_database.py +++ b/dmarc_to_database.py @@ -1,3 +1,5 @@ +#!/usr/bin/python3 + import configparser import imaplib import email @@ -155,13 +157,14 @@ def write_report(connection, cursor, report): config = configparser.ConfigParser() config.read('dmarc.ini') -with psycopg2.connect(host=config['database']['server'], +conn = psycopg2.connect(host=config['database']['server'], database=config['database']['database'], user=config['database']['username'], - password=config['database']['password']) as conn: - with conn.cursor() as cur: - cur.execute('select max(report_metadata_date_range_end) from reports') - results = cur.fetchall() + password=config['database']['password']) + +cur = conn.cursor() +cur.execute('select max(report_metadata_date_range_end) from reports') +results = cur.fetchall() most_recent_date = results[0][0] mailbox = imaplib.IMAP4(host=config['imap']['server'], @@ -184,19 +187,12 @@ dmarc_reports = [report for report_set in [extract_report(fetch_msg(n)) for n in mailbox.close() mailbox.logout() +for report in dmarc_reports: + cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;', + [report.find('./report_metadata/report_id').text]) + results = cur.fetchall() + if not results: + print('write', report.find('./report_metadata/report_id').text) + write_report(conn, cur, report) -with psycopg2.connect(host=config['database']['server'], - database=config['database']['database'], - user=config['database']['username'], - password=config['database']['password']) as conn: - with conn.cursor() as cur: - for report in dmarc_reports: - cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;', - [report.find('./report_metadata/report_id').text]) - results = cur.fetchall() - if not results: - print('write', report.find('./report_metadata/report_id').text) - write_report(conn, cur, report) - - - +conn.close() diff --git a/queries.ipynb b/queries.ipynb index b2a54d3..38b22c9 100644 --- a/queries.ipynb +++ b/queries.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 18, + "execution_count": 1, "metadata": { "collapsed": true }, @@ -22,7 +22,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 2, "metadata": { "collapsed": false }, @@ -33,7 +33,7 @@ "['dmarc.ini']" ] }, - "execution_count": 5, + "execution_count": 2, "metadata": {}, "output_type": "execute_result" } @@ -45,7 +45,7 @@ }, { "cell_type": "code", - "execution_count": 31, + "execution_count": 3, "metadata": { "collapsed": false }, @@ -97,19 +97,21 @@ " (('google.com', 'softfail', 'pass', '146.185.136.235'), 'new.realms.co.uk'),\n", " (('google.com', 'pass', 'pass', '82.109.184.9'), 'clublloyds.com'),\n", " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk'),\n", + " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk'),\n", " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk')]" ] }, - "execution_count": 31, + "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "with psycopg2.connect(host=config['database']['server'],\n", + "conn = psycopg2.connect(host=config['database']['server'],\n", " database=config['database']['database'], \n", " user=config['database']['username'], \n", - " password=config['database']['password']) as conn:\n", + " password=config['database']['password'])\n", + "with conn:\n", " with conn.cursor() as cur:\n", " cur.execute(\"\"\"\n", " select report_metadata_org_name, auth_results_spf_result, auth_results_dkim_result, source_ip \n", @@ -122,7 +124,7 @@ }, { "cell_type": "code", - "execution_count": 32, + "execution_count": 4, "metadata": { "collapsed": false }, @@ -171,19 +173,17 @@ " (('google.com', 'softfail', 'pass', '146.185.136.235'), 'new.realms.co.uk'),\n", " (('google.com', 'softfail', 'pass', '146.185.136.235'), 'new.realms.co.uk'),\n", " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk'),\n", + " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk'),\n", " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk')]" ] }, - "execution_count": 32, + "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "with psycopg2.connect(host=config['database']['server'],\n", - " database=config['database']['database'], \n", - " user=config['database']['username'], \n", - " password=config['database']['password']) as conn:\n", + "with conn:\n", " with conn.cursor() as cur:\n", " cur.execute(\"\"\"\n", " select report_metadata_org_name, auth_results_spf_result, auth_results_dkim_result, source_ip \n", @@ -196,7 +196,7 @@ }, { "cell_type": "code", - "execution_count": 20, + "execution_count": 5, "metadata": { "collapsed": false }, @@ -207,7 +207,7 @@ "'lb.lon5.cpcloud.co.uk'" ] }, - "execution_count": 20, + "execution_count": 5, "metadata": {}, "output_type": "execute_result" } @@ -218,7 +218,7 @@ }, { "cell_type": "code", - "execution_count": 30, + "execution_count": 6, "metadata": { "collapsed": false }, @@ -313,10 +313,12 @@ " ('1458957186.548175',),\n", " ('1459129809.695034',),\n", " ('1459216304.582931',),\n", + " ('1459302353.261157',),\n", " ('14593873841710243963',),\n", " ('14661842628106423589',),\n", " ('14662396456930987863',),\n", " ('15111277194568576101',),\n", + " ('15185964531645951164',),\n", " ('15410706527896810898',),\n", " ('15497495941279624940',),\n", " ('15974729567081493290',),\n", @@ -374,6 +376,7 @@ " ('726a3261dfab4b4590b5fc898c561b08@hotmail.com',),\n", " ('730219275619457',),\n", " ('75eef2128eb84e9ca8e4837f3d4e31bd@hotmail.com',),\n", + " ('7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com',),\n", " ('77b6c2aa32bf440aa240195db229cd4a@hotmail.com',),\n", " ('7815164892280952980',),\n", " ('7834597727856283739',),\n", @@ -404,16 +407,13 @@ " ('fc0750780e0d4b1395c4c9f41cb9791f@hotmail.com',)]" ] }, - "execution_count": 30, + "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "with psycopg2.connect(host=config['database']['server'],\n", - " database=config['database']['database'], \n", - " user=config['database']['username'], \n", - " password=config['database']['password']) as conn:\n", + "with conn:\n", " with conn.cursor() as cur:\n", " cur.execute(\"\"\"\n", " select report_metadata_report_id\n", @@ -424,12 +424,14 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "metadata": { "collapsed": true }, "outputs": [], - "source": [] + "source": [ + "conn.close()" + ] } ], "metadata": { -- 2.34.1