8 import xml
.etree
.ElementTree
# Command-line options.
parser = argparse.ArgumentParser(description='Process DMARC records.')
parser.add_argument('-c', '--config', action='store',
                    default='', dest='config_file',
                    help='Path to config file')
# argparse derives dest='test' from '--test'.
# NOTE(review): one line of this call was missing from the copy under review;
# confirm no other keyword argument (e.g. an explicit dest) was lost.
parser.add_argument('-t', '--test', action='store_true',
                    help='Test, but do not add records to the database')
args = parser.parse_args()
def fetch_msg(num):
    """Return the raw RFC 822 bytes of the message with IMAP UID ``num``.

    Uses the module-level ``mailbox`` IMAP session, which the script body
    logs in and selects INBOX on before calling this.

    :param num: message UID, as produced by ``mailbox.uid('SEARCH', ...)``.
    :returns: the message bytes from the FETCH response.
    :raises RuntimeError: if the FETCH is not ``OK`` or carries no message.
    """
    status, data = mailbox.uid('FETCH', num, '(RFC822)')
    # On success data[0] is an (envelope, message_bytes) tuple. A UID that no
    # longer exists may come back as 'OK' without such a tuple, which used to
    # surface as an opaque TypeError from the indexing below.
    if status != 'OK' or not data or not isinstance(data[0], tuple):
        raise RuntimeError(
            'IMAP FETCH failed for UID {!r}: {} {!r}'.format(num, status, data))
    return data[0][1]
# Parse the first member of a zip archive carried in a MIME part as XML.
# The part's payload is decoded (get_payload(decode=True)) and opened with
# zipfile. The first entry from infolist() is read and decoded as UTF-8.
# The result is the root Element from xml.etree.ElementTree.fromstring().
# NOTE(review): this copy is missing the `try:` that pairs with
# `except zipfile.BadZipFile:` below, and also that handler's body. So what
# is returned for a payload that is not a zip archive (e.g. a gzip report)
# cannot be determined here. Restore from the full file.
# NOTE(review): the xml.etree documentation warns the module is not secure
# against maliciously constructed data. Attachments fetched from the mailbox
# are external input.
26 def xml_of_part(part
):
28 with zipfile
.ZipFile(io
.BytesIO(part
.get_payload(decode
=True))) as zf
:
# Only the first archive member is used; an empty archive raises IndexError.
29 fn
= zf
.infolist()[0].filename
30 contents
= zf
.read(fn
).decode('utf-8')
31 return xml
.etree
.ElementTree
.fromstring(contents
)
32 except zipfile
.BadZipFile
:
# Fragment of a function whose `def` line is not in this copy. It collects
# parsed XML reports (via xml_of_part) from an email message object, using
# the email.message.Message API. For a multipart message, each part whose
# content type contains 'zip' is parsed and appended to `reports`.
# NOTE(review): several lines are missing from this copy: the initialisation
# of `reports`, the condition guarding the single-part branch (presumably
# `else:`), and the return. Confirm against the full file.
38 if message
.is_multipart():
39 for p
in message
.get_payload():
# Substring test: this also matches e.g. 'application/gzip', which
# xml_of_part opens with zipfile.
40 if 'zip' in p
.get_content_type():
41 reports
+= [xml_of_part(p
)]
# Parse the message itself as the report.
43 reports
= [xml_of_part(message
)]
# Parse raw RFC 822 message bytes (as returned by fetch_msg) into an
# email.message.Message, `pmsg`. The script body iterates the return value
# as a collection of parsed XML report elements.
# NOTE(review): the rest of this function, including its return statement,
# is missing from this copy.
46 def extract_report(msg
):
47 pmsg
= email
.message_from_bytes(msg
)
# Helper that build_insert_command applies to element text for every field
# whose pg_type is neither 'int' nor 'timestamptz'.
# NOTE(review): the body is missing from this copy. The name suggests it
# strips surrounding whitespace when given a string. Confirm this, including
# how it treats None (an empty XML element has .text None).
50 def maybe_strip(text
):
# Maps an XPath (relative to the report's root element) to the column it fills:
#   pg_field_name -- column name
#   pg_table      -- 'reports' for report-level fields, 'report_items' for
#                    per-<record> fields
#   pg_type       -- selects the value conversion in build_insert_command
#                    ('int', 'timestamptz', 'inet', 'varchar')
# Per-record paths contain a "{}" placeholder for the 1-based <record> index.
# NOTE(review): the pg_type values for policy_published/pct and row/count
# ('int') and for row/source_ip ('inet') were absent from the copy of this
# file under review. They are inferred from the conversions that
# build_insert_command implements; confirm against the table schema.
field_maps = {
    './policy_published/adkim': {
        'pg_field_name': 'policy_published_adkim',
        'pg_table': 'reports',
        'pg_type': 'varchar'},
    './policy_published/aspf': {
        'pg_field_name': 'policy_published_aspf',
        'pg_table': 'reports',
        'pg_type': 'varchar'},
    './policy_published/domain': {
        'pg_field_name': 'policy_published_domain',
        'pg_table': 'reports',
        'pg_type': 'varchar'},
    './policy_published/p': {
        'pg_field_name': 'policy_published_p',
        'pg_table': 'reports',
        'pg_type': 'varchar'},
    './policy_published/pct': {
        'pg_field_name': 'policy_published_pct',
        'pg_table': 'reports',
        'pg_type': 'int'},
    './record[{}]/auth_results/dkim/domain': {
        'pg_field_name': 'auth_results_dkim_domain',
        'pg_table': 'report_items',
        'pg_type': 'varchar'},
    './record[{}]/auth_results/dkim/result': {
        'pg_field_name': 'auth_results_dkim_result',
        'pg_table': 'report_items',
        'pg_type': 'varchar'},
    './record[{}]/auth_results/spf/domain': {
        'pg_field_name': 'auth_results_spf_domain',
        'pg_table': 'report_items',
        'pg_type': 'varchar'},
    './record[{}]/auth_results/spf/result': {
        'pg_field_name': 'auth_results_spf_result',
        'pg_table': 'report_items',
        'pg_type': 'varchar'},
    './record[{}]/identifiers/header_from': {
        'pg_field_name': 'identifiers_header_from',
        'pg_table': 'report_items',
        'pg_type': 'varchar'},
    './record[{}]/row/count': {
        'pg_field_name': 'count',
        'pg_table': 'report_items',
        'pg_type': 'int'},
    './record[{}]/row/policy_evaluated/disposition': {
        'pg_field_name': 'policy_evaluated_disposition',
        'pg_table': 'report_items',
        'pg_type': 'varchar'},
    './record[{}]/row/policy_evaluated/dkim': {
        'pg_field_name': 'policy_evaluated_dkim',
        'pg_table': 'report_items',
        'pg_type': 'varchar'},
    './record[{}]/row/policy_evaluated/spf': {
        'pg_field_name': 'policy_evaluated_spf',
        'pg_table': 'report_items',
        'pg_type': 'varchar'},
    './record[{}]/row/source_ip': {
        'pg_field_name': 'source_ip',
        'pg_table': 'report_items',
        'pg_type': 'inet'},
    './report_metadata/date_range/begin': {
        'pg_field_name': 'report_metadata_date_range_begin',
        'pg_table': 'reports',
        'pg_type': 'timestamptz'},
    './report_metadata/date_range/end': {
        'pg_field_name': 'report_metadata_date_range_end',
        'pg_table': 'reports',
        'pg_type': 'timestamptz'},
    './report_metadata/email': {
        'pg_field_name': 'report_metadata_email',
        'pg_table': 'reports',
        'pg_type': 'varchar'},
    './report_metadata/org_name': {
        'pg_field_name': 'report_metadata_org_name',
        'pg_table': 'reports',
        'pg_type': 'varchar'},
    './report_metadata/report_id': {
        'pg_field_name': 'report_metadata_report_id',
        'pg_table': 'reports',
        'pg_type': 'varchar'},
}
def build_insert_command(table_name, report, preamble_values=None, i=None):
    """Build a parameterised INSERT statement for one row of ``table_name``.

    Every ``field_maps`` entry whose ``pg_table`` equals ``table_name``
    contributes one column. Its value is read from ``report`` at the entry's
    XPath and converted according to the entry's ``pg_type``.

    :param table_name: target table. It must be a ``pg_table`` that appears in
        ``field_maps``. The name is interpolated into the SQL text, so
        unmapped names are rejected.
    :param report: parsed DMARC report root element (ElementTree element).
    :param preamble_values: optional mapping of extra column -> value. These
        columns come first, in sorted order (e.g. ``{'report_id': ...}``).
        The mapping is not mutated. ``None`` or empty means no preamble.
    :param i: 1-based ``<record>`` index substituted into ``'./record[{}]'``
        paths. Paths without a placeholder ignore it.
    :returns: ``(insert_string, values)``. The SQL uses ``%(name)s``
        placeholders, and ``values`` is the parameter dict to execute it with.
    :raises AttributeError: if a mapped element is missing (``find`` returns
        ``None``). ``write_report`` catches this.
    :raises ValueError: if no ``field_maps`` entry targets ``table_name``, or
        an ``int``/``timestamptz`` field's text is not an integer.
    """
    # Copy so the caller's mapping is never mutated; the original code called
    # .copy() on the None default.
    values = dict(preamble_values) if preamble_values else {}
    preamble_names = sorted(values)
    field_names = []

    for path, spec in field_maps.items():
        if spec['pg_table'] != table_name:
            continue
        name = spec['pg_field_name']
        pg_type = spec['pg_type']
        field_names.append(name)
        # str.format ignores unused positional arguments, so report-level
        # paths (no "{}") come through unchanged.
        text = report.find(path.format(i)).text
        if pg_type == 'int':
            values[name] = int(text)
        elif pg_type == 'timestamptz':
            # The value is treated as Unix epoch seconds, interpreted in UTC.
            values[name] = datetime.datetime.fromtimestamp(
                int(text), tz=datetime.timezone.utc)
        else:
            # 'varchar', 'inet' and any other type are passed through maybe_strip.
            values[name] = maybe_strip(text)

    if not field_names:
        raise ValueError(
            'no field_maps entries for table {!r}'.format(table_name))

    # A single column list avoids the dangling ", " that an empty preamble
    # would otherwise leave in the SQL.
    columns = preamble_names + field_names
    insert_string = 'insert into {} ({}) values ({});'.format(
        table_name,
        ', '.join(columns),
        ', '.join('%({})s'.format(column) for column in columns))
    return insert_string, values
# Write one parsed DMARC report. First a `reports` row is inserted. Then one
# `report_items` row is inserted per <record> element, using the 1-based
# index i that matches the './record[{}]' paths. Each item is linked via the
# `reports.id` found by report_id.
# Raises RuntimeError if that lookup does not return exactly one row.
# NOTE(review): `connection` is not referenced in the visible lines. This
# copy is missing some lines of the body, including the `try:` paired with
# `except AttributeError:` below and that handler's body. AttributeError is
# what `.find(...).text` raises when an element is absent.
153 def write_report(connection
, cursor
, report
):
155 insert_string
, values
= build_insert_command('reports', report
)
156 # print(insert_string, values)
157 cursor
.execute(insert_string
, values
)
159 for i
in range(1, len(report
.findall('./record'))+1):
# NOTE(review): this lookup does not depend on i, so it could run once before
# the loop. It also assumes report_id values are unique across reporting
# organisations.
161 cursor
.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
162 [report
.find('./report_metadata/report_id').text
])
163 results
= cursor
.fetchall()
164 if len(results
) != 1:
165 raise RuntimeError('Could not find report record for report item')
167 report_id
= results
[0][0]
168 insert_string
, values
= build_insert_command('report_items', report
, i
=i
,
169 preamble_values
={'report_id': report_id
})
170 # print(insert_string, values)
171 cursor
.execute(insert_string
, values
)
173 except AttributeError:
# Script body. Steps:
#   1. Read configuration and connect to PostgreSQL.
#   2. Find the newest report end date already stored.
#   3. Search the IMAP INBOX (read-only) for messages since shortly before
#      that date.
#   4. Write each parsed report.
# NOTE(review): several lines of this section are missing from this copy,
# including whatever creates `cur` and the lines between the two config.read
# calls. Confirm against the full file.
176 config
= configparser
.ConfigParser()
178 config
.read(args
.config_file
)
180 config
.read(['/etc/dmarc_to_database.ini', './dmarc_to_database.ini'])
182 if not config
.sections():
183 raise RuntimeError('Could not find configuration file')
185 conn
= psycopg2
.connect(host
=config
['database']['server'],
186 database
=config
['database']['database'],
187 user
=config
['database']['username'],
188 password
=config
['database']['password'])
# Newest stored report end date; it sets the IMAP search window below.
191 cur
.execute('select max(report_metadata_date_range_end) from reports')
192 results
= cur
.fetchall()
193 most_recent_date
= results
[0][0]
# NOTE(review): imaplib.IMAP4 is a plaintext connection. The login() below
# sends the password unencrypted unless TLS is added (IMAP4_SSL or starttls()).
195 mailbox
= imaplib
.IMAP4(host
=config
['imap']['server'],
196 port
=config
['imap']['port'])
198 mailbox
.login(config
['imap']['username'], config
['imap']['password'])
199 mailbox
.select('INBOX', readonly
=True)
# Search from two days before the newest stored end date. IMAP SINCE takes a
# DD-Mon-YYYY date.
# NOTE(review): max() over an empty `reports` table returns NULL (None). That
# would make this subtraction raise TypeError unless lines missing from this
# copy handle it. Also, %b is locale-dependent, while IMAP expects English
# month names.
203 mails_from
= "SINCE " + (most_recent_date
- datetime
.timedelta(days
=2)).strftime("%d-%b-%Y")
# nums[0] holds the matching UIDs separated by spaces; `resp` is not checked
# in the visible lines.
206 resp
, nums
= mailbox
.uid('SEARCH', None, mails_from
)
# Flatten the reports from every fetched message into one list.
# NOTE(review): the end of this comprehension is missing from this copy.
209 dmarc_reports
= [report
for report_set
in [extract_report(fetch_msg(n
)) for n
in nums
[0].split()]
210 for report
in report_set
# NOTE(review): `results` (whether this report_id is already stored) is not
# used in the visible lines. Presumably a line missing from this copy skips
# duplicates, and possibly honours --test, before writing.
216 for report
in dmarc_reports
:
217 cur
.execute('select id, report_metadata_report_id from reports where report_metadata_report_id = %s;',
218 [report
.find('./report_metadata/report_id').text
])
219 results
= cur
.fetchall()
221 print('write', report
.find('./report_metadata/report_id').text
)
223 write_report(conn
, cur
, report
)