Handling invalid zip files
[dmarc.git] / dmarc_to_database.py
1 #!/usr/bin/python3
2
3 import configparser
4 import imaplib
5 import email
6 import io
7 import zipfile
8 import xml.etree.ElementTree
9 import psycopg2
10 import re
11 import datetime
12
def fetch_msg(num, box=None):
    """Fetch the raw RFC 822 bytes of the message with IMAP UID *num*.

    Args:
        num: message UID as returned by ``UID SEARCH`` (bytes or str).
        box: IMAP connection to fetch from; defaults to the module-level
            ``mailbox`` connection when omitted.

    Returns:
        The complete message as bytes.

    Raises:
        RuntimeError: if the FETCH response carries no message literal
            (unknown UID, server error, ...).
    """
    if box is None:
        box = mailbox
    typ, data = box.uid('FETCH', num, '(RFC822)')
    # The response list mixes (envelope, literal) tuples with plain bytes items
    # such as the closing b')'; the message body is the literal of the first tuple.
    for item in data or ():
        if isinstance(item, tuple):
            return item[1]
    raise RuntimeError('Could not fetch message UID {}: {} {!r}'.format(num, typ, data))
15
def xml_of_part(part):
    """Parse the DMARC XML report carried by a compressed MIME part.

    The decoded payload is read as a gzip stream when it starts with the gzip
    magic bytes (``1f 8b``) and as a zip archive otherwise; for a zip archive
    only the first member is parsed.  The XML is handed to the parser as
    bytes so the document's own encoding declaration is honoured.

    Args:
        part: an ``email.message.Message`` (a whole message or one sub-part).

    Returns:
        The root ``Element`` of the report, or None when the payload is not a
        readable zip/gzip archive, the zip archive is empty, or the XML is
        malformed.
    """
    import gzip  # local imports keep this function self-contained
    import zlib

    # get_payload(decode=True) returns None for multipart containers.
    payload = part.get_payload(decode=True) or b''
    try:
        if payload[:2] == b'\x1f\x8b':
            contents = gzip.decompress(payload)
        else:
            with zipfile.ZipFile(io.BytesIO(payload)) as zf:
                members = zf.infolist()
                if not members:
                    return None
                contents = zf.read(members[0])
        return xml.etree.ElementTree.fromstring(contents)
    except (zipfile.BadZipFile, OSError, EOFError, zlib.error,
            xml.etree.ElementTree.ParseError):
        # Unreadable attachment: skip this part so other reports still load.
        return None
24
25
def xml_of(message):
    """Collect the parsed DMARC reports attached to an email message.

    Multipart messages are walked recursively, so archives inside nested
    multiparts or attached messages are found as well.  A leaf part is a
    candidate archive when its MIME type contains "zip" (this also matches
    ``application/gzip``) or its filename ends in ``.zip`` / ``.gz``, which
    covers attachments labelled ``application/octet-stream``.  A
    non-multipart message is handed to ``xml_of_part`` as a whole.

    Returns:
        A list with one entry per candidate part: whatever ``xml_of_part``
        returned for it (an XML root, or None if it could not be read).
    """
    if not message.is_multipart():
        return [xml_of_part(message)]
    reports = []
    for part in message.walk():
        if part.is_multipart():
            continue  # containers have no payload of their own; walk() visits their children
        filename = (part.get_filename() or '').lower()
        if 'zip' in part.get_content_type() or filename.endswith(('.zip', '.gz')):
            reports.append(xml_of_part(part))
    return reports
35
def extract_report(msg):
    """Turn raw message bytes into the list of DMARC reports attached to it.

    The bytes are parsed into an email message object, and report extraction
    is delegated to ``xml_of``.
    """
    parsed_message = email.message_from_bytes(msg)
    reports = xml_of(parsed_message)
    return reports
39
def maybe_strip(text):
    """Return *text* with surrounding whitespace removed; falsy input (None, '') gives ''."""
    return text.strip() if text else ''
45
# Each row is (ElementTree path in the report XML, PostgreSQL column,
# target table, column type).  Paths containing '{}' are per-record
# templates that receive the record index before lookup.  Row order is
# significant: it fixes the column order of the generated INSERT statements.
_FIELD_SPECS = (
    # Rows for the ``reports`` table.
    ('./policy_published/adkim', 'policy_published_adkim', 'reports', 'varchar'),
    ('./policy_published/aspf', 'policy_published_aspf', 'reports', 'varchar'),
    ('./policy_published/domain', 'policy_published_domain', 'reports', 'varchar'),
    ('./policy_published/p', 'policy_published_p', 'reports', 'varchar'),
    ('./policy_published/pct', 'policy_published_pct', 'reports', 'int'),
    # Rows for the ``report_items`` table (one row per <record>).
    ('./record[{}]/auth_results/dkim/domain', 'auth_results_dkim_domain', 'report_items', 'varchar'),
    ('./record[{}]/auth_results/dkim/result', 'auth_results_dkim_result', 'report_items', 'varchar'),
    ('./record[{}]/auth_results/spf/domain', 'auth_results_spf_domain', 'report_items', 'varchar'),
    ('./record[{}]/auth_results/spf/result', 'auth_results_spf_result', 'report_items', 'varchar'),
    ('./record[{}]/identifiers/header_from', 'identifiers_header_from', 'report_items', 'varchar'),
    ('./record[{}]/row/count', 'count', 'report_items', 'int'),
    ('./record[{}]/row/policy_evaluated/disposition', 'policy_evaluated_disposition', 'report_items', 'varchar'),
    ('./record[{}]/row/policy_evaluated/dkim', 'policy_evaluated_dkim', 'report_items', 'varchar'),
    ('./record[{}]/row/policy_evaluated/spf', 'policy_evaluated_spf', 'report_items', 'varchar'),
    ('./record[{}]/row/source_ip', 'source_ip', 'report_items', 'inet'),
    # More rows for the ``reports`` table.
    ('./report_metadata/date_range/begin', 'report_metadata_date_range_begin', 'reports', 'timestamp'),
    ('./report_metadata/date_range/end', 'report_metadata_date_range_end', 'reports', 'timestamp'),
    ('./report_metadata/email', 'report_metadata_email', 'reports', 'varchar'),
    ('./report_metadata/org_name', 'report_metadata_org_name', 'reports', 'varchar'),
    ('./report_metadata/report_id', 'report_metadata_report_id', 'reports', 'varchar'),
)

# Lookup table: XML path -> {'pg_field_name', 'pg_table', 'pg_type'}.
field_maps = {
    xpath: {'pg_field_name': column, 'pg_table': table, 'pg_type': pg_type}
    for xpath, column, table, pg_type in _FIELD_SPECS
}
107
108
109
def _column_value(element, pg_type):
    """Convert an XML element's text into the Python value for a *pg_type* column.

    A missing element (``None``) or blank text becomes None (SQL NULL) for
    'int', 'timestamp' and 'inet' columns, since '' is not valid input for
    them; every other type gets the whitespace-stripped text ('' when the
    element or its text is absent).  'timestamp' text is read as Unix epoch
    seconds and returned as a naive datetime in UTC.
    """
    text = element.text.strip() if element is not None and element.text else ''
    if pg_type in ('int', 'timestamp', 'inet') and not text:
        return None
    if pg_type == 'int':
        return int(text)
    if pg_type == 'timestamp':
        # Same value the deprecated datetime.utcfromtimestamp() produced: UTC, tz-naive.
        moment = datetime.datetime.fromtimestamp(int(text), datetime.timezone.utc)
        return moment.replace(tzinfo=None)
    return text


def build_insert_command(table_name, report, preamble_values=None, i=None, fields=None):
    """Build a parameterised INSERT for *table_name* from a parsed report.

    Args:
        table_name: target table; only mappings whose 'pg_table' equals it are used.
        report: root ``Element`` of the report XML.
        preamble_values: optional extra {column: value} pairs (e.g. a foreign
            key) placed first in the column list, sorted by column name.
        i: record index substituted into per-record paths containing '{}'.
        fields: mapping of XML path -> {'pg_field_name', 'pg_table', 'pg_type'};
            defaults to the module-level ``field_maps``.

    Returns:
        ``(insert_string, values)``: SQL using ``%(column)s`` placeholders and
        the dict of parameter values to execute it with.  Elements missing
        from the XML produce '' (text columns) or None (int/timestamp/inet).
    """
    if fields is None:
        fields = field_maps
    preamble = dict(preamble_values) if preamble_values else {}
    values = dict(preamble)
    field_names = []
    for path, spec in fields.items():
        if spec['pg_table'] != table_name:
            continue
        xpath = path.format(i) if i is not None else path
        column = spec['pg_field_name']
        field_names.append(column)
        values[column] = _column_value(report.find(xpath), spec['pg_type'])
    # Column names come from the trusted mapping, never from the XML, so
    # formatting them into the SQL text is safe; values stay parameterised.
    columns = sorted(preamble) + field_names
    insert_string = 'insert into {} ({}) values ({});'.format(
        table_name,
        ', '.join(columns),
        ', '.join('%({})s'.format(c) for c in columns))
    return insert_string, values
139
140
def write_report(connection, cursor, report):
    """Insert one parsed DMARC report and all of its records, then commit.

    A row is first written to ``reports``.  Its ``id`` is then looked up and
    used as ``report_id`` for one ``report_items`` row per ``<record>``
    element.

    Args:
        connection: psycopg2 connection.  ``with connection`` commits when the
            block succeeds and rolls back if anything inside it raises.
        cursor: cursor opened on *connection*.
        report: root ``Element`` of the report XML.

    Raises:
        RuntimeError: if exactly one ``reports`` row with this report ID
            cannot be found after the insert.
    """
    with connection:
        insert_string, report_values = build_insert_command('reports', report)
        cursor.execute(insert_string, report_values)

        record_count = len(report.findall('./record'))
        if not record_count:
            return

        # The parent id is the same for every record, so fetch it once.  Match
        # on the value that was actually inserted (build_insert_command strips
        # whitespace from text fields), not the raw XML text, or padded IDs
        # would never be found.
        cursor.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
                       [report_values['report_metadata_report_id']])
        results = cursor.fetchall()
        if len(results) != 1:
            raise RuntimeError('Could not find report record for report item')
        report_id = results[0][0]

        # ElementTree positional predicates are 1-based: record[1] is the first.
        for i in range(1, record_count + 1):
            insert_string, item_values = build_insert_command(
                'report_items', report, i=i, preamble_values={'report_id': report_id})
            cursor.execute(insert_string, item_values)
160
# Script entry: copy DMARC aggregate reports from the IMAP inbox into
# PostgreSQL, using the [database] and [imap] sections of dmarc.ini.
config = configparser.ConfigParser()
config.read('dmarc.ini')

conn = psycopg2.connect(host=config['database']['server'],
                        database=config['database']['database'],
                        user=config['database']['username'],
                        password=config['database']['password'])
try:
    cur = conn.cursor()
    # Newest report end date already stored (None when the table is empty).
    cur.execute('select max(report_metadata_date_range_end) from reports')
    results = cur.fetchall()
    most_recent_date = results[0][0]

    # Kept as a module-level name on purpose: fetch_msg() reads the global ``mailbox``.
    mailbox = imaplib.IMAP4(host=config['imap']['server'],
                            port=config['imap']['port'])
    try:
        mailbox.starttls()
        mailbox.login(config['imap']['username'], config['imap']['password'])
        mailbox.select('INBOX', readonly=True)

        # Re-scan two days before the newest stored report.  Reports that are
        # already in the database are skipped further down.
        if most_recent_date:
            mails_from = "SINCE " + (most_recent_date - datetime.timedelta(days=2)).strftime("%d-%b-%Y")
        else:
            mails_from = "ALL"
        resp, nums = mailbox.uid('SEARCH', None, mails_from)
        if resp != 'OK':
            raise RuntimeError('IMAP search failed: {} {!r}'.format(resp, nums))

        # Unreadable attachments come back as None; drop them.  Compare with
        # None explicitly: an Element's truth value reflects its child count.
        dmarc_reports = [report
                         for n in nums[0].split()
                         for report in extract_report(fetch_msg(n))
                         if report is not None]
        mailbox.close()
    finally:
        mailbox.logout()

    for report in dmarc_reports:
        # Use the whitespace-stripped ID, which is the form that gets stored.
        report_id = maybe_strip(report.findtext('./report_metadata/report_id'))
        if not report_id:
            print('skip report without report_id')
            continue
        cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
                    [report_id])
        results = cur.fetchall()
        if not results:
            print('write', report_id)
            write_report(conn, cur, report)
finally:
    conn.close()
202
203 conn.close()