#!/usr/bin/env python
# coding: utf-8
# %%
# Data from [European Centre for Disease Prevention and Control](https://www.ecdc.europa.eu/en/publications-data/download-todays-data-geographic-distribution-covid-19-cases-worldwide)

from sqlalchemy.types import Integer, Text, String, DateTime, Date, Float
from sqlalchemy import create_engine

# import itertools
# import collections
# import json
import pandas as pd
import numpy as np
# from scipy.stats import gmean
import datetime
import os


# %%
connection_string = 'postgresql://covid:3NbjJTkT63@localhost/covid'

eng = create_engine(connection_string)
engine = eng.execution_options(isolation_level="AUTOCOMMIT")

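
# %%
# Quick connectivity check (a sketch, assuming the covid database and role above exist):
# fail fast here rather than part-way through the import steps below.
with engine.connect() as connection:
    print(connection.execute('select version()').scalar())
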
# %%
DEATH_COUNT_THRESHOLD = 10
COUNTRIES_CORE = 'IT DE UK ES IE FR BE'.split()
COUNTRIES_NORDIC = 'SE NO DK FI UK'.split()
COUNTRIES_FRIENDS = 'IT UK ES BE SI MX'.split()
# COUNTRIES_FRIENDS = 'IT UK ES BE SI PT'.split()

COUNTRIES_AMERICAS = ['AG', 'AR', 'AW', 'BS', 'BB', 'BZ', 'BM', 'BO', 'BR', 'VG', 'KY', # excluding Canada and USA
                      'CL', 'CO', 'CR', 'CU', 'CW', 'DM', 'DO', 'EC', 'SV', 'GL', 'GD', 'GT',
                      'GY', 'HT', 'HN', 'JM', 'MX', 'MS', 'NI', 'PA', 'PY', 'PE', 'PR', 'KN',
                      'LC', 'VC', 'SX', 'SR', 'TT', 'TC', 'VI', 'UY', 'VE']
COUNTRIES_OF_INTEREST = list(set(COUNTRIES_CORE + COUNTRIES_FRIENDS))
COUNTRIES_ALL = list(set(COUNTRIES_CORE + COUNTRIES_FRIENDS + COUNTRIES_NORDIC + COUNTRIES_AMERICAS))


# %%
os.system('curl https://opendata.ecdc.europa.eu/covid19/casedistribution/csv/ > covid.csv')

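
# %%
# Quick sanity check on the download (a sketch): a truncated response or an HTML error
# page shows up here as a suspiciously small file or an unexpected header row.
print(os.path.getsize('covid.csv'), 'bytes')
with open('covid.csv') as f:
    print(f.readline().strip())
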
# %%
# First col is a date, treat geoId of NA as 'Namibia', not "NA" value
raw_data = pd.read_csv('covid.csv',
                       parse_dates=[0], dayfirst=True,
                       keep_default_na=False, na_values = [''],
#                        dtype = {'day': np.int64,
#                                 'month': np.int64,
#                                 'year': np.int64,
#                                 'cases': np.int64,
#                                 'deaths': np.int64,
#                                 'countriesAndTerritories': str,
#                                 'geoId': str,
#                                 'countryterritoryCode': str,
#                                 'popData2019': np.int64,
#                                 'continentExp': str,
#                                 }
                       )

raw_data.fillna(0, inplace=True)

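
# %%
# Check that the keep_default_na handling above worked (a sketch): Namibia's geoId
# should survive as the literal string 'NA' rather than being read as NaN.
print(raw_data[raw_data['geoId'] == 'NA'].head())
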
# %%
COUNTRIES_ALL

# %%
# raw_data.rename(columns={'dateRep':'report_date', 'geoId': 'geo_id',
#                          'countriesAndTerritories': 'country_name',
#                          'countryterritoryCode': 'country_territory_code',
#                          'popData2019': 'population_2019',
#                          'continentExp': 'continent'}, inplace=True)
raw_data.rename(columns={'dateRep':'report_date', 'geoId': 'geo_id',
                         'countriesAndTerritories': 'country_name',
                         'countryterritoryCode': 'country_territory_code',
                         'cases': 'cases_weekly',
                         'deaths': 'deaths_weekly',
                         'popData2019': 'population_2019',
                         'continentExp': 'continent',
                         'Cumulative_number_for_14_days_of_COVID-19_cases_per_100000': 'notification_rate_per_100000_population_14-days'
                         },
                inplace=True)

# %%
raw_data[['report_date', 'cases_weekly', 'deaths_weekly', 'geo_id', 'notification_rate_per_100000_population_14-days']].to_sql(
    'weekly_cases',
    engine,
    if_exists='replace',
    index=False,
    chunksize=500,
    dtype={
        "report_date": Date,
        "cases_weekly": Integer,
        "deaths_weekly": Integer,
        "geo_id": String,
        "notification_rate_per_100000_population_14-days": Float
    }
)

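
# %%
# Optional check that the load matches the frame (a sketch, assuming the table was
# just replaced above): row counts in the DataFrame and in Postgres should agree.
print(len(raw_data),
      pd.read_sql('select count(*) from weekly_cases', engine).iloc[0, 0])
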
# %%
raw_data[['country_name', 'geo_id', 'country_territory_code',
          'population_2019', 'continent']].drop_duplicates().to_sql(
    'countries',
    engine,
    if_exists='replace',
    index=False,
    chunksize=500,
    dtype={
        "country_name": Text,
        "geo_id": String,
        "country_territory_code": String,
        "population_2019": Integer,
        "continent": Text
    }
)

# %%
with engine.connect() as connection:
    connection.execute('alter table weekly_cases add primary key (geo_id, report_date)')
    connection.execute('alter table countries add primary key (geo_id);')
    connection.execute('alter table weekly_cases add foreign key (geo_id) references countries(geo_id);')
    connection.execute('alter table weekly_cases add culm_cases integer;')
    connection.execute('alter table weekly_cases add culm_deaths integer;')

# %%
query_string = '''with culm as
(select report_date, geo_id,
        sum(cases_weekly) over (partition by geo_id
                                order by report_date) as culm_data
   from weekly_cases
)
update weekly_cases
   set culm_cases = culm_data
  from culm
 where weekly_cases.report_date = culm.report_date and
       weekly_cases.geo_id = culm.geo_id'''
with engine.connect() as connection:
    connection.execute(query_string)

# %%
query_string = '''with culm as
(select report_date, geo_id,
        sum(deaths_weekly) over (partition by geo_id
                                 order by report_date) as culm_data
   from weekly_cases
)
update weekly_cases
   set culm_deaths = culm_data
  from culm
 where weekly_cases.report_date = culm.report_date and
       weekly_cases.geo_id = culm.geo_id'''
with engine.connect() as connection:
    connection.execute(query_string)

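
# %%
# Cross-check of the two running-total updates above in pandas (a sketch, assuming
# raw_data is still in memory): a grouped cumulative sum over the same ordering
# should reproduce culm_cases and culm_deaths.
culm_check = (raw_data.sort_values(['geo_id', 'report_date'])
              .groupby('geo_id')[['cases_weekly', 'deaths_weekly']]
              .cumsum())
print(culm_check.tail())
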
# %%
uk_query_string = (
    "https://api.coronavirus.data.gov.uk/v2/data?areaType=overview&"
    "metric=covidOccupiedMVBeds&"
    "metric=newAdmissions&"
    "metric=newCasesBySpecimenDate&"
    "metric=hospitalCases&"
    "metric=newDeaths28DaysByPublishDate&"
    "format=csv"
)

os.system(f'curl "{uk_query_string}" > uk_data.csv')

# %%
test_query_string = (
    "https://api.coronavirus.data.gov.uk/v2/data?areaType=overview&"
    "metric=newPCRTestsByPublishDate&"
    "metric=newTestsByPublishDate&"
    "metric=newPillarOneTwoTestsByPublishDate&"
    "format=csv"
)
os.system(f'curl "{test_query_string}" > test_data.csv')

# %%
uk_data = pd.read_csv('uk_data.csv',
                      parse_dates=[0], dayfirst=True)


# %%
test_data = pd.read_csv('test_data.csv',
                        parse_dates=[0], dayfirst=True)

# %%
uk_data = uk_data.merge(test_data[['date', 'newPCRTestsByPublishDate',
                                   'newTestsByPublishDate', 'newPillarOneTwoTestsByPublishDate']], how='outer', on='date')

# %%
uk_data.rename(
    columns={
        'covidOccupiedMVBeds': 'ventilator_beds',
        'newCasesBySpecimenDate': 'new_cases',
        'hospitalCases': 'hospital_cases',
        'newDeaths28DaysByPublishDate': 'new_deaths',
        'newAdmissions': 'new_admissions',
        'newPCRTestsByPublishDate': 'new_pcr_tests',
        'newTestsByPublishDate': 'new_tests',
        'newPillarOneTwoTestsByPublishDate': 'new_pillar_1_2_tests'
    }, inplace=True)

# %%
uk_data[['date',
         'hospital_cases', 'ventilator_beds',
         'new_cases', 'new_deaths',
         'new_admissions',
         'new_pcr_tests', 'new_tests', 'new_pillar_1_2_tests'
         ]].to_sql(
    'uk_data',
    engine,
    if_exists='replace',
    index=False,
    chunksize=500,
    dtype={
        "date": Date,
        "hospital_cases": Integer,
        "ventilator_beds": Integer,
        "new_cases": Integer,
        "new_deaths": Integer,
        "new_admissions": Integer,
        "new_pcr_tests": Integer,
        "new_tests": Integer,
        "new_pillar_1_2_tests": Integer
    }
)

# %%
query_string = '''drop table if exists uk_data_7;
create table uk_data_7
(date date primary key,
 hospital_cases real,
 ventilator_beds real,
 new_cases real,
 new_deaths real,
 new_admissions real,
 new_pcr_tests real,
 new_tests real,
 new_pillar_1_2_tests real
);'''

with engine.connect() as connection:
    connection.execute(query_string)

# %%
query_string = '''with ownd as (
    select date,
           avg(hospital_cases) over wnd as w_hospital_cases,
           avg(ventilator_beds) over wnd as w_ventilator_beds,
           avg(new_cases) over wnd as w_new_cases,
           avg(new_deaths) over wnd as w_new_deaths,
           avg(new_admissions) over wnd as w_new_admissions,
           avg(new_pcr_tests) over wnd as w_new_pcr_tests,
           avg(new_tests) over wnd as w_new_tests,
           avg(new_pillar_1_2_tests) over wnd as w_new_pillar_1_2_tests,
           count(*) over wnd as w_size
    from uk_data
    window wnd as (
        order by uk_data.date
        rows between 3 preceding and 3 following
    )
)
insert into uk_data_7(date,
                      hospital_cases,
                      ventilator_beds,
                      new_cases,
                      new_deaths,
                      new_admissions,
                      new_pcr_tests,
                      new_tests,
                      new_pillar_1_2_tests
                      )
(select date,
        w_hospital_cases,
        w_ventilator_beds,
        w_new_cases,
        w_new_deaths,
        w_new_admissions,
        w_new_pcr_tests,
        w_new_tests,
        w_new_pillar_1_2_tests
   from ownd
  where w_size = 7
)'''
with engine.connect() as connection:
    connection.execute(query_string)

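
# %%
# Cross-check of the centred 7-point averages above in pandas (a sketch, assuming
# uk_data has one row per date): rolling(7, center=True) with the default
# min_periods mirrors "rows between 3 preceding and 3 following" plus the
# w_size = 7 filter, so the incomplete windows at the edges drop out as NaN.
rolling_check = (uk_data.sort_values('date')
                 .set_index('date')[['new_cases', 'new_deaths']]
                 .rolling(7, center=True).mean())
print(rolling_check.dropna().tail())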