{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# DMARC aggregate report importer\n",
    "\n",
    "Fetches DMARC aggregate report e-mails from an IMAP mailbox, extracts the compressed XML reports attached to them and stores every report that is not yet in PostgreSQL (tables `reports` and `report_items`).\n",
    "\n",
    "Connection settings are read from `dmarc.ini`, which needs a `[database]` section (`server`, `database`, `username`, `password`) and an `[imap]` section (`server`, `port`, `username`, `password`)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import configparser\n",
    "import datetime\n",
    "import email\n",
    "import gzip\n",
    "import imaplib\n",
    "import io\n",
    "import re\n",
    "import xml.etree.ElementTree\n",
    "import zipfile\n",
    "\n",
    "import psycopg2"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Parsing report e-mails"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def fetch_msg(num, mbox=None):\n",
    "    \"\"\"Return the raw RFC 822 bytes of the message with IMAP UID ``num``.\n",
    "\n",
    "    ``mbox`` defaults to the notebook-level ``mailbox`` connection, which must\n",
    "    already be logged in with a folder selected.\n",
    "    \"\"\"\n",
    "    if mbox is None:\n",
    "        mbox = mailbox\n",
    "    resp, data = mbox.uid('FETCH', num, '(RFC822)')\n",
    "    # imaplib reports 'no such message' (e.g. an expunged UID) as data == [None].\n",
    "    if resp != 'OK' or not data or data[0] is None:\n",
    "        raise RuntimeError('IMAP FETCH of UID {!r} failed: {} {!r}'.format(num, resp, data))\n",
    "    return data[0][1]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "GZIP_MAGIC = bytes([0x1f, 0x8b])\n",
    "\n",
    "\n",
    "def xml_of_part(part):\n",
    "    \"\"\"Parse the XML report carried in a compressed MIME part.\n",
    "\n",
    "    Both gzip streams and zip archives (only the first member is read) are\n",
    "    accepted. The decompressed bytes are handed to ElementTree unchanged so\n",
    "    the encoding declared in the XML prolog is honoured.\n",
    "\n",
    "    Returns the root element of the parsed report.\n",
    "    \"\"\"\n",
    "    payload = part.get_payload(decode=True)\n",
    "    if payload.startswith(GZIP_MAGIC):\n",
    "        contents = gzip.decompress(payload)\n",
    "    else:\n",
    "        with zipfile.ZipFile(io.BytesIO(payload)) as zf:\n",
    "            contents = zf.read(zf.infolist()[0])\n",
    "    return xml.etree.ElementTree.fromstring(contents)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def xml_of(message):\n",
    "    \"\"\"Return the list of parsed XML reports carried by ``message``.\n",
    "\n",
    "    In a multipart message every leaf part, at any nesting depth, whose content\n",
    "    type contains 'zip' is parsed (this matches e.g. application/zip and\n",
    "    application/gzip). A single-part message is assumed to be the compressed\n",
    "    report itself.\n",
    "    \"\"\"\n",
    "    if not message.is_multipart():\n",
    "        return [xml_of_part(message)]\n",
    "    return [xml_of_part(part)\n",
    "            for part in message.walk()\n",
    "            if not part.is_multipart() and 'zip' in part.get_content_type()]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def extract_report(msg):\n",
    "    \"\"\"Parse raw RFC 822 message bytes and return the XML reports they carry.\"\"\"\n",
    "    return xml_of(email.message_from_bytes(msg))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def maybe_strip(text):\n",
    "    \"\"\"Return ``text`` without surrounding whitespace, or '' for None / empty text.\"\"\"\n",
    "    return text.strip() if text else ''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Writing reports to PostgreSQL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def _column(pg_field_name, pg_table, pg_type='varchar'):\n",
    "    \"\"\"Build one field_maps entry.\"\"\"\n",
    "    return {'pg_field_name': pg_field_name, 'pg_table': pg_table, 'pg_type': pg_type}\n",
    "\n",
    "\n",
    "# Maps an ElementPath inside a DMARC aggregate report to the PostgreSQL column\n",
    "# that stores it. Paths containing '{}' belong to one <record> and are formatted\n",
    "# with its 1-based position (ElementPath [n] predicates start at 1).\n",
    "field_maps = {\n",
    "    './policy_published/adkim': _column('policy_published_adkim', 'reports'),\n",
    "    './policy_published/aspf': _column('policy_published_aspf', 'reports'),\n",
    "    './policy_published/domain': _column('policy_published_domain', 'reports'),\n",
    "    './policy_published/p': _column('policy_published_p', 'reports'),\n",
    "    './policy_published/pct': _column('policy_published_pct', 'reports', 'int'),\n",
    "    './record[{}]/auth_results/dkim/domain': _column('auth_results_dkim_domain', 'report_items'),\n",
    "    './record[{}]/auth_results/dkim/result': _column('auth_results_dkim_result', 'report_items'),\n",
    "    './record[{}]/auth_results/spf/domain': _column('auth_results_spf_domain', 'report_items'),\n",
    "    './record[{}]/auth_results/spf/result': _column('auth_results_spf_result', 'report_items'),\n",
    "    './record[{}]/identifiers/header_from': _column('identifiers_header_from', 'report_items'),\n",
    "    './record[{}]/row/count': _column('count', 'report_items', 'int'),\n",
    "    './record[{}]/row/policy_evaluated/disposition': _column('policy_evaluated_disposition', 'report_items'),\n",
    "    './record[{}]/row/policy_evaluated/dkim': _column('policy_evaluated_dkim', 'report_items'),\n",
    "    './record[{}]/row/policy_evaluated/spf': _column('policy_evaluated_spf', 'report_items'),\n",
    "    './record[{}]/row/source_ip': _column('source_ip', 'report_items', 'inet'),\n",
    "    './report_metadata/date_range/begin': _column('report_metadata_date_range_begin', 'reports', 'timestamp'),\n",
    "    './report_metadata/date_range/end': _column('report_metadata_date_range_end', 'reports', 'timestamp'),\n",
    "    './report_metadata/email': _column('report_metadata_email', 'reports'),\n",
    "    './report_metadata/org_name': _column('report_metadata_org_name', 'reports'),\n",
    "    './report_metadata/report_id': _column('report_metadata_report_id', 'reports'),\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def _field_value(element, pg_type):\n",
    "    \"\"\"Convert an XML element's text into the value bound for a ``pg_type`` column.\n",
    "\n",
    "    ``element`` is None when the path is absent from the report (e.g. a record\n",
    "    without a DKIM result). Empty or missing values become None (SQL NULL) for\n",
    "    int, timestamp and inet columns and '' for text columns.\n",
    "    \"\"\"\n",
    "    text = maybe_strip(None if element is None else element.text)\n",
    "    if pg_type == 'int':\n",
    "        return int(text) if text else None\n",
    "    if pg_type == 'timestamp':\n",
    "        # The report holds Unix epoch seconds; converted to a naive UTC datetime.\n",
    "        return datetime.datetime.utcfromtimestamp(int(text)) if text else None\n",
    "    if pg_type == 'inet':\n",
    "        # PostgreSQL rejects '' for inet, so send NULL instead.\n",
    "        return text or None\n",
    "    return text\n",
    "\n",
    "\n",
    "def build_insert_command(table_name, report, preamble_values=None, i=None):\n",
    "    \"\"\"Build a parameterised INSERT for ``table_name`` from one parsed report.\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    table_name : str\n",
    "        Selects the ``field_maps`` entries whose ``pg_table`` equals it.\n",
    "    report : xml.etree.ElementTree.Element\n",
    "        Root element of a parsed report.\n",
    "    preamble_values : dict, optional\n",
    "        Extra column -> value pairs (e.g. the parent ``report_id``) placed\n",
    "        before the mapped columns, in sorted key order. The dict is copied.\n",
    "    i : int, optional\n",
    "        1-based record position substituted into per-record paths.\n",
    "\n",
    "    Returns\n",
    "    -------\n",
    "    (str, dict)\n",
    "        SQL using psycopg2 ``%(name)s`` placeholders, and the parameter dict.\n",
    "    \"\"\"\n",
    "    values = dict(preamble_values or {})\n",
    "    columns = sorted(values)\n",
    "    for path, spec in field_maps.items():\n",
    "        if spec['pg_table'] != table_name:\n",
    "            continue\n",
    "        if i is not None:\n",
    "            path = path.format(i)\n",
    "        column = spec['pg_field_name']\n",
    "        columns.append(column)\n",
    "        values[column] = _field_value(report.find(path), spec['pg_type'])\n",
    "    # Column names come from field_maps / preamble keys, never from report data,\n",
    "    # so formatting them in is safe; every value goes through a placeholder.\n",
    "    insert_string = 'insert into {} ({}) values ({});'.format(\n",
    "        table_name,\n",
    "        ', '.join(columns),\n",
    "        ', '.join('%({})s'.format(column) for column in columns))\n",
    "    return insert_string, values"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def write_report(connection, cursor, report):\n",
    "    \"\"\"Insert one parsed report and all of its <record> rows, then commit.\n",
    "\n",
    "    The ``reports`` row is inserted first; the ``id`` the database assigns to it\n",
    "    becomes ``report_id`` on every ``report_items`` row.\n",
    "    \"\"\"\n",
    "    insert_string, values = build_insert_command('reports', report)\n",
    "    # Take the new row's id straight from the insert instead of re-selecting it by\n",
    "    # report_metadata_report_id, which may repeat across reporting organisations.\n",
    "    cursor.execute(insert_string.rstrip(';') + ' returning id;', values)\n",
    "    report_id = cursor.fetchone()[0]\n",
    "\n",
    "    for i in range(1, len(report.findall('./record')) + 1):\n",
    "        insert_string, values = build_insert_command('report_items', report, i=i,\n",
    "                                                     preamble_values={'report_id': report_id})\n",
    "        cursor.execute(insert_string, values)\n",
    "    connection.commit()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def write_report_old(connection, cursor, report):\n",
    "    \"\"\"Deprecated alias of ``write_report``, kept so existing callers still work.\n",
    "\n",
    "    The former inline copy duplicated ``build_insert_command`` and used a bare\n",
    "    ``raise`` outside an except block when the parent row was not found.\n",
    "    \"\"\"\n",
    "    return write_report(connection, cursor, report)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['dmarc.ini']"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "config = configparser.ConfigParser()\n",
    "files_read = config.read('dmarc.ini')\n",
    "# ConfigParser.read silently skips missing files; fail here rather than with a KeyError later.\n",
    "assert files_read, 'dmarc.ini not found or unreadable'\n",
    "files_read"
   ]
}, { "cell_type": "code", "execution_count": 12, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "datetime.datetime(2016, 3, 29, 23, 59, 59)" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "with psycopg2.connect(host=config['database']['server'],\n", " database=config['database']['database'], \n", " user=config['database']['username'], \n", " password=config['database']['password']) as conn:\n", " with conn.cursor() as cur:\n", " cur.execute('select max(report_metadata_date_range_end) from reports')\n", " results = cur.fetchall()\n", "most_recent_date = results[0][0]\n", "most_recent_date" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "('OK', [b'178'])" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "mailbox = imaplib.IMAP4(host=config['imap']['server'], \n", " port=config['imap']['port'])\n", "mailbox.starttls()\n", "mailbox.login(config['imap']['username'], config['imap']['password'])\n", "mailbox.select('INBOX', readonly=True)" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "('SINCE 27-Mar-2016', 'OK', [b'169 170 171 172 173 174 175 176 177 178 179'])" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "if most_recent_date:\n", " mails_from = \"SINCE \" + (most_recent_date - datetime.timedelta(days=2)).strftime(\"%d-%b-%Y\")\n", "else:\n", " mails_from = \"ALL\"\n", "resp, nums = mailbox.uid('SEARCH', None, mails_from)\n", "mails_from, resp, nums" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "['1458957186.548175',\n", " '2150510829392606201',\n", " '68aad5080a774e2c997d159b546569b9@hotmail.com',\n", " '1459129809.695034',\n", " '16143280651570354241',\n", " 
'8c177254c3cb41869dc3afab59f74c76@hotmail.com',\n", " '15410706527896810898',\n", " '1459216304.582931',\n", " '15497495941279624940',\n", " '1459302353.261157',\n", " '7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com']" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dmarc_reports = [report for report_set in [extract_report(fetch_msg(n)) for n in nums[0].split()]\n", " for report in report_set]\n", "[r.find('./report_metadata/report_id').text for r in dmarc_reports]" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "('BYE', [b'Logging out'])" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "mailbox.close()\n", "mailbox.logout()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "write 1459302353.261157\n", "write 7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com\n" ] } ], "source": [ "with psycopg2.connect(host=config['database']['server'],\n", " database=config['database']['database'], \n", " user=config['database']['username'], \n", " password=config['database']['password']) as conn:\n", " with conn.cursor() as cur:\n", " for report in dmarc_reports:\n", " cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;', \n", " [report.find('./report_metadata/report_id').text])\n", " results = cur.fetchall()\n", " if not results:\n", " print('write', report.find('./report_metadata/report_id').text)\n", " write_report(conn, cur, report)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": 
"text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.4.3+" } }, "nbformat": 4, "nbformat_minor": 0 }