Added timezone to timestamps
authorNeil Smith <neil.git@njae.me.uk>
Fri, 1 Apr 2016 11:12:43 +0000 (12:12 +0100)
committerNeil Smith <neil.git@njae.me.uk>
Fri, 1 Apr 2016 11:12:43 +0000 (12:12 +0100)
complete.ipynb
dmarc_to_database [new file with mode: 0755]
dmarc_to_database.py
queries.ipynb

index bd272eb7ad705ecee8787fc5aa3eaaa633b1f3fb..735b78f70fc97f202cfdcdd90b64d3cb12b106e0 100644 (file)
    "metadata": {
     "collapsed": false
    },
+   "outputs": [],
+   "source": [
+    "conn = psycopg2.connect(host=config['database']['server'],\n",
+    "                        database=config['database']['database'], \n",
+    "                        user=config['database']['username'], \n",
+    "                        password=config['database']['password'])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 13,
+   "metadata": {
+    "collapsed": false
+   },
    "outputs": [
     {
      "data": {
       "text/plain": [
-       "datetime.datetime(2016, 3, 29, 23, 59, 59)"
+       "datetime.datetime(2016, 3, 30, 23, 59, 59)"
       ]
      },
-     "execution_count": 12,
+     "execution_count": 13,
      "metadata": {},
      "output_type": "execute_result"
     }
    ],
    "source": [
-    "with psycopg2.connect(host=config['database']['server'],\n",
-    "                        database=config['database']['database'], \n",
-    "                        user=config['database']['username'], \n",
-    "                        password=config['database']['password']) as conn:\n",
+    "with conn:\n",
     "    with conn.cursor() as cur:\n",
     "        cur.execute('select max(report_metadata_date_range_end) from reports')\n",
     "        results = cur.fetchall()\n",
   },
   {
    "cell_type": "code",
-   "execution_count": 13,
+   "execution_count": 14,
    "metadata": {
     "collapsed": false
    },
     {
      "data": {
       "text/plain": [
-       "('OK', [b'178'])"
+       "('OK', [b'179'])"
       ]
      },
-     "execution_count": 13,
+     "execution_count": 14,
      "metadata": {},
      "output_type": "execute_result"
     }
   },
   {
    "cell_type": "code",
-   "execution_count": 14,
+   "execution_count": 15,
    "metadata": {
     "collapsed": false
    },
     {
      "data": {
       "text/plain": [
-       "('SINCE 27-Mar-2016', 'OK', [b'169 170 171 172 173 174 175 176 177 178 179'])"
+       "('SINCE 28-Mar-2016', 'OK', [b'172 173 174 175 176 177 178 179 180'])"
       ]
      },
-     "execution_count": 14,
+     "execution_count": 15,
      "metadata": {},
      "output_type": "execute_result"
     }
   },
   {
    "cell_type": "code",
-   "execution_count": 15,
+   "execution_count": 16,
    "metadata": {
     "collapsed": false
    },
     {
      "data": {
       "text/plain": [
-       "['1458957186.548175',\n",
-       " '2150510829392606201',\n",
-       " '68aad5080a774e2c997d159b546569b9@hotmail.com',\n",
-       " '1459129809.695034',\n",
+       "['1459129809.695034',\n",
        " '16143280651570354241',\n",
        " '8c177254c3cb41869dc3afab59f74c76@hotmail.com',\n",
        " '15410706527896810898',\n",
        " '1459216304.582931',\n",
        " '15497495941279624940',\n",
        " '1459302353.261157',\n",
-       " '7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com']"
+       " '7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com',\n",
+       " '15185964531645951164']"
       ]
      },
-     "execution_count": 15,
+     "execution_count": 16,
      "metadata": {},
      "output_type": "execute_result"
     }
   },
   {
    "cell_type": "code",
-   "execution_count": 16,
+   "execution_count": 17,
    "metadata": {
     "collapsed": false
    },
        "('BYE', [b'Logging out'])"
       ]
      },
-     "execution_count": 16,
+     "execution_count": 17,
      "metadata": {},
      "output_type": "execute_result"
     }
    "metadata": {
     "collapsed": false
    },
-   "outputs": [
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "write 1459302353.261157\n",
-      "write 7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com\n"
-     ]
-    }
-   ],
+   "outputs": [],
    "source": [
-    "with psycopg2.connect(host=config['database']['server'],\n",
-    "                        database=config['database']['database'], \n",
-    "                        user=config['database']['username'], \n",
-    "                        password=config['database']['password']) as conn:\n",
+    "with conn:\n",
     "    with conn.cursor() as cur:\n",
     "        for report in dmarc_reports:\n",
     "            cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;', \n",
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 19,
    "metadata": {
     "collapsed": true
    },
    "outputs": [],
-   "source": []
+   "source": [
+    "conn.close()"
+   ]
   }
  ],
  "metadata": {
diff --git a/dmarc_to_database b/dmarc_to_database
new file mode 100755 (executable)
index 0000000..a8141fd
--- /dev/null
@@ -0,0 +1,201 @@
+#!/usr/bin/python3
+
+import configparser
+import imaplib
+import email
+import io
+import zipfile
+import xml.etree.ElementTree
+import psycopg2
+import re
+import datetime
+
+def fetch_msg(num):
+    return mailbox.uid('FETCH', num, '(RFC822)')[1][0][1]
+
+def xml_of_part(part):
+    with zipfile.ZipFile(io.BytesIO(part.get_payload(decode=True))) as zf:
+            fn = zf.infolist()[0].filename
+            contents = zf.read(fn).decode('utf-8')
+            return xml.etree.ElementTree.fromstring(contents)
+
+
+def xml_of(message):
+    reports = []
+    if message.is_multipart():
+        for p in message.get_payload():
+            if 'zip' in p.get_content_type():
+                reports += [xml_of_part(p)]
+    else:
+        reports = [xml_of_part(message)]
+    return reports
+
+def extract_report(msg):
+    pmsg = email.message_from_bytes(msg)
+    return xml_of(pmsg)
+
+def maybe_strip(text):
+    if text:
+        return text.strip()
+    else:
+        return ''
+
+field_maps = {'./policy_published/adkim': {'pg_field_name': 'policy_published_adkim',
+  'pg_table': 'reports',
+  'pg_type': 'varchar'},
+ './policy_published/aspf': {'pg_field_name': 'policy_published_aspf',
+  'pg_table': 'reports',
+  'pg_type': 'varchar'},
+ './policy_published/domain': {'pg_field_name': 'policy_published_domain',
+  'pg_table': 'reports',
+  'pg_type': 'varchar'},
+ './policy_published/p': {'pg_field_name': 'policy_published_p',
+  'pg_table': 'reports',
+  'pg_type': 'varchar'},
+ './policy_published/pct': {'pg_field_name': 'policy_published_pct',
+  'pg_table': 'reports',
+  'pg_type': 'int'},
+ './record[{}]/auth_results/dkim/domain': {'pg_field_name': 'auth_results_dkim_domain',
+  'pg_table': 'report_items',
+  'pg_type': 'varchar'},
+ './record[{}]/auth_results/dkim/result': {'pg_field_name': 'auth_results_dkim_result',
+  'pg_table': 'report_items',
+  'pg_type': 'varchar'},
+ './record[{}]/auth_results/spf/domain': {'pg_field_name': 'auth_results_spf_domain',
+  'pg_table': 'report_items',
+  'pg_type': 'varchar'},
+ './record[{}]/auth_results/spf/result': {'pg_field_name': 'auth_results_spf_result',
+  'pg_table': 'report_items',
+  'pg_type': 'varchar'},
+ './record[{}]/identifiers/header_from': {'pg_field_name': 'identifiers_header_from',
+  'pg_table': 'report_items',
+  'pg_type': 'varchar'},
+ './record[{}]/row/count': {'pg_field_name': 'count',
+  'pg_table': 'report_items',
+  'pg_type': 'int'},
+ './record[{}]/row/policy_evaluated/disposition': {'pg_field_name': 'policy_evaluated_disposition',
+  'pg_table': 'report_items',
+  'pg_type': 'varchar'},
+ './record[{}]/row/policy_evaluated/dkim': {'pg_field_name': 'policy_evaluated_dkim',
+  'pg_table': 'report_items',
+  'pg_type': 'varchar'},
+ './record[{}]/row/policy_evaluated/spf': {'pg_field_name': 'policy_evaluated_spf',
+  'pg_table': 'report_items',
+  'pg_type': 'varchar'},
+ './record[{}]/row/source_ip': {'pg_field_name': 'source_ip',
+  'pg_table': 'report_items',
+  'pg_type': 'inet'},
+ './report_metadata/date_range/begin': {'pg_field_name': 'report_metadata_date_range_begin',
+  'pg_table': 'reports',
+  'pg_type': 'timestamp'},
+ './report_metadata/date_range/end': {'pg_field_name': 'report_metadata_date_range_end',
+  'pg_table': 'reports',
+  'pg_type': 'timestamp'},
+ './report_metadata/email': {'pg_field_name': 'report_metadata_email',
+  'pg_table': 'reports',
+  'pg_type': 'varchar'},
+ './report_metadata/org_name': {'pg_field_name': 'report_metadata_org_name',
+  'pg_table': 'reports',
+  'pg_type': 'varchar'},
+ './report_metadata/report_id': {'pg_field_name': 'report_metadata_report_id',
+  'pg_table': 'reports',
+  'pg_type': 'varchar'}}
+
+
+
+def build_insert_command(table_name, report, preamble_values=None, i=None):
+    field_names = []
+    if preamble_values:
+        values = preamble_values.copy()
+    else:
+        values = {}
+    for f in [f for f in field_maps if field_maps[f]['pg_table'] == table_name]:
+        if i:
+            fp = f.format(i)
+        else:
+            fp = f
+        field_names += [field_maps[f]['pg_field_name']]
+        if field_maps[f]['pg_type'] == 'int':
+            values[field_maps[f]['pg_field_name']] = int(report.find(fp).text)
+        elif field_maps[f]['pg_type'] == 'timestamp':
+            # values[field_maps[f]['pg_field_name']] = datetime.datetime.utcfromtimestamp(int(report.find(fp).text))
+            values[field_maps[f]['pg_field_name']] = \
+                datetime.datetime.fromtimestamp(int(report.find(fp).text),  
+                    tz=datetime.timezone.utc)
+        elif field_maps[f]['pg_type'] == 'inet':
+            values[field_maps[f]['pg_field_name']] = maybe_strip(report.find(fp).text)
+        else:
+            values[field_maps[f]['pg_field_name']] = maybe_strip(report.find(fp).text)
+    insert_string = 'insert into {} ('.format(table_name)
+    if preamble_values:
+        insert_string += ', '.join(sorted(preamble_values.keys())) + ', '
+    insert_string += ', '.join(field_names) + ') '
+    insert_string += 'values ('
+    if preamble_values:
+        insert_string += ', '.join('%({})s'.format(fn) for fn in sorted(preamble_values.keys())) + ', '
+    insert_string += ', '.join('%({})s'.format(f) for f in field_names) + ');'
+    return insert_string, values
+
+
+def write_report(connection, cursor, report):
+    insert_string, values = build_insert_command('reports', report)
+    # print(insert_string, values)
+    cursor.execute(insert_string, values)
+    
+    for i in range(1, len(report.findall('./record'))+1):
+        field_names = []
+        cursor.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;', 
+            [report.find('./report_metadata/report_id').text])
+        results = cursor.fetchall()
+        if len(results) != 1:
+            raise RuntimeError('Could not find report record for report item')
+        else:
+            report_id = results[0][0]
+        insert_string, values = build_insert_command('report_items', report, i=i,
+                                                     preamble_values={'report_id': report_id})
+        # print(insert_string, values)
+        cursor.execute(insert_string, values)
+    connection.commit()
+
+config = configparser.ConfigParser()
+config.read('dmarc.ini')
+
+conn = psycopg2.connect(host=config['database']['server'],
+                        database=config['database']['database'], 
+                        user=config['database']['username'], 
+                        password=config['database']['password']) 
+
+cur = conn.cursor()
+cur.execute('select max(report_metadata_date_range_end) from reports')
+results = cur.fetchall()
+most_recent_date = results[0][0]
+
+mailbox = imaplib.IMAP4(host=config['imap']['server'], 
+                      port=config['imap']['port'])
+mailbox.starttls()
+mailbox.login(config['imap']['username'], config['imap']['password'])
+mailbox.select('INBOX', readonly=True)
+
+
+if most_recent_date:
+    mails_from = "SINCE " + (most_recent_date - datetime.timedelta(days=2)).strftime("%d-%b-%Y")
+else:
+    mails_from = "ALL"
+resp, nums = mailbox.uid('SEARCH', None, mails_from)
+
+
+dmarc_reports = [report for report_set in [extract_report(fetch_msg(n)) for n in nums[0].split()]
+                for report in report_set]
+
+mailbox.close()
+mailbox.logout()
+
+for report in dmarc_reports:
+    cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;', 
+        [report.find('./report_metadata/report_id').text])
+    results = cur.fetchall()
+    if not results:
+        print('write', report.find('./report_metadata/report_id').text)
+        write_report(conn, cur, report)
+
+conn.close()
index 2fd237568b03a466ac5ed2f16d56215344d8ce23..46249649141088ccf310859496d7f36ac9a41326 100644 (file)
@@ -1,3 +1,5 @@
+#!/usr/bin/python3
+
 import configparser
 import imaplib
 import email
@@ -155,13 +157,14 @@ def write_report(connection, cursor, report):
 config = configparser.ConfigParser()
 config.read('dmarc.ini')
 
-with psycopg2.connect(host=config['database']['server'],
+conn = psycopg2.connect(host=config['database']['server'],
                         database=config['database']['database'], 
                         user=config['database']['username'], 
-                        password=config['database']['password']) as conn:
-    with conn.cursor() as cur:
-        cur.execute('select max(report_metadata_date_range_end) from reports')
-        results = cur.fetchall()
+                        password=config['database']['password']) 
+
+cur = conn.cursor()
+cur.execute('select max(report_metadata_date_range_end) from reports')
+results = cur.fetchall()
 most_recent_date = results[0][0]
 
 mailbox = imaplib.IMAP4(host=config['imap']['server'], 
@@ -184,19 +187,12 @@ dmarc_reports = [report for report_set in [extract_report(fetch_msg(n)) for n in
 mailbox.close()
 mailbox.logout()
 
+for report in dmarc_reports:
+    cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;', 
+        [report.find('./report_metadata/report_id').text])
+    results = cur.fetchall()
+    if not results:
+        print('write', report.find('./report_metadata/report_id').text)
+        write_report(conn, cur, report)
 
-with psycopg2.connect(host=config['database']['server'],
-                        database=config['database']['database'], 
-                        user=config['database']['username'], 
-                        password=config['database']['password']) as conn:
-    with conn.cursor() as cur:
-        for report in dmarc_reports:
-            cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;', 
-                [report.find('./report_metadata/report_id').text])
-            results = cur.fetchall()
-            if not results:
-                print('write', report.find('./report_metadata/report_id').text)
-                write_report(conn, cur, report)
-
-
-
+conn.close()
index b2a54d3648264bb124b9f9ab86c01eb609276e8c..38b22c920a3376b160c59743880c13690b930895 100644 (file)
@@ -2,7 +2,7 @@
  "cells": [
   {
    "cell_type": "code",
-   "execution_count": 18,
+   "execution_count": 1,
    "metadata": {
     "collapsed": true
    },
@@ -22,7 +22,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 5,
+   "execution_count": 2,
    "metadata": {
     "collapsed": false
    },
@@ -33,7 +33,7 @@
        "['dmarc.ini']"
       ]
      },
-     "execution_count": 5,
+     "execution_count": 2,
      "metadata": {},
      "output_type": "execute_result"
     }
@@ -45,7 +45,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 31,
+   "execution_count": 3,
    "metadata": {
     "collapsed": false
    },
        " (('google.com', 'softfail', 'pass', '146.185.136.235'), 'new.realms.co.uk'),\n",
        " (('google.com', 'pass', 'pass', '82.109.184.9'), 'clublloyds.com'),\n",
        " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk'),\n",
+       " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk'),\n",
        " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk')]"
       ]
      },
-     "execution_count": 31,
+     "execution_count": 3,
      "metadata": {},
      "output_type": "execute_result"
     }
    ],
    "source": [
-    "with psycopg2.connect(host=config['database']['server'],\n",
+    "conn = psycopg2.connect(host=config['database']['server'],\n",
     "                        database=config['database']['database'], \n",
     "                        user=config['database']['username'], \n",
-    "                        password=config['database']['password']) as conn:\n",
+    "                        password=config['database']['password'])\n",
+    "with conn:\n",
     "    with conn.cursor() as cur:\n",
     "        cur.execute(\"\"\"\n",
     "        select report_metadata_org_name, auth_results_spf_result, auth_results_dkim_result, source_ip \n",
   },
   {
    "cell_type": "code",
-   "execution_count": 32,
+   "execution_count": 4,
    "metadata": {
     "collapsed": false
    },
        " (('google.com', 'softfail', 'pass', '146.185.136.235'), 'new.realms.co.uk'),\n",
        " (('google.com', 'softfail', 'pass', '146.185.136.235'), 'new.realms.co.uk'),\n",
        " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk'),\n",
+       " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk'),\n",
        " (('Yahoo! Inc.', 'softfail', 'pass', '65.20.0.12'), 'lb.lon5.cpcloud.co.uk')]"
       ]
      },
-     "execution_count": 32,
+     "execution_count": 4,
      "metadata": {},
      "output_type": "execute_result"
     }
    ],
    "source": [
-    "with psycopg2.connect(host=config['database']['server'],\n",
-    "                        database=config['database']['database'], \n",
-    "                        user=config['database']['username'], \n",
-    "                        password=config['database']['password']) as conn:\n",
+    "with conn:\n",
     "    with conn.cursor() as cur:\n",
     "        cur.execute(\"\"\"\n",
     "        select report_metadata_org_name, auth_results_spf_result, auth_results_dkim_result, source_ip \n",
   },
   {
    "cell_type": "code",
-   "execution_count": 20,
+   "execution_count": 5,
    "metadata": {
     "collapsed": false
    },
        "'lb.lon5.cpcloud.co.uk'"
       ]
      },
-     "execution_count": 20,
+     "execution_count": 5,
      "metadata": {},
      "output_type": "execute_result"
     }
   },
   {
    "cell_type": "code",
-   "execution_count": 30,
+   "execution_count": 6,
    "metadata": {
     "collapsed": false
    },
        " ('1458957186.548175',),\n",
        " ('1459129809.695034',),\n",
        " ('1459216304.582931',),\n",
+       " ('1459302353.261157',),\n",
        " ('14593873841710243963',),\n",
        " ('14661842628106423589',),\n",
        " ('14662396456930987863',),\n",
        " ('15111277194568576101',),\n",
+       " ('15185964531645951164',),\n",
        " ('15410706527896810898',),\n",
        " ('15497495941279624940',),\n",
        " ('15974729567081493290',),\n",
        " ('726a3261dfab4b4590b5fc898c561b08@hotmail.com',),\n",
        " ('730219275619457',),\n",
        " ('75eef2128eb84e9ca8e4837f3d4e31bd@hotmail.com',),\n",
+       " ('7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com',),\n",
        " ('77b6c2aa32bf440aa240195db229cd4a@hotmail.com',),\n",
        " ('7815164892280952980',),\n",
        " ('7834597727856283739',),\n",
        " ('fc0750780e0d4b1395c4c9f41cb9791f@hotmail.com',)]"
       ]
      },
-     "execution_count": 30,
+     "execution_count": 6,
      "metadata": {},
      "output_type": "execute_result"
     }
    ],
    "source": [
-    "with psycopg2.connect(host=config['database']['server'],\n",
-    "                        database=config['database']['database'], \n",
-    "                        user=config['database']['username'], \n",
-    "                        password=config['database']['password']) as conn:\n",
+    "with conn:\n",
     "    with conn.cursor() as cur:\n",
     "        cur.execute(\"\"\"\n",
     "        select report_metadata_report_id\n",
   },
   {
    "cell_type": "code",
-   "execution_count": null,
+   "execution_count": 7,
    "metadata": {
     "collapsed": true
    },
    "outputs": [],
-   "source": []
+   "source": [
+    "conn.close()"
+   ]
   }
  ],
  "metadata": {