a8141fdc108478097cc290e86890901c59cb6fd5
[dmarc.git] / dmarc_to_database
1 #!/usr/bin/python3
2
3 import configparser
4 import imaplib
5 import email
6 import io
7 import zipfile
8 import xml.etree.ElementTree
9 import psycopg2
10 import re
11 import datetime
12
def fetch_msg(num):
    """Return the raw message bytes for IMAP UID *num*.

    Reads from the module-level ``mailbox`` connection, fetching the message
    with the ``(RFC822)`` data item and taking the second element of the first
    item of the response data.
    """
    response = mailbox.uid('FETCH', num, '(RFC822)')
    data_items = response[1]
    first_item = data_items[0]
    return first_item[1]
15
def xml_of_part(part):
    """Parse the DMARC report carried by a MIME *part* and return its root Element.

    The part's decoded payload may be either:
      * a zip archive -- the first member is taken as the report XML, or
      * a gzip stream -- recognised by its two magic bytes ``1f 8b``.
    Gzip needs explicit handling because xml_of() sends every part whose
    content type contains 'zip' here, and 'application/gzip' matches that test.

    The XML is handed to the parser as bytes so that the encoding named in
    its XML declaration is honoured instead of forcing a UTF-8 decode.

    Raises zipfile.BadZipFile if the payload is neither gzip nor a zip
    archive, and xml.etree.ElementTree.ParseError for malformed XML.
    """
    import gzip

    # get_payload(decode=True) yields None for multipart parts; treat that as
    # empty so it fails as an invalid zip, as before.
    payload = part.get_payload(decode=True) or b''
    if payload[:2] == b'\x1f\x8b':
        contents = gzip.decompress(payload)
    else:
        with zipfile.ZipFile(io.BytesIO(payload)) as zf:
            fn = zf.infolist()[0].filename
            contents = zf.read(fn)
    return xml.etree.ElementTree.fromstring(contents)
21
22
def xml_of(message):
    """Return the parsed report roots attached to an email *message*.

    A single-part message is handed to xml_of_part() as-is.  For a multipart
    message, only top-level parts whose content type contains 'zip' are
    parsed; every other part is ignored, so the result may be empty.
    """
    if not message.is_multipart():
        return [xml_of_part(message)]
    return [xml_of_part(attachment)
            for attachment in message.get_payload()
            if 'zip' in attachment.get_content_type()]
32
def extract_report(msg):
    """Parse raw email bytes *msg* and return the list of report root Elements."""
    return xml_of(email.message_from_bytes(msg))
36
def maybe_strip(text):
    """Return *text* with surrounding whitespace removed, or '' when *text* is falsy (e.g. None)."""
    return text.strip() if text else ''
42
# Mapping from an ElementTree path inside a parsed DMARC aggregate report to
# the PostgreSQL column that stores it.  Paths containing '{}' are per-record
# templates; build_insert_command() fills in the record index.
#   pg_field_name -- destination column name
#   pg_table      -- 'reports' (written once per report by write_report) or
#                    'report_items' (written once per <record> element)
#   pg_type       -- selects how build_insert_command converts the text
field_maps = {
    path: {'pg_field_name': column,
           'pg_table': table,
           'pg_type': column_type}
    for path, column, table, column_type in (
        ('./policy_published/adkim', 'policy_published_adkim', 'reports', 'varchar'),
        ('./policy_published/aspf', 'policy_published_aspf', 'reports', 'varchar'),
        ('./policy_published/domain', 'policy_published_domain', 'reports', 'varchar'),
        ('./policy_published/p', 'policy_published_p', 'reports', 'varchar'),
        ('./policy_published/pct', 'policy_published_pct', 'reports', 'int'),
        ('./record[{}]/auth_results/dkim/domain', 'auth_results_dkim_domain', 'report_items', 'varchar'),
        ('./record[{}]/auth_results/dkim/result', 'auth_results_dkim_result', 'report_items', 'varchar'),
        ('./record[{}]/auth_results/spf/domain', 'auth_results_spf_domain', 'report_items', 'varchar'),
        ('./record[{}]/auth_results/spf/result', 'auth_results_spf_result', 'report_items', 'varchar'),
        ('./record[{}]/identifiers/header_from', 'identifiers_header_from', 'report_items', 'varchar'),
        ('./record[{}]/row/count', 'count', 'report_items', 'int'),
        ('./record[{}]/row/policy_evaluated/disposition', 'policy_evaluated_disposition', 'report_items', 'varchar'),
        ('./record[{}]/row/policy_evaluated/dkim', 'policy_evaluated_dkim', 'report_items', 'varchar'),
        ('./record[{}]/row/policy_evaluated/spf', 'policy_evaluated_spf', 'report_items', 'varchar'),
        ('./record[{}]/row/source_ip', 'source_ip', 'report_items', 'inet'),
        ('./report_metadata/date_range/begin', 'report_metadata_date_range_begin', 'reports', 'timestamp'),
        ('./report_metadata/date_range/end', 'report_metadata_date_range_end', 'reports', 'timestamp'),
        ('./report_metadata/email', 'report_metadata_email', 'reports', 'varchar'),
        ('./report_metadata/org_name', 'report_metadata_org_name', 'reports', 'varchar'),
        ('./report_metadata/report_id', 'report_metadata_report_id', 'reports', 'varchar'),
    )
}
103
104
105
def _field_value(text, pg_type):
    """Convert raw element *text* (None if absent) to the value stored for *pg_type*."""
    if pg_type in ('int', 'timestamp'):
        if text is None or not text.strip():
            # Absent or empty numeric field: store SQL NULL instead of crashing.
            return None
        number = int(text)
        if pg_type == 'int':
            return number
        # The text is treated as Unix epoch seconds and kept timezone-aware (UTC).
        return datetime.datetime.fromtimestamp(number, tz=datetime.timezone.utc)
    # 'varchar' and 'inet' columns are stored as stripped text ('' if absent).
    return maybe_strip(text)


def build_insert_command(table_name, report, preamble_values=None, i=None):
    """Build a parameterised INSERT statement for one row of *table_name*.

    Every ``field_maps`` entry whose ``pg_table`` equals *table_name*
    contributes one column, read from *report* at the mapped path.  For
    per-record paths the ``{}`` placeholder is filled with index *i*.

    Elements missing from the report no longer raise AttributeError.  The
    aggregate-report schema (RFC 7489, Appendix C) lets ``auth_results/dkim``
    be absent, for example.  Missing elements are stored as NULL for
    int/timestamp columns and as '' for text columns, the same text value an
    empty element produced before.

    :param table_name: target table, 'reports' or 'report_items'.
    :param report: parsed report root Element.
    :param preamble_values: optional extra ``column -> value`` pairs (e.g. a
        foreign key), placed first in sorted-key order.
    :param i: record index substituted into per-record paths (the caller
        passes 1-based indices); ignored when None.
    :returns: ``(sql, values)`` for ``cursor.execute``; *sql* uses
        ``%(column)s`` placeholders keyed by *values*.
    """
    values = preamble_values.copy() if preamble_values else {}
    preamble_names = sorted(preamble_values) if preamble_values else []

    field_names = []
    for path, spec in field_maps.items():
        if spec['pg_table'] != table_name:
            continue
        column = spec['pg_field_name']
        field_names.append(column)
        element = report.find(path.format(i) if i is not None else path)
        text = element.text if element is not None else None
        values[column] = _field_value(text, spec['pg_type'])

    columns = preamble_names + field_names
    insert_string = 'insert into {} ({}) values ({});'.format(
        table_name,
        ', '.join(columns),
        ', '.join('%({})s'.format(c) for c in columns))
    return insert_string, values
138
139
def write_report(connection, cursor, report):
    """Insert one parsed DMARC report and its records, then commit.

    A row is inserted into ``reports``.  If the report has any ``<record>``
    elements, that row's ``id`` is looked up once and used as ``report_id``
    for one ``report_items`` row per record.  Everything is committed
    together at the end.  On any failure the transaction is rolled back
    before the exception propagates.  A half-inserted report is therefore
    never left in the open transaction for a later commit to pick up.

    :raises RuntimeError: if the inserted report cannot be found uniquely
        by its report id.
    """
    try:
        insert_string, values = build_insert_command('reports', report)
        cursor.execute(insert_string, values)

        record_count = len(report.findall('./record'))
        if record_count:
            # The column was stored stripped (it is a 'varchar' field), so
            # strip here too or padded <report_id> text would never match.
            report_key = maybe_strip(report.find('./report_metadata/report_id').text)
            cursor.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
                           [report_key])
            results = cursor.fetchall()
            if len(results) != 1:
                raise RuntimeError('Could not find report record for report item')
            report_id = results[0][0]

            # Record paths in field_maps are 1-based (ElementPath positions).
            for i in range(1, record_count + 1):
                insert_string, values = build_insert_command('report_items', report, i=i,
                                                             preamble_values={'report_id': report_id})
                cursor.execute(insert_string, values)
    except Exception:
        connection.rollback()
        raise
    connection.commit()
159
# Script entry: import every DMARC aggregate report found in the IMAP INBOX
# that is not yet stored in the database.
config = configparser.ConfigParser()
config.read('dmarc.ini')

conn = psycopg2.connect(host=config['database']['server'],
                        database=config['database']['database'],
                        user=config['database']['username'],
                        password=config['database']['password'])
try:
    cur = conn.cursor()
    # End of the newest report period already stored; None when table is empty.
    cur.execute('select max(report_metadata_date_range_end) from reports')
    most_recent_date = cur.fetchall()[0][0]

    # fetch_msg() reads this module-level name, so it must stay global.
    mailbox = imaplib.IMAP4(host=config['imap']['server'],
                            port=config['imap']['port'])
    try:
        mailbox.starttls()
        mailbox.login(config['imap']['username'], config['imap']['password'])
        typ, data = mailbox.select('INBOX', readonly=True)
        if typ != 'OK':
            raise RuntimeError('Could not select INBOX: {}'.format(data))

        # Search back two days before the newest stored period end
        # (presumably to catch reports that arrive late).
        if most_recent_date:
            mails_from = "SINCE " + (most_recent_date - datetime.timedelta(days=2)).strftime("%d-%b-%Y")
        else:
            mails_from = "ALL"
        resp, nums = mailbox.uid('SEARCH', None, mails_from)
        if resp != 'OK':
            raise RuntimeError('IMAP search failed: {}'.format(nums))

        dmarc_reports = []
        for uid in nums[0].split():
            raw_message = fetch_msg(uid)
            try:
                dmarc_reports.extend(extract_report(raw_message))
            except (zipfile.BadZipFile, xml.etree.ElementTree.ParseError,
                    IndexError, ValueError, OSError, EOFError) as exc:
                # One unparseable mail (e.g. a non-report message in the
                # inbox) must not block importing every other report.
                print('skip', uid.decode(), type(exc).__name__, exc)

        mailbox.close()
    finally:
        mailbox.logout()

    for report in dmarc_reports:
        # Strip like the stored column, so padded ids are seen as imported.
        report_key = maybe_strip(report.find('./report_metadata/report_id').text)
        cur.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
                    [report_key])
        if not cur.fetchall():
            print('write', report_key)
            write_report(conn, cur, report)
finally:
    conn.close()