Now using py files, for automation
[covid19.git] / data_import.py
diff --git a/data_import.py b/data_import.py
new file mode 100644 (file)
index 0000000..20593c7
--- /dev/null
@@ -0,0 +1,291 @@
+#!/usr/bin/env python
+# coding: utf-8
+# %%
+# Data from [European Centre for Disease Prevention and Control](https://www.ecdc.europa.eu/en/publications-data/download-todays-data-geographic-distribution-covid-19-cases-worldwide)
+
+from sqlalchemy.types import Integer, Text, String, DateTime, Date, Float
+from sqlalchemy import create_engine
+
+# import itertools
+# import collections
+# import json
+import pandas as pd
+import numpy as np
+# from scipy.stats import gmean
+import datetime
+import os
+
+
+# %%
+connection_string = 'postgresql://covid:3NbjJTkT63@localhost/covid'
+
+eng = create_engine(connection_string)
+engine = eng.execution_options(isolation_level="AUTOCOMMIT")
+
+
+# %%
+DEATH_COUNT_THRESHOLD = 10
+COUNTRIES_CORE = 'IT DE UK ES IE FR BE'.split()
+COUNTRIES_NORDIC = 'SE NO DK FI UK'.split()
+COUNTRIES_FRIENDS = 'IT UK ES BE SI MX'.split()
+# COUNTRIES_FRIENDS = 'IT UK ES BE SI PT'.split()
+
+COUNTRIES_AMERICAS = ['AG', 'AR', 'AW', 'BS', 'BB', 'BZ', 'BM', 'BO', 'BR', 'VG', 'KY', # excluding Canada and USA
+       'CL', 'CO', 'CR', 'CU', 'CW', 'DM', 'DO', 'EC', 'SV', 'GL', 'GD', 'GT',
+       'GY', 'HT', 'HN', 'JM', 'MX', 'MS', 'NI', 'PA', 'PY', 'PE', 'PR', 'KN',
+       'LC', 'VC', 'SX', 'SR', 'TT', 'TC', 'VI', 'UY', 'VE']
+COUNTRIES_OF_INTEREST = list(set(COUNTRIES_CORE + COUNTRIES_FRIENDS))
+COUNTRIES_ALL = list(set(COUNTRIES_CORE + COUNTRIES_FRIENDS + COUNTRIES_NORDIC + COUNTRIES_AMERICAS))
+
+
+# %%
+os.system('curl https://opendata.ecdc.europa.eu/covid19/casedistribution/csv/ > covid.csv')
+
+
+# %%
+# First col is a date, treat geoId of NA as 'Namibia', not "NA" value
+raw_data = pd.read_csv('covid.csv', 
+                       parse_dates=[0], dayfirst=True,
+                       keep_default_na=False, na_values = [''],
+#                        dtype = {'day': np.int64, 
+#                                 'month': np.int64, 
+#                                 'year': np.int64, 
+#                                 'cases': np.int64, 
+#                                 'deaths': np.int64, 
+#                                 'countriesAndTerritories': str, 
+#                                 'geoId': str, 
+#                                 'countryterritoryCode': str, 
+#                                 'popData2019': np.int64, 
+#                                 'continentExp': str, 
+#                                 }
+                      )
+
+raw_data.fillna(0, inplace=True)
+
+
+# %%
+raw_data.rename(columns={'dateRep':'report_date', 'geoId': 'geo_id',
+                        'countriesAndTerritories': 'country_name',
+                        'countryterritoryCode': 'country_territory_code',
+                        'popData2019': 'population_2019',
+                        'continentExp': 'continent'}, inplace=True)
+
+
+# %%
+raw_data[['report_date', 'cases_weekly', 'deaths_weekly', 'geo_id', 'notification_rate_per_100000_population_14-days']].to_sql(
+    'weekly_cases',
+    engine,
+    if_exists='replace',
+    index=False,
+    chunksize=500,
+    dtype={
+        "report_date": Date,
+        "cases_weekly": Integer,
+        "deaths_weekly": Integer,
+        "geo_id": String,
+        "notification_rate_per_100000_population_14-days": Float
+    }
+)
+
+
+# %%
+raw_data[['country_name', 'geo_id', 'country_territory_code',
+       'population_2019', 'continent']].drop_duplicates().to_sql(
+    'countries',
+    engine,
+    if_exists='replace',
+    index=False,
+    chunksize=500,
+    dtype={
+        "country_name": Text,
+        "geo_id": String,
+        "country_territory_code": String,
+        "population_2019": Integer,
+        "continent": Text
+    }
+)
+
+
+# %%
+with engine.connect() as connection:
+    connection.execute('alter table weekly_cases add primary key (geo_id, report_date)')
+    connection.execute('alter table countries add primary key (geo_id);')
+    connection.execute('alter table weekly_cases add foreign key (geo_id) references countries(geo_id);')
+    connection.execute('alter table weekly_cases add culm_cases integer;')
+    connection.execute('alter table weekly_cases add culm_deaths integer;')
+
+
+# %%
+query_string = '''with culm as 
+    (select report_date, geo_id,
+        sum(cases_weekly) over (partition by geo_id 
+                                order by report_date) as culm_data
+     from weekly_cases
+    )
+update weekly_cases
+    set culm_cases = culm_data
+    from culm
+    where weekly_cases.report_date = culm.report_date and
+          weekly_cases.geo_id = culm.geo_id'''
+with engine.connect() as connection:
+    connection.execute(query_string)
+
+
+# %%
+query_string = '''with culm as 
+    (select report_date, geo_id,
+        sum(deaths_weekly) over (partition by geo_id 
+                                order by report_date) as culm_data
+     from weekly_cases
+    )
+update weekly_cases
+    set culm_deaths = culm_data
+    from culm
+    where weekly_cases.report_date = culm.report_date and
+          weekly_cases.geo_id = culm.geo_id'''
+with engine.connect() as connection:
+    connection.execute(query_string)
+
+
+# %%
+uk_query_string = (
+"https://api.coronavirus.data.gov.uk/v2/data?areaType=overview&"
+"metric=covidOccupiedMVBeds&"
+"metric=newAdmissions&"
+"metric=newCasesBySpecimenDate&"
+"metric=hospitalCases&"
+"metric=newDeaths28DaysByPublishDate&"
+"format=csv"
+)
+
+os.system(f'curl "{uk_query_string}" > uk_data.csv')
+
+
+# %%
+test_query_string = (
+"https://api.coronavirus.data.gov.uk/v2/data?areaType=overview&"
+"metric=newPCRTestsByPublishDate&"
+"metric=newTestsByPublishDate&"
+"metric=newPillarOneTwoTestsByPublishDate&"
+"format=csv"
+)
+os.system(f'curl "{test_query_string}" > test_data.csv')
+
+
+# %%
+uk_data = pd.read_csv('uk_data.csv', 
+                       parse_dates=[0], dayfirst=True)
+
+
+# %%
+test_data = pd.read_csv('test_data.csv', 
+                       parse_dates=[0], dayfirst=True)
+
+
+# %%
+uk_data = uk_data.merge(test_data[['date', 'newPCRTestsByPublishDate',
+       'newTestsByPublishDate', 'newPillarOneTwoTestsByPublishDate']], how='outer', on='date')
+
+
+# %%
+uk_data.rename(
+    columns={
+        'covidOccupiedMVBeds': 'ventilator_beds',
+        'newCasesBySpecimenDate': 'new_cases',
+        'hospitalCases': 'hospital_cases', 
+        'newDeaths28DaysByPublishDate': 'new_deaths',
+        'newAdmissions': 'new_admissions',
+        'newPCRTestsByPublishDate': 'new_pcr_tests',
+        'newTestsByPublishDate': 'new_tests',
+        'newPillarOneTwoTestsByPublishDate': 'new_pillar_1_2_tests'
+    }, inplace=True)
+
+
+# %%
+uk_data[['date', 
+         'hospital_cases', 'ventilator_beds',
+         'new_cases', 'new_deaths', 
+         'hospital_cases', 'new_admissions',
+         'new_pcr_tests', 'new_tests', 'new_pillar_1_2_tests'
+        ]].to_sql(
+    'uk_data',
+    engine,
+    if_exists='replace',
+    index=False,
+    chunksize=500,
+    dtype={
+        "date": Date,
+        "hospital_cases": Integer,
+        "ventilator_beds": Integer,
+        "new_cases": Integer,
+        "hospital_cases": Integer,
+        "new_deaths": Integer,
+        "new_admissions": Integer,
+        'new_pcr_tests': Integer, 
+        'new_tests': Integer, 
+        'new_pillar_1_2_tests': Integer
+    }
+)
+
+
+# %%
+query_string = '''drop table if exists uk_data_7;
+create table uk_data_7 
+(date date primary key,
+ hospital_cases real,
+ ventilator_beds real,
+ new_cases real,
+ new_deaths real,
+ new_admissions real,
+ new_pcr_tests real,
+ new_tests real,
+ new_pillar_1_2_tests real
+);'''
+
+with engine.connect() as connection:
+    connection.execute(query_string)
+
+
+# %%
+query_string = '''with ownd as (
+    select date,
+        avg(hospital_cases) over wnd as w_hospital_cases,
+        avg(ventilator_beds) over wnd as w_ventilator_beds,
+        avg(new_cases) over wnd as w_new_cases,
+        avg(new_deaths) over wnd as w_new_deaths,
+        avg(new_admissions) over wnd as w_new_admissions,
+        avg(new_pcr_tests) over wnd as w_new_pcr_tests,
+        avg(new_tests) over wnd as w_new_tests,
+        avg(new_pillar_1_2_tests) over wnd as w_new_pillar_1_2_tests,
+        count(*) over wnd as w_size
+    from uk_data
+    window wnd as (
+        order by uk_data.date
+        rows between 3 preceding and 3 following
+    )
+)
+insert into uk_data_7(date, 
+    hospital_cases, 
+    ventilator_beds, 
+    new_cases,
+    new_deaths,
+    new_admissions, 
+    new_pcr_tests, 
+    new_tests, 
+    new_pillar_1_2_tests
+    )
+(select date,
+        w_hospital_cases,
+        w_ventilator_beds,
+        w_new_cases,
+        w_new_deaths,
+        w_new_admissions,
+        w_new_pcr_tests,
+        w_new_tests,
+        w_new_pillar_1_2_tests
+    from ownd
+    where w_size = 7
+)'''
+with engine.connect() as connection:
+    connection.execute(query_string)
+