Tidied, created stand-alone script
[dmarc.git] / complete.ipynb
1 {
2 "cells": [
3 {
4 "cell_type": "code",
5 "execution_count": 1,
6 "metadata": {
7 "collapsed": true
8 },
9 "outputs": [],
10 "source": [
11 "import configparser\n",
12 "import imaplib\n",
13 "import email\n",
14 "import io\n",
15 "import zipfile\n",
16 "import xml.etree.ElementTree\n",
17 "import psycopg2\n",
18 "import re\n",
19 "import datetime"
20 ]
21 },
22 {
23 "cell_type": "code",
24 "execution_count": 2,
25 "metadata": {
26 "collapsed": true
27 },
28 "outputs": [],
29 "source": [
def fetch_msg(num, connection=None):
    """Fetch one message by IMAP UID and return its raw content.

    Parameters:
        num: UID of the message, passed unchanged to the ``UID FETCH`` command.
        connection: object exposing an imaplib-style ``uid(command, *args)``
            method. When omitted, the notebook-level ``mailbox`` is used.

    Returns:
        The second item of the first tuple in the FETCH response data, i.e.
        the ``(RFC822)`` literal returned by the server.

    Raises:
        RuntimeError: if the server does not answer ``OK`` or the response
            holds no message tuple (for example ``[None]`` for an unknown UID).
    """
    if connection is None:
        connection = mailbox
    status, data = connection.uid('FETCH', num, '(RFC822)')
    # A successful fetch yields [(envelope, literal), b')']; anything else
    # would otherwise fail later with an unhelpful indexing error.
    if status != 'OK' or not data or not isinstance(data[0], tuple):
        raise RuntimeError(
            'Could not fetch message {!r}: {} {!r}'.format(num, status, data))
    return data[0][1]
32 ]
33 },
34 {
35 "cell_type": "code",
36 "execution_count": 3,
37 "metadata": {
38 "collapsed": true
39 },
40 "outputs": [],
41 "source": [
def xml_of_part(part):
    """Decompress a message part and parse the XML document it contains.

    Parameters:
        part: object with an ``email.message.Message``-style
            ``get_payload(decode=True)`` method returning the attachment bytes.

    Returns:
        The root ``xml.etree.ElementTree.Element`` of the parsed document.

    Raises:
        zipfile.BadZipFile: if the payload is neither gzip data nor a zip archive.
        xml.etree.ElementTree.ParseError: if the content is not well-formed XML.

    Both zip archives and gzip streams are accepted: ``xml_of`` selects parts
    with ``'zip' in content_type``, which also matches ``application/gzip``.
    For zip archives only the first member is read.
    """
    import gzip  # local import: the notebook's import cell does not load gzip

    payload = part.get_payload(decode=True) or b''
    if payload[:2] == b'\x1f\x8b':  # gzip magic number
        raw_xml = gzip.decompress(payload)
    else:
        with zipfile.ZipFile(io.BytesIO(payload)) as archive:
            raw_xml = archive.read(archive.infolist()[0].filename)
    # Parse the bytes directly so the encoding declared in the XML prolog is
    # honoured instead of assuming UTF-8.
    return xml.etree.ElementTree.fromstring(raw_xml)
47 ]
48 },
49 {
50 "cell_type": "code",
51 "execution_count": 4,
52 "metadata": {
53 "collapsed": true
54 },
55 "outputs": [],
56 "source": [
def xml_of(message):
    """Return the parsed XML reports attached to an email message.

    Parameters:
        message: an ``email.message.Message``.

    Returns:
        A list of XML root elements, one per matching attachment. A
        non-multipart message is treated as a single attachment regardless of
        its content type; a multipart message with no matching parts gives ``[]``.

    Every leaf part is visited with ``walk()``, so attachments inside nested
    multipart containers are found as well as top-level ones.
    """
    if not message.is_multipart():
        return [xml_of_part(message)]
    return [xml_of_part(part)
            for part in message.walk()
            if not part.is_multipart() and 'zip' in part.get_content_type()]
66 ]
67 },
68 {
69 "cell_type": "code",
70 "execution_count": 5,
71 "metadata": {
72 "collapsed": true
73 },
74 "outputs": [],
75 "source": [
def extract_report(msg):
    """Parse raw message bytes and return the XML reports found in them.

    ``msg`` is handed to ``email.message_from_bytes``; the resulting message
    object is passed to ``xml_of``, whose list result is returned unchanged.
    """
    return xml_of(email.message_from_bytes(msg))
79 ]
80 },
81 {
82 "cell_type": "code",
83 "execution_count": 6,
84 "metadata": {
85 "collapsed": true
86 },
87 "outputs": [],
88 "source": [
def maybe_strip(text):
    """Return ``text`` without surrounding whitespace, or ``''`` if it is falsy."""
    return text.strip() if text else ''
94 ]
95 },
96 {
97 "cell_type": "code",
98 "execution_count": 7,
99 "metadata": {
100 "collapsed": true
101 },
102 "outputs": [],
103 "source": [
# One row per report element: (path, table, column, type).
# The path is what gets passed to ``report.find``; paths containing ``{}``
# are per-record and have the 1-based record index formatted in before use.
_FIELD_SPECS = [
    ('./policy_published/adkim', 'reports', 'policy_published_adkim', 'varchar'),
    ('./policy_published/aspf', 'reports', 'policy_published_aspf', 'varchar'),
    ('./policy_published/domain', 'reports', 'policy_published_domain', 'varchar'),
    ('./policy_published/p', 'reports', 'policy_published_p', 'varchar'),
    ('./policy_published/pct', 'reports', 'policy_published_pct', 'int'),
    ('./record[{}]/auth_results/dkim/domain', 'report_items', 'auth_results_dkim_domain', 'varchar'),
    ('./record[{}]/auth_results/dkim/result', 'report_items', 'auth_results_dkim_result', 'varchar'),
    ('./record[{}]/auth_results/spf/domain', 'report_items', 'auth_results_spf_domain', 'varchar'),
    ('./record[{}]/auth_results/spf/result', 'report_items', 'auth_results_spf_result', 'varchar'),
    ('./record[{}]/identifiers/header_from', 'report_items', 'identifiers_header_from', 'varchar'),
    ('./record[{}]/row/count', 'report_items', 'count', 'int'),
    ('./record[{}]/row/policy_evaluated/disposition', 'report_items', 'policy_evaluated_disposition', 'varchar'),
    ('./record[{}]/row/policy_evaluated/dkim', 'report_items', 'policy_evaluated_dkim', 'varchar'),
    ('./record[{}]/row/policy_evaluated/spf', 'report_items', 'policy_evaluated_spf', 'varchar'),
    ('./record[{}]/row/source_ip', 'report_items', 'source_ip', 'inet'),
    ('./report_metadata/date_range/begin', 'reports', 'report_metadata_date_range_begin', 'timestamp'),
    ('./report_metadata/date_range/end', 'reports', 'report_metadata_date_range_end', 'timestamp'),
    ('./report_metadata/email', 'reports', 'report_metadata_email', 'varchar'),
    ('./report_metadata/org_name', 'reports', 'report_metadata_org_name', 'varchar'),
    ('./report_metadata/report_id', 'reports', 'report_metadata_report_id', 'varchar'),
]

# Mapping used by the insert builders: path -> column name, table and type.
field_maps = {
    path: {'pg_field_name': column, 'pg_table': table, 'pg_type': pg_type}
    for path, table, column, pg_type in _FIELD_SPECS
}
164 ]
165 },
166 {
167 "cell_type": "code",
168 "execution_count": 8,
169 "metadata": {
170 "collapsed": true
171 },
172 "outputs": [],
173 "source": [
def _convert_field(text, pg_type):
    """Convert element text (or None for a missing element) to a value for ``pg_type``."""
    stripped = (text or '').strip()
    if pg_type == 'int':
        return int(stripped) if stripped else None
    if pg_type == 'timestamp':
        if not stripped:
            return None
        # Naive UTC datetime built from epoch seconds.
        return datetime.datetime.fromtimestamp(
            int(stripped), datetime.timezone.utc).replace(tzinfo=None)
    if pg_type == 'inet':
        # An empty string is not a usable address value; store NULL instead.
        return stripped or None
    return stripped


def build_insert_command(table_name, report, preamble_values=None, i=None, field_map=None):
    """Build a parameterised INSERT statement for one row of ``table_name``.

    Parameters:
        table_name: value matched against ``pg_table`` in the field map and
            written into the statement.
        report: XML element whose children are looked up by path.
        preamble_values: optional dict of extra column values (for example a
            foreign key). Its keys are listed first, in sorted order. The dict
            is not modified.
        i: optional record index formatted into paths containing ``{}``.
        field_map: optional mapping with the same layout as ``field_maps``;
            defaults to the notebook-level ``field_maps``.

    Returns:
        ``(insert_string, values)`` where ``insert_string`` uses ``%(name)s``
        placeholders and ``values`` maps every column name to its value.

    Elements that are absent or blank no longer raise: int, timestamp and inet
    columns receive ``None`` and all other columns receive ``''``.
    """
    if field_map is None:
        field_map = field_maps
    values = dict(preamble_values) if preamble_values else {}
    field_names = []
    for path, spec in field_map.items():
        if spec['pg_table'] != table_name:
            continue
        xpath = path.format(i) if i is not None else path
        column = spec['pg_field_name']
        field_names.append(column)
        # findtext returns None for a missing element and '' for an empty one.
        values[column] = _convert_field(report.findtext(xpath), spec['pg_type'])

    preamble_names = sorted(preamble_values) if preamble_values else []
    columns = preamble_names + field_names
    insert_string = 'insert into {} ({}) values ({});'.format(
        table_name,
        ', '.join(columns),
        ', '.join('%({})s'.format(column) for column in columns))
    return insert_string, values
204 ]
205 },
206 {
207 "cell_type": "code",
208 "execution_count": 9,
209 "metadata": {
210 "collapsed": true
211 },
212 "outputs": [],
213 "source": [
def write_report(connection, cursor, report):
    """Insert one report and all of its records, then commit.

    Parameters:
        connection: database connection; only ``commit()`` is called on it.
        cursor: cursor used for every statement.
        report: XML element for one report.

    Raises:
        RuntimeError: if the newly inserted ``reports`` row cannot be found
            unambiguously when record rows need to reference it.

    One row goes into ``reports``. For each ``./record`` element one row goes
    into ``report_items``, carrying the parent row's ``id`` as ``report_id``.
    """
    insert_string, values = build_insert_command('reports', report)
    cursor.execute(insert_string, values)

    record_count = len(report.findall('./record'))
    if record_count:
        # Look the parent row up once, using the id exactly as it was stored
        # (build_insert_command strips whitespace), not the raw element text.
        stored_report_id = values['report_metadata_report_id']
        cursor.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
                       [stored_report_id])
        results = cursor.fetchall()
        if len(results) != 1:
            raise RuntimeError('Could not find report record for report item')
        report_id = results[0][0]

        # Record paths use 1-based positions.
        for i in range(1, record_count + 1):
            insert_string, values = build_insert_command('report_items', report, i=i,
                                                         preamble_values={'report_id': report_id})
            cursor.execute(insert_string, values)
    connection.commit()
233 ]
234 },
235 {
236 "cell_type": "code",
237 "execution_count": 10,
238 "metadata": {
239 "collapsed": true
240 },
241 "outputs": [],
242 "source": [
def write_report_old(connection, cursor, report):
    """Legacy name for ``write_report``; kept so existing calls keep working.

    The previous body was a hand-inlined copy of what ``write_report`` and
    ``build_insert_command`` do. It also signalled a missing parent row with a
    bare ``raise`` outside any ``except`` block, which gives a ``RuntimeError``
    with no useful message. Delegating removes the duplicate and gives the
    same inserts with a meaningful error.
    """
    return write_report(connection, cursor, report)
287 ]
288 },
289 {
290 "cell_type": "code",
291 "execution_count": 11,
292 "metadata": {
293 "collapsed": false
294 },
295 "outputs": [
296 {
297 "data": {
298 "text/plain": [
299 "['dmarc.ini']"
300 ]
301 },
302 "execution_count": 11,
303 "metadata": {},
304 "output_type": "execute_result"
305 }
306 ],
307 "source": [
# Load connection settings. ConfigParser.read() silently skips files it cannot
# open, so check its return value instead of failing later with a KeyError.
config = configparser.ConfigParser()
config_files_read = config.read('dmarc.ini')
if not config_files_read:
    raise FileNotFoundError("Configuration file 'dmarc.ini' was not found or could not be read")
config_files_read
310 ]
311 },
312 {
313 "cell_type": "code",
314 "execution_count": 12,
315 "metadata": {
316 "collapsed": false
317 },
318 "outputs": [
319 {
320 "data": {
321 "text/plain": [
322 "datetime.datetime(2016, 3, 29, 23, 59, 59)"
323 ]
324 },
325 "execution_count": 12,
326 "metadata": {},
327 "output_type": "execute_result"
328 }
329 ],
330 "source": [
# Find the end of the newest report period already stored; the query returns
# a single row whose only column is NULL when the table is empty.
conn = psycopg2.connect(host=config['database']['server'],
                        database=config['database']['database'],
                        user=config['database']['username'],
                        password=config['database']['password'])
try:
    # "with conn" only ends the transaction; it does not close the connection.
    with conn:
        with conn.cursor() as cur:
            cur.execute('select max(report_metadata_date_range_end) from reports')
            results = cur.fetchall()
finally:
    conn.close()
most_recent_date = results[0][0]
most_recent_date
340 ]
341 },
342 {
343 "cell_type": "code",
344 "execution_count": 13,
345 "metadata": {
346 "collapsed": false
347 },
348 "outputs": [
349 {
350 "data": {
351 "text/plain": [
352 "('OK', [b'178'])"
353 ]
354 },
355 "execution_count": 13,
356 "metadata": {},
357 "output_type": "execute_result"
358 }
359 ],
360 "source": [
# Open the IMAP session used by fetch_msg and the search cell below.
# The inbox is selected read-only, so messages are not flagged as seen.
# NOTE(review): starttls() is called without an ssl_context - confirm that the
# default certificate handling is acceptable for this server.
imap_settings = config['imap']
mailbox = imaplib.IMAP4(host=imap_settings['server'],
                        port=imap_settings['port'])
mailbox.starttls()
mailbox.login(imap_settings['username'], imap_settings['password'])
mailbox.select('INBOX', readonly=True)
366 ]
367 },
368 {
369 "cell_type": "code",
370 "execution_count": 14,
371 "metadata": {
372 "collapsed": false
373 },
374 "outputs": [
375 {
376 "data": {
377 "text/plain": [
378 "('SINCE 27-Mar-2016', 'OK', [b'169 170 171 172 173 174 175 176 177 178 179'])"
379 ]
380 },
381 "execution_count": 14,
382 "metadata": {},
383 "output_type": "execute_result"
384 }
385 ],
386 "source": [
# IMAP date criteria need English month abbreviations; strftime('%b') follows
# the process locale, so the month name is taken from a fixed table instead.
IMAP_MONTHS = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
               'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')

if most_recent_date:
    # Start two days before the newest stored report; reports already in the
    # database are skipped when writing.
    search_start = most_recent_date - datetime.timedelta(days=2)
    mails_from = 'SINCE {:02d}-{}-{:04d}'.format(
        search_start.day, IMAP_MONTHS[search_start.month - 1], search_start.year)
else:
    mails_from = "ALL"
resp, nums = mailbox.uid('SEARCH', None, mails_from)
if resp != 'OK':
    raise RuntimeError('IMAP search {!r} failed: {} {!r}'.format(mails_from, resp, nums))
mails_from, resp, nums
393 ]
394 },
395 {
396 "cell_type": "code",
397 "execution_count": 15,
398 "metadata": {
399 "collapsed": false
400 },
401 "outputs": [
402 {
403 "data": {
404 "text/plain": [
405 "['1458957186.548175',\n",
406 " '2150510829392606201',\n",
407 " '68aad5080a774e2c997d159b546569b9@hotmail.com',\n",
408 " '1459129809.695034',\n",
409 " '16143280651570354241',\n",
410 " '8c177254c3cb41869dc3afab59f74c76@hotmail.com',\n",
411 " '15410706527896810898',\n",
412 " '1459216304.582931',\n",
413 " '15497495941279624940',\n",
414 " '1459302353.261157',\n",
415 " '7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com']"
416 ]
417 },
418 "execution_count": 15,
419 "metadata": {},
420 "output_type": "execute_result"
421 }
422 ],
423 "source": [
# Fetch every message found by the search and flatten the per-message report
# lists into one list. The result is bound only after all fetches succeed.
found_reports = []
for message_uid in nums[0].split():
    found_reports.extend(extract_report(fetch_msg(message_uid)))
dmarc_reports = found_reports

# Show the ids of the reports that were downloaded.
[parsed.find('./report_metadata/report_id').text for parsed in dmarc_reports]
427 ]
428 },
429 {
430 "cell_type": "code",
431 "execution_count": 16,
432 "metadata": {
433 "collapsed": false
434 },
435 "outputs": [
436 {
437 "data": {
438 "text/plain": [
439 "('BYE', [b'Logging out'])"
440 ]
441 },
442 "execution_count": 16,
443 "metadata": {},
444 "output_type": "execute_result"
445 }
446 ],
447 "source": [
# Always log out, even if closing the selected mailbox raises.
try:
    mailbox.close()
finally:
    logout_response = mailbox.logout()
logout_response
450 ]
451 },
452 {
453 "cell_type": "code",
454 "execution_count": 18,
455 "metadata": {
456 "collapsed": false
457 },
458 "outputs": [
459 {
460 "name": "stdout",
461 "output_type": "stream",
462 "text": [
463 "write 1459302353.261157\n",
464 "write 7773a696f4a54f1e8c01f4644fbb94ee@hotmail.com\n"
465 ]
466 }
467 ],
468 "source": [
# Write every downloaded report that is not yet in the database.
conn = psycopg2.connect(host=config['database']['server'],
                        database=config['database']['database'],
                        user=config['database']['username'],
                        password=config['database']['password'])
try:
    # "with conn" only ends the transaction; the connection is closed below.
    with conn:
        with conn.cursor() as cur:
            for report in dmarc_reports:
                # Compare using the stripped id, which is the form stored by
                # build_insert_command, so re-runs do not insert duplicates.
                report_id = maybe_strip(report.findtext('./report_metadata/report_id'))
                cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
                            [report_id])
                results = cur.fetchall()
                if not results:
                    print('write', report_id)
                    write_report(conn, cur, report)
finally:
    conn.close()
481 ]
482 },
483 {
484 "cell_type": "code",
485 "execution_count": null,
486 "metadata": {
487 "collapsed": true
488 },
489 "outputs": [],
490 "source": []
491 }
492 ],
493 "metadata": {
494 "kernelspec": {
495 "display_name": "Python 3",
496 "language": "python",
497 "name": "python3"
498 },
499 "language_info": {
500 "codemirror_mode": {
501 "name": "ipython",
502 "version": 3
503 },
504 "file_extension": ".py",
505 "mimetype": "text/x-python",
506 "name": "python",
507 "nbconvert_exporter": "python",
508 "pygments_lexer": "ipython3",
509 "version": "3.4.3+"
510 }
511 },
512 "nbformat": 4,
513 "nbformat_minor": 0
514 }