8 import xml
.etree
.ElementTree
14 return mailbox
.uid('FETCH', num
, '(RFC822)')[1][0][1]
16 def xml_of_part(part
):
18 with zipfile
.ZipFile(io
.BytesIO(part
.get_payload(decode
=True))) as zf
:
19 fn
= zf
.infolist()[0].filename
20 contents
= zf
.read(fn
).decode('utf-8')
21 return xml
.etree
.ElementTree
.fromstring(contents
)
22 except zipfile
.BadZipFile
:
28 if message
.is_multipart():
29 for p
in message
.get_payload():
30 if 'zip' in p
.get_content_type():
31 reports
+= [xml_of_part(p
)]
33 reports
= [xml_of_part(message
)]
36 def extract_report(msg
):
37 pmsg
= email
.message_from_bytes(msg
)
40 def maybe_strip(text
):
47 './policy_published/adkim':
48 {'pg_field_name': 'policy_published_adkim',
49 'pg_table': 'reports', 'pg_type': 'varchar'},
50 './policy_published/aspf':
51 {'pg_field_name': 'policy_published_aspf',
52 'pg_table': 'reports', 'pg_type': 'varchar'},
53 './policy_published/domain':
54 {'pg_field_name': 'policy_published_domain',
55 'pg_table': 'reports', 'pg_type': 'varchar'},
56 './policy_published/p':
57 {'pg_field_name': 'policy_published_p',
58 'pg_table': 'reports', 'pg_type': 'varchar'},
59 './policy_published/pct':
60 {'pg_field_name': 'policy_published_pct',
61 'pg_table': 'reports', 'pg_type': 'int'},
62 './record[{}]/auth_results/dkim/domain':
63 {'pg_field_name': 'auth_results_dkim_domain',
64 'pg_table': 'report_items', 'pg_type': 'varchar'},
65 './record[{}]/auth_results/dkim/result':
66 {'pg_field_name': 'auth_results_dkim_result',
67 'pg_table': 'report_items', 'pg_type': 'varchar'},
68 './record[{}]/auth_results/spf/domain':
69 {'pg_field_name': 'auth_results_spf_domain',
70 'pg_table': 'report_items', 'pg_type': 'varchar'},
71 './record[{}]/auth_results/spf/result':
72 {'pg_field_name': 'auth_results_spf_result',
73 'pg_table': 'report_items', 'pg_type': 'varchar'},
74 './record[{}]/identifiers/header_from':
75 {'pg_field_name': 'identifiers_header_from',
76 'pg_table': 'report_items', 'pg_type': 'varchar'},
77 './record[{}]/row/count':
78 {'pg_field_name': 'count',
79 'pg_table': 'report_items', 'pg_type': 'int'},
80 './record[{}]/row/policy_evaluated/disposition':
81 {'pg_field_name': 'policy_evaluated_disposition',
82 'pg_table': 'report_items', 'pg_type': 'varchar'},
83 './record[{}]/row/policy_evaluated/dkim':
84 {'pg_field_name': 'policy_evaluated_dkim',
85 'pg_table': 'report_items', 'pg_type': 'varchar'},
86 './record[{}]/row/policy_evaluated/spf':
87 {'pg_field_name': 'policy_evaluated_spf',
88 'pg_table': 'report_items', 'pg_type': 'varchar'},
89 './record[{}]/row/source_ip':
90 {'pg_field_name': 'source_ip',
91 'pg_table': 'report_items', 'pg_type': 'inet'},
92 './report_metadata/date_range/begin':
93 {'pg_field_name': 'report_metadata_date_range_begin',
94 'pg_table': 'reports', 'pg_type': 'timestamp'},
95 './report_metadata/date_range/end':
96 {'pg_field_name': 'report_metadata_date_range_end',
97 'pg_table': 'reports', 'pg_type': 'timestamp'},
98 './report_metadata/email':
99 {'pg_field_name': 'report_metadata_email',
100 'pg_table': 'reports', 'pg_type': 'varchar'},
101 './report_metadata/org_name':
102 {'pg_field_name': 'report_metadata_org_name',
103 'pg_table': 'reports', 'pg_type': 'varchar'},
104 './report_metadata/report_id':
105 {'pg_field_name': 'report_metadata_report_id',
106 'pg_table': 'reports', 'pg_type': 'varchar'}}
def build_insert_command(table_name, report, preamble_values=None, i=None):
    """Build a parameterised INSERT statement for one row of *table_name*.

    The columns are taken from the module-level ``field_maps`` mapping: every
    entry whose ``'pg_table'`` equals *table_name* contributes one column
    (``'pg_field_name'``) whose value is read from *report* at the entry's
    XPath key and converted according to ``'pg_type'``.

    Args:
        table_name: Target table; selects which ``field_maps`` entries apply.
        report: Parsed report element, queried with ``report.find(path)``.
        preamble_values: Optional mapping of extra column name -> value that
            is placed ahead of the mapped columns (for example a foreign
            key). The mapping is copied, never mutated.
        i: 1-based index substituted into the ``{}`` slot of per-record
            paths such as ``'./record[{}]/row/count'``. Paths without a slot
            are unaffected.

    Returns:
        A ``(insert_string, values)`` tuple. ``insert_string`` uses named
        ``%(column)s`` placeholders and ``values`` maps each column name to
        its converted value, suitable for ``cursor.execute``.

    Raises:
        ValueError: If an ``int`` or ``timestamp`` field holds non-numeric
            text.
    """
    values = dict(preamble_values) if preamble_values else {}
    # Preamble columns go first, in sorted order, so the column list and the
    # placeholder list always line up.
    column_names = sorted(values)

    for path_template, spec in field_maps.items():
        if spec['pg_table'] != table_name:
            continue

        column = spec['pg_field_name']
        column_names.append(column)

        path = path_template if i is None else path_template.format(i)
        element = report.find(path)
        if element is None:
            # The element is absent from this report; store NULL instead of
            # failing on ``None.text``.
            # NOTE(review): confirm NULL is acceptable for every mapped column.
            values[column] = None
            continue

        pg_type = spec['pg_type']
        text = element.text
        if pg_type in ('int', 'timestamp') and (text is None or not text.strip()):
            # An empty numeric element cannot be converted; treat as NULL.
            values[column] = None
        elif pg_type == 'int':
            values[column] = int(text)
        elif pg_type == 'timestamp':
            # Epoch seconds -> naive UTC datetime (same value the deprecated
            # ``utcfromtimestamp`` produced).
            values[column] = datetime.datetime.fromtimestamp(
                int(text), datetime.timezone.utc).replace(tzinfo=None)
        else:
            # 'inet' and 'varchar' (and any other type) are passed as text.
            values[column] = maybe_strip(text)

    insert_string = 'insert into {} ({}) values ({});'.format(
        table_name,
        ', '.join(column_names),
        ', '.join('%({})s'.format(name) for name in column_names))
    return insert_string, values
def write_report(connection, cursor, report):
    """Insert one parsed report and all of its ``record`` items.

    A row is first written to ``reports``; if the report contains any
    ``./record`` elements, the new row's ``id`` is looked up by
    ``report_metadata_report_id`` and one ``report_items`` row is written per
    record, each carrying that id as ``report_id``.

    Args:
        connection: Database connection. Not used by this function.
            NOTE(review): no commit is issued here; confirm the caller
            commits the transaction.
        cursor: DB-API cursor used for every statement.
        report: Parsed report element supporting ``find`` / ``findall``.

    Raises:
        RuntimeError: If the lookup by report id does not return exactly one
            ``reports`` row.
    """
    insert_string, values = build_insert_command('reports', report)
    cursor.execute(insert_string, values)

    record_count = len(report.findall('./record'))
    if not record_count:
        # Nothing to attach to the report row, so skip the id lookup.
        return

    # The owning row is the same for every record, so resolve it once rather
    # than re-running the identical query on each iteration.
    cursor.execute(
        'select id, report_metadata_report_id from reports '
        'where report_metadata_report_id = %s;',
        [report.find('./report_metadata/report_id').text])
    results = cursor.fetchall()
    if len(results) != 1:
        raise RuntimeError('Could not find report record for report item')
    report_id = results[0][0]

    # Positional XPath predicates are 1-based, hence the range starting at 1.
    for i in range(1, record_count + 1):
        insert_string, values = build_insert_command(
            'report_items', report, i=i,
            preamble_values={'report_id': report_id})
        cursor.execute(insert_string, values)
161 config
= configparser
.ConfigParser()
162 config
.read('dmarc.ini')
164 conn
= psycopg2
.connect(host
=config
['database']['server'],
165 database
=config
['database']['database'],
166 user
=config
['database']['username'],
167 password
=config
['database']['password'])
170 cur
.execute('select max(report_metadata_date_range_end) from reports')
171 results
= cur
.fetchall()
172 most_recent_date
= results
[0][0]
174 mailbox
= imaplib
.IMAP4(host
=config
['imap']['server'],
175 port
=config
['imap']['port'])
177 mailbox
.login(config
['imap']['username'], config
['imap']['password'])
178 mailbox
.select('INBOX', readonly
=True)
182 mails_from
= "SINCE " + (most_recent_date
- datetime
.timedelta(days
=2)).strftime("%d-%b-%Y")
185 resp
, nums
= mailbox
.uid('SEARCH', None, mails_from
)
188 dmarc_reports
= [report
for report_set
in [extract_report(fetch_msg(n
)) for n
in nums
[0].split()]
189 for report
in report_set
195 for report
in dmarc_reports
:
196 cur
.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
197 [report
.find('./report_metadata/report_id').text
])
198 results
= cur
.fetchall()
200 print('write', report
.find('./report_metadata/report_id').text
)
201 write_report(conn
, cur
, report
)