Fixed error handling
[dmarc.git] / dmarc_to_database
1 #!/usr/bin/python3
2
3 import configparser
4 import imaplib
5 import email
6 import io
7 import zipfile
8 import xml.etree.ElementTree
9 import psycopg2
10 import re
11 import datetime
12 import argparse
13
# Command-line options: an explicit config file path, and a switch that lists
# new reports without writing them to the database.
parser = argparse.ArgumentParser(description='Process DMARC records.')
parser.add_argument(
    '-c', '--config',
    dest='config_file',
    default='',
    action='store',
    help='Path to config file',
)
parser.add_argument(
    '-t', '--test',
    default=False,
    action='store_true',
    help='Test, but do not add records to the database',
)
args = parser.parse_args()
22
def fetch_msg(num, box=None):
    """Fetch the raw bytes of the message with IMAP UID *num*.

    :param num: message UID as returned by a ``UID SEARCH``.
    :param box: connected, selected imaplib.IMAP4 instance; defaults to the
        module-level ``mailbox`` connection.
    :returns: the full RFC 822 message as bytes.
    :raises RuntimeError: if the server reports failure or returns no body
        (e.g. the message disappeared between SEARCH and FETCH).
    """
    if box is None:
        box = mailbox
    status, data = box.uid('FETCH', num, '(RFC822)')
    # On success the first data item is a (header, body) tuple; a NO status or
    # an absent message yields something else, which used to surface as an
    # unrelated TypeError (or a silently wrong value) further down.
    if status != 'OK' or not data or not isinstance(data[0], tuple):
        raise RuntimeError('Could not fetch message {!r}: {}'.format(num, status))
    return data[0][1]
25
def xml_of_part(part):
    """Decode a compressed report attached as *part* and parse its XML.

    Zip archives (only the first member is read) and gzip streams are both
    accepted; they are told apart by the gzip magic bytes.

    :param part: an email.message.Message whose decoded payload should be
        the compressed report.
    :returns: the root xml.etree.ElementTree.Element, or None when the
        payload is missing, is not a readable zip/gzip archive, the archive
        is empty, or the content is not well-formed XML.
    """
    import gzip
    import zlib

    payload = part.get_payload(decode=True)
    if not payload:
        # get_payload(decode=True) gives None for multipart containers.
        return None
    try:
        if payload[:2] == b'\x1f\x8b':  # gzip magic number
            contents = gzip.decompress(payload)
        else:
            with zipfile.ZipFile(io.BytesIO(payload)) as zf:
                members = zf.infolist()
                if not members:
                    return None
                contents = zf.read(members[0].filename)
        # Parse bytes so the parser honours the document's own encoding
        # declaration instead of assuming UTF-8.
        # NOTE(review): input comes from arbitrary senders; consider
        # defusedxml if the installed expat lacks entity-expansion limits.
        return xml.etree.ElementTree.fromstring(contents)
    except (zipfile.BadZipFile, OSError, EOFError, zlib.error,
            xml.etree.ElementTree.ParseError):
        # Not a usable report: the caller filters out None.
        return None
34
35
def xml_of(message):
    """Collect the parsed reports attached to *message*.

    For a multipart message every leaf part, at any nesting depth, whose
    content type contains 'zip' (zip or gzip) or whose filename ends in
    '.zip' / '.gz' is passed to xml_of_part. A non-multipart message is
    always treated as one candidate part.

    :returns: one entry per candidate part, each an Element or None (when the
        part could not be decoded); callers discard the None entries.
    """
    if not message.is_multipart():
        return [xml_of_part(message)]
    reports = []
    # walk() also descends into nested multiparts, so attachments wrapped in
    # e.g. a multipart/related container are no longer missed.
    for part in message.walk():
        if part.is_multipart():
            continue  # containers (including *message* itself) hold no payload
        content_type = part.get_content_type()
        filename = (part.get_filename() or '').lower()
        if 'zip' in content_type or filename.endswith(('.zip', '.gz')):
            reports.append(xml_of_part(part))
    return reports
45
def extract_report(msg):
    """Parse the raw message bytes *msg* and return xml_of() for the result."""
    return xml_of(email.message_from_bytes(msg))
49
def maybe_strip(text):
    """Return *text* without surrounding whitespace, or '' when it is falsy (e.g. None)."""
    return text.strip() if text else ''
55
# Report XPath -> Postgres destination.
#
# Keys are ElementTree paths into a parsed report; paths containing '{}' are
# templates filled with the index of a <record> element. Values name the
# column ('pg_field_name'), its table ('pg_table') and the column type
# ('pg_type') that build_insert_command uses to convert the element text.
field_maps = {
    xpath: {'pg_field_name': column, 'pg_table': table, 'pg_type': column_type}
    for xpath, column, table, column_type in (
        # policy_published: stored once per report
        ('./policy_published/adkim', 'policy_published_adkim', 'reports', 'varchar'),
        ('./policy_published/aspf', 'policy_published_aspf', 'reports', 'varchar'),
        ('./policy_published/domain', 'policy_published_domain', 'reports', 'varchar'),
        ('./policy_published/p', 'policy_published_p', 'reports', 'varchar'),
        ('./policy_published/pct', 'policy_published_pct', 'reports', 'int'),
        # record[i]: one report_items row per record
        ('./record[{}]/auth_results/dkim/domain', 'auth_results_dkim_domain', 'report_items', 'varchar'),
        ('./record[{}]/auth_results/dkim/result', 'auth_results_dkim_result', 'report_items', 'varchar'),
        ('./record[{}]/auth_results/spf/domain', 'auth_results_spf_domain', 'report_items', 'varchar'),
        ('./record[{}]/auth_results/spf/result', 'auth_results_spf_result', 'report_items', 'varchar'),
        ('./record[{}]/identifiers/header_from', 'identifiers_header_from', 'report_items', 'varchar'),
        ('./record[{}]/row/count', 'count', 'report_items', 'int'),
        ('./record[{}]/row/policy_evaluated/disposition', 'policy_evaluated_disposition', 'report_items', 'varchar'),
        ('./record[{}]/row/policy_evaluated/dkim', 'policy_evaluated_dkim', 'report_items', 'varchar'),
        ('./record[{}]/row/policy_evaluated/spf', 'policy_evaluated_spf', 'report_items', 'varchar'),
        ('./record[{}]/row/source_ip', 'source_ip', 'report_items', 'inet'),
        # report_metadata: stored once per report
        ('./report_metadata/date_range/begin', 'report_metadata_date_range_begin', 'reports', 'timestamptz'),
        ('./report_metadata/date_range/end', 'report_metadata_date_range_end', 'reports', 'timestamptz'),
        ('./report_metadata/email', 'report_metadata_email', 'reports', 'varchar'),
        ('./report_metadata/org_name', 'report_metadata_org_name', 'reports', 'varchar'),
        ('./report_metadata/report_id', 'report_metadata_report_id', 'reports', 'varchar'),
    )
}
116
117
118
def build_insert_command(table_name, report, preamble_values=None, i=None):
    """Build a parameterised INSERT statement for one row of *table_name*.

    Every field_maps entry whose 'pg_table' equals *table_name* becomes a
    column. Its value is read from *report* at the entry's XPath (with the
    '{}' placeholder filled by the record index *i*, when given) and converted
    according to 'pg_type':
      - 'int': int(text)
      - 'timestamptz': aware UTC datetime from an integer epoch-seconds text
      - anything else: whitespace-stripped text ('' when empty)

    A missing element of a 'varchar' field yields ''. Such elements can be
    optional, e.g. a record without auth_results/dkim, and previously caused
    the whole report to be dropped.

    :param table_name: target table name (interpolated into the SQL; must be
        trusted — it is only ever one of the field_maps table names).
    :param report: parsed report root (ElementTree Element).
    :param preamble_values: extra column -> value pairs emitted before the
        mapped columns, in sorted column order; not mutated.
    :param i: 1-based record index substituted into templated paths.
    :returns: (sql, values) for ``cursor.execute(sql, values)``.
    :raises AttributeError: if the element of a non-varchar field is missing.
        This is the same exception the original ``.text`` access raised, so
        callers that catch it to skip a malformed report keep working.
    :raises ValueError, TypeError: if an int/timestamptz element's text is not
        an integer or is empty.
    """
    values = dict(preamble_values) if preamble_values else {}
    field_names = []
    for path, spec in field_maps.items():
        if spec['pg_table'] != table_name:
            continue
        name = spec['pg_field_name']
        pg_type = spec['pg_type']
        # `is not None` so an explicit index is always substituted.
        xpath = path.format(i) if i is not None else path
        field_names.append(name)

        element = report.find(xpath)
        if element is None:
            if pg_type == 'varchar':
                # Optional string element absent: store '' exactly as for an
                # element with empty text.
                values[name] = ''
                continue
            raise AttributeError('report has no element {}'.format(xpath))

        text = element.text
        if pg_type == 'int':
            values[name] = int(text)
        elif pg_type == 'timestamptz':
            values[name] = datetime.datetime.fromtimestamp(int(text),
                                                           tz=datetime.timezone.utc)
        else:
            # varchar and inet are both passed through as stripped text.
            values[name] = maybe_strip(text)

    columns = (sorted(preamble_values) if preamble_values else []) + field_names
    insert_string = 'insert into {} ({}) values ({});'.format(
        table_name,
        ', '.join(columns),
        ', '.join('%({})s'.format(column) for column in columns))
    return insert_string, values
151
152
def write_report(connection, cursor, report):
    """Store *report* and one report_items row per <record>, atomically.

    The reports row is inserted with ``RETURNING id``, so the parent key is
    known directly. This replaces the per-record lookup by
    report_metadata_report_id, which failed whenever two stored reports
    shared a report_id.

    On success the transaction is committed. If build_insert_command raises
    AttributeError (a required element is missing), the transaction is rolled
    back and the report is skipped with a message. Previously the
    half-written rows stayed pending and were committed together with the
    next report. Any other error is rolled back and re-raised.
    """
    try:
        insert_string, values = build_insert_command('reports', report)
        cursor.execute(insert_string.rstrip().rstrip(';') + ' returning id;', values)
        report_id = cursor.fetchone()[0]

        # XPath positions are 1-based: record[1] .. record[n].
        record_count = len(report.findall('./record'))
        for i in range(1, record_count + 1):
            insert_string, values = build_insert_command(
                'report_items', report, i=i,
                preamble_values={'report_id': report_id})
            cursor.execute(insert_string, values)
    except AttributeError as err:
        connection.rollback()
        print('skip', report.findtext('./report_metadata/report_id'), '-', err)
    except Exception:
        connection.rollback()
        raise
    else:
        connection.commit()
175
# Script body: read configuration, connect to Postgres and IMAP, collect the
# reports attached to recent mail, and store those not yet recorded.
config = configparser.ConfigParser()
if args.config_file:
    config.read(args.config_file)
else:
    config.read(['/etc/dmarc_to_database.ini', './dmarc_to_database.ini'])

if not config.sections():
    raise RuntimeError('Could not find configuration file')

conn = psycopg2.connect(host=config['database']['server'],
                        database=config['database']['database'],
                        user=config['database']['username'],
                        password=config['database']['password'])
try:
    cur = conn.cursor()
    cur.execute('select max(report_metadata_date_range_end) from reports')
    results = cur.fetchall()
    most_recent_date = results[0][0]

    # Must stay a module-level name: fetch_msg reads the global `mailbox`.
    mailbox = imaplib.IMAP4(host=config['imap']['server'],
                            port=config['imap']['port'])
    try:
        mailbox.starttls()
        mailbox.login(config['imap']['username'], config['imap']['password'])
        mailbox.select('INBOX', readonly=True)

        # Search from two days before the newest stored report's end date;
        # reports already stored are skipped below by report_id.
        if most_recent_date:
            mails_from = "SINCE " + (most_recent_date - datetime.timedelta(days=2)).strftime("%d-%b-%Y")
        else:
            mails_from = "ALL"
        resp, nums = mailbox.uid('SEARCH', None, mails_from)
        if resp != 'OK':
            raise RuntimeError('IMAP search failed: {} {}'.format(resp, nums))

        # Filter with `is not None`, not truthiness: an Element's truth value
        # reflects whether it has children, not whether parsing succeeded.
        dmarc_reports = [report
                         for report_set in (extract_report(fetch_msg(n))
                                            for n in nums[0].split())
                         for report in report_set
                         if report is not None]
        mailbox.close()
    finally:
        mailbox.logout()

    for report in dmarc_reports:
        # Stored IDs go through maybe_strip, so compare the stripped text.
        # XML without a report_id cannot be stored or de-duplicated.
        report_id = (report.findtext('./report_metadata/report_id') or '').strip()
        if not report_id:
            print('skip report without report_metadata/report_id')
            continue
        cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
                    [report_id])
        results = cur.fetchall()
        if not results:
            print('write', report_id)
            if not args.test:
                write_report(conn, cur, report)
finally:
    conn.close()