Tidied, created stand-alone script
[dmarc.git] / dmarc_to_database.py
1 import configparser
2 import imaplib
3 import email
4 import io
5 import zipfile
6 import xml.etree.ElementTree
7 import psycopg2
8 import re
9 import datetime
10
def fetch_msg(num):
    """Fetch one message through the module-level ``mailbox`` connection.

    Sends ``UID FETCH <num> (RFC822)`` and returns the second item of the
    first entry of the response data, which is where the raw message is
    expected for an RFC822 fetch.
    """
    _status, data = mailbox.uid('FETCH', num, '(RFC822)')
    first_entry = data[0]
    return first_entry[1]
13
def xml_of_part(part):
    """Parse the XML report carried as a compressed message part.

    ``part`` must provide ``get_payload(decode=True)`` returning the raw
    attachment bytes.  Two container formats are recognised:

    * gzip (payload starts with the magic bytes ``1f 8b``): the whole
      payload is decompressed.  Callers select parts with
      ``'zip' in content_type``, which also matches ``application/gzip``,
      so such parts do reach this function.
    * anything else is opened as a zip archive and its first member is read.

    The decompressed bytes are handed to the XML parser undecoded, so the
    encoding declared in the XML prolog is honoured instead of assuming
    UTF-8.

    Returns the root ``xml.etree.ElementTree.Element``.
    Raises ``zipfile.BadZipFile`` when the payload is neither gzip nor zip.
    """
    import gzip  # local import: gzip is not among the file-level imports

    # A part without a decodable payload yields None; treat it as empty so
    # the failure is still reported as BadZipFile.
    payload = part.get_payload(decode=True) or b''
    if payload[:2] == b'\x1f\x8b':
        raw_xml = gzip.decompress(payload)
    else:
        with zipfile.ZipFile(io.BytesIO(payload)) as archive:
            raw_xml = archive.read(archive.infolist()[0])
    return xml.etree.ElementTree.fromstring(raw_xml)
19
20
def xml_of(message):
    """Return the list of parsed XML reports found in ``message``.

    A non-multipart message is treated as being the compressed report
    itself.  For a multipart message, every direct sub-part whose content
    type contains ``'zip'`` is parsed; other parts are ignored.
    """
    if not message.is_multipart():
        return [xml_of_part(message)]
    return [xml_of_part(part)
            for part in message.get_payload()
            if 'zip' in part.get_content_type()]
30
def extract_report(msg):
    """Parse raw message bytes and return the XML reports they contain.

    ``msg`` is turned into an ``email`` message object, which is then
    handed to ``xml_of``; the result is whatever list ``xml_of`` returns.
    """
    return xml_of(email.message_from_bytes(msg))
34
def maybe_strip(text):
    """Return ``text`` without surrounding whitespace, or ``''`` if it is falsy.

    Falsy input covers both ``None`` and the empty string.
    """
    return text.strip() if text else ''
40
# Maps an XPath inside a report to the database column that stores it.
# Each value names the destination column ('pg_field_name'), the table
# ('pg_table') and the column type ('pg_type').  Paths containing '{}' are
# per-record paths; the placeholder is filled with the record's position.
# Insertion order is significant: it determines the column order used when
# insert statements are built from this mapping.
field_maps = {
    xpath: {'pg_field_name': column, 'pg_table': table, 'pg_type': pg_type}
    for xpath, column, table, pg_type in (
        ('./policy_published/adkim', 'policy_published_adkim', 'reports', 'varchar'),
        ('./policy_published/aspf', 'policy_published_aspf', 'reports', 'varchar'),
        ('./policy_published/domain', 'policy_published_domain', 'reports', 'varchar'),
        ('./policy_published/p', 'policy_published_p', 'reports', 'varchar'),
        ('./policy_published/pct', 'policy_published_pct', 'reports', 'int'),
        ('./record[{}]/auth_results/dkim/domain', 'auth_results_dkim_domain', 'report_items', 'varchar'),
        ('./record[{}]/auth_results/dkim/result', 'auth_results_dkim_result', 'report_items', 'varchar'),
        ('./record[{}]/auth_results/spf/domain', 'auth_results_spf_domain', 'report_items', 'varchar'),
        ('./record[{}]/auth_results/spf/result', 'auth_results_spf_result', 'report_items', 'varchar'),
        ('./record[{}]/identifiers/header_from', 'identifiers_header_from', 'report_items', 'varchar'),
        ('./record[{}]/row/count', 'count', 'report_items', 'int'),
        ('./record[{}]/row/policy_evaluated/disposition', 'policy_evaluated_disposition', 'report_items', 'varchar'),
        ('./record[{}]/row/policy_evaluated/dkim', 'policy_evaluated_dkim', 'report_items', 'varchar'),
        ('./record[{}]/row/policy_evaluated/spf', 'policy_evaluated_spf', 'report_items', 'varchar'),
        ('./record[{}]/row/source_ip', 'source_ip', 'report_items', 'inet'),
        ('./report_metadata/date_range/begin', 'report_metadata_date_range_begin', 'reports', 'timestamp'),
        ('./report_metadata/date_range/end', 'report_metadata_date_range_end', 'reports', 'timestamp'),
        ('./report_metadata/email', 'report_metadata_email', 'reports', 'varchar'),
        ('./report_metadata/org_name', 'report_metadata_org_name', 'reports', 'varchar'),
        ('./report_metadata/report_id', 'report_metadata_report_id', 'reports', 'varchar'),
    )
}
101
102
103
def build_insert_command(table_name, report, preamble_values=None, i=None,
                         mappings=None):
    """Build a parameterised INSERT statement for one row of ``table_name``.

    Parameters:
        table_name: destination table; only mapping entries whose
            ``'pg_table'`` equals it contribute columns.
        report: XML element searched with each mapping's XPath.
        preamble_values: optional dict of extra column values that are not
            read from the XML (for example a foreign key).  Their columns
            come first, in sorted key order.  The dict is not modified.
        i: optional position substituted into the ``{}`` placeholder of
            per-record XPaths.
        mappings: optional XPath -> column description dict with the same
            shape as the module-level ``field_maps``, which is used when
            this is omitted.

    Returns:
        ``(insert_string, values)`` where ``insert_string`` uses named
        ``%(column)s`` placeholders and ``values`` maps each column name to
        its converted value.

    Conversion by ``'pg_type'``:
        * ``int``: ``int(text)``
        * ``timestamp``: epoch seconds converted to a naive UTC ``datetime``
        * ``inet`` and everything else: the stripped text

    An element that is missing or has no text no longer raises: it yields
    ``None`` (SQL NULL) for ``int``, ``timestamp`` and ``inet`` columns and
    ``''`` for the remaining text columns.

    Table and column names are interpolated directly into the SQL text, so
    they must come from trusted mappings, never from report content.
    """
    if mappings is None:
        mappings = field_maps
    values = dict(preamble_values) if preamble_values else {}
    field_names = []
    for path, spec in mappings.items():
        if spec['pg_table'] != table_name:
            continue
        # Compare against None so an explicitly passed index is never
        # silently ignored just because it is falsy.
        xpath = path.format(i) if i is not None else path
        # findtext gives None for an absent element and '' for an empty one;
        # both collapse to '' here.
        text = (report.findtext(xpath) or '').strip()
        pg_type = spec['pg_type']
        if pg_type == 'int':
            value = int(text) if text else None
        elif pg_type == 'timestamp':
            # Same naive-UTC result as the deprecated utcfromtimestamp().
            value = (datetime.datetime
                     .fromtimestamp(int(text), tz=datetime.timezone.utc)
                     .replace(tzinfo=None)) if text else None
        elif pg_type == 'inet':
            value = text or None
        else:
            value = text
        column = spec['pg_field_name']
        field_names.append(column)
        values[column] = value

    columns = sorted(preamble_values) if preamble_values else []
    columns += field_names
    insert_string = 'insert into {} ({}) values ({});'.format(
        table_name,
        ', '.join(columns),
        ', '.join('%({})s'.format(column) for column in columns))
    return insert_string, values
133
134
def write_report(connection, cursor, report):
    """Insert one report and all of its records, then commit.

    Parameters:
        connection: database connection; ``commit()`` is called once after
            every row has been inserted.
        cursor: cursor on ``connection`` used for all statements.
        report: root XML element of the report.

    Raises:
        RuntimeError: if the freshly inserted ``reports`` row cannot be
            identified unambiguously by its report id.
    """
    insert_string, values = build_insert_command('reports', report)
    cursor.execute(insert_string, values)

    record_count = len(report.findall('./record'))
    if record_count:
        # Look the parent row up once: the answer cannot change while the
        # items are being inserted.  Use the value that was actually stored
        # (already whitespace-stripped) rather than the raw XML text, which
        # would not match if the element carries surrounding whitespace.
        cursor.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
                       [values['report_metadata_report_id']])
        results = cursor.fetchall()
        if len(results) != 1:
            raise RuntimeError('Could not find report record for report item')
        report_id = results[0][0]

        # Record positions in the XPath predicates are 1-based.
        for i in range(1, record_count + 1):
            insert_string, item_values = build_insert_command(
                'report_items', report, i=i,
                preamble_values={'report_id': report_id})
            cursor.execute(insert_string, item_values)
    connection.commit()
154
# --- Script body -----------------------------------------------------------
# 1. Ask the database for the end date of the newest stored report.
# 2. Fetch candidate mails from the IMAP inbox and parse their reports.
# 3. Store every report whose id is not in the database yet.

config = configparser.ConfigParser()
config.read('dmarc.ini')

db_settings = dict(host=config['database']['server'],
                   database=config['database']['database'],
                   user=config['database']['username'],
                   password=config['database']['password'])

# psycopg2's ``with connection`` only ends the transaction; it does not
# close the connection, so close it explicitly.
conn = psycopg2.connect(**db_settings)
try:
    with conn, conn.cursor() as cur:
        cur.execute('select max(report_metadata_date_range_end) from reports')
        results = cur.fetchall()
        most_recent_date = results[0][0]
finally:
    conn.close()

# ``mailbox`` stays a module-level name because fetch_msg() reads it.
mailbox = imaplib.IMAP4(host=config['imap']['server'],
                        port=config.getint('imap', 'port'))
try:
    mailbox.starttls()
    mailbox.login(config['imap']['username'], config['imap']['password'])
    mailbox.select('INBOX', readonly=True)

    if most_recent_date:
        # Search from two days before the newest stored report end date;
        # reports already stored are filtered out again below.
        mails_from = "SINCE " + (most_recent_date - datetime.timedelta(days=2)).strftime("%d-%b-%Y")
    else:
        mails_from = "ALL"
    resp, nums = mailbox.uid('SEARCH', None, mails_from)

    dmarc_reports = [report
                     for n in nums[0].split()
                     for report in extract_report(fetch_msg(n))]

    mailbox.close()
finally:
    # Always end the IMAP session, even if fetching or parsing failed.
    mailbox.logout()

conn = psycopg2.connect(**db_settings)
try:
    with conn, conn.cursor() as cur:
        for report in dmarc_reports:
            # Compare using the stripped id, which is the form that
            # build_insert_command() stores in the database.
            report_id = maybe_strip(report.findtext('./report_metadata/report_id'))
            cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
                        [report_id])
            results = cur.fetchall()
            if not results:
                print('write', report_id)
                write_report(conn, cur, report)
finally:
    conn.close()
200
201
202