Refactor covid_hosp auto columns #1203

Merged
59 changes: 24 additions & 35 deletions integrations/acquisition/covid_hosp/facility/test_scenarios.py
@@ -2,13 +2,15 @@

# standard library
import unittest
from unittest.mock import MagicMock
from unittest.mock import patch

# first party
from delphi.epidata.acquisition.covid_hosp.common.database import Database
from delphi.epidata.acquisition.covid_hosp.facility.database import Database
from delphi.epidata.acquisition.covid_hosp.common.network import Network
from delphi.epidata.acquisition.covid_hosp.common.test_utils import UnitTestUtils
from delphi.epidata.acquisition.covid_hosp.common.utils import Utils
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covid_hosp.facility.update import Update
from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething
import delphi.operations.secrets as secrets

# third party
@@ -45,22 +47,17 @@ def setUp(self):
def test_acquire_dataset(self):
"""Acquire a new dataset."""

# only mock out network calls to external hosts
mock_network = MagicMock()
mock_network.fetch_metadata.return_value = \
self.test_utils.load_sample_metadata()
mock_network.fetch_dataset.return_value = \
self.test_utils.load_sample_dataset()

# make sure the data does not yet exist
with self.subTest(name='no data yet'):
response = Epidata.covid_hosp_facility(
'450822', Epidata.range(20200101, 20210101))
self.assertEqual(response['result'], -2, response)

# acquire sample data into local database
with self.subTest(name='first acquisition'):
acquired = Update.run(network=mock_network)
with self.subTest(name='first acquisition'), \
patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \
patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()):
acquired = Utils.update_dataset(Database)
self.assertTrue(acquired)

# make sure the data now exists
@@ -89,12 +86,14 @@ def test_acquire_dataset(self):
else:
self.assertEqual(row[k], v, f"row[{k}] is {row[k]} not {v}")

# expect 113 fields per row (114 database columns, except `id`)
self.assertEqual(len(row), 113)
# Expect len(row) to equal the number of dynamic columns + one extra issue column
self.assertEqual(len(row), len(list(CovidHospSomething().columns('covid_hosp_facility'))) + 1)

# re-acquisition of the same dataset should be a no-op
with self.subTest(name='second acquisition'):
acquired = Update.run(network=mock_network)
with self.subTest(name='second acquisition'), \
patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \
patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()):
acquired = Utils.update_dataset(Database)
self.assertFalse(acquired)

# make sure the data still exists
@@ -108,16 +107,11 @@ def test_facility_lookup(self):
def test_facility_lookup(self):
"""Lookup facilities using various filters."""

# only mock out network calls to external hosts
mock_network = MagicMock()
mock_network.fetch_metadata.return_value = \
self.test_utils.load_sample_metadata()
mock_network.fetch_dataset.return_value = \
self.test_utils.load_sample_dataset()

# acquire sample data into local database
with self.subTest(name='first acquisition'):
acquired = Update.run(network=mock_network)
with self.subTest(name='first acquisition'), \
patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \
patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()):
acquired = Utils.update_dataset(Database)
self.assertTrue(acquired)

# texas ground truth, sorted by `hospital_pk`
@@ -181,16 +175,11 @@ def test_facility_lookup(self):
response = Epidata.covid_hosp_facility_lookup(state='not a state')
self.assertEqual(response['result'], -2)

# update facility info
mock_network = MagicMock()
mock_network.fetch_metadata.return_value = \
self.test_utils.load_sample_metadata('metadata_update_facility.csv')
mock_network.fetch_dataset.return_value = \
self.test_utils.load_sample_dataset('dataset_update_facility.csv')

# acquire sample data into local database
with self.subTest(name='second acquisition'):
acquired = Update.run(network=mock_network)
# acquire sample data into local database with updated facility info
with self.subTest(name='second acquisition'), \
patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata('metadata_update_facility.csv')), \
patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset('dataset_update_facility.csv')):
acquired = Utils.update_dataset(Database)
self.assertTrue(acquired)

texas_hospitals[1]['zip'] = '88888'
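The test refactor above drops the per-test MagicMock network: Network.fetch_metadata and Network.fetch_dataset are patched on the class itself, and acquisition goes through Utils.update_dataset with the dataset's Database class. A minimal sketch of that pattern, assuming the delphi test environment and the UnitTestUtils sample loaders used in the diff:

    from unittest.mock import patch

    from delphi.epidata.acquisition.covid_hosp.common.network import Network
    from delphi.epidata.acquisition.covid_hosp.common.utils import Utils
    from delphi.epidata.acquisition.covid_hosp.facility.database import Database

    def acquire_sample(test_utils):
        # Patching the class-level network calls means the default
        # `network=Network` argument of Utils.update_dataset picks up the
        # mocks without any injection plumbing in the test body.
        with patch.object(Network, 'fetch_metadata',
                          return_value=test_utils.load_sample_metadata()), \
             patch.object(Network, 'fetch_dataset',
                          return_value=test_utils.load_sample_dataset()):
            # returns True iff a new dataset revision was acquired
            return Utils.update_dataset(Database)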
integrations/acquisition/covid_hosp/state_daily/test_scenarios.py
@@ -13,9 +13,9 @@
from delphi.epidata.acquisition.covid_hosp.state_daily.database import Database
from delphi.epidata.acquisition.covid_hosp.common.test_utils import UnitTestUtils
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covid_hosp.state_daily.update import Update
from delphi.epidata.acquisition.covid_hosp.state_daily.network import Network
from delphi.epidata.acquisition.covid_hosp.common.network import Network
from delphi.epidata.acquisition.covid_hosp.common.utils import Utils
from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething
import delphi.operations.secrets as secrets

# py3tester coverage target (equivalent to `import *`)
@@ -62,7 +62,7 @@ def test_acquire_dataset(self):
self.test_utils.load_sample_dataset("dataset0.csv"), # first dataset for 3/15
self.test_utils.load_sample_dataset()] # second dataset for 3/15
) as mock_fetch:
acquired = Update.run()
acquired = Utils.update_dataset(Database)
self.assertTrue(acquired)
self.assertEqual(mock_fetch_meta.call_count, 1)

@@ -82,8 +82,8 @@ def test_acquire_dataset(self):
self.assertAlmostEqual(actual, expected)
self.assertIsNone(row['critical_staffing_shortage_today_no'])

# expect 61 fields per row (62 database columns, except `id`) # TODO: ??? this is wrong!
self.assertEqual(len(row), 118)
# Expect len(row) to equal the number of dynamic columns + one extra issue column
self.assertEqual(len(row), len(list(CovidHospSomething().columns('state_daily'))) + 1)

with self.subTest(name='all date batches acquired'):
response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101), issues=20210313)
@@ -93,7 +93,7 @@ def test_acquire_dataset(self):
with self.subTest(name='second acquisition'), \
patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()) as mock_fetch_meta, \
patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()) as mock_fetch:
acquired = Update.run()
acquired = Utils.update_dataset(Database)
self.assertFalse(acquired)

# make sure the data still exists
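The hard-coded field counts (113 and 118) are replaced by a count derived from the schema. A sketch of that calculation, assuming CovidHospSomething().columns(name) yields one entry per dynamically generated column, as the diff suggests:

    from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething

    def expected_row_length(name):
        # one field per schema-defined (dynamic) column, plus the extra
        # `issue` column included in each API row
        return len(list(CovidHospSomething().columns(name))) + 1

    # e.g. in the state_daily test:
    #   self.assertEqual(len(row), expected_row_length('state_daily'))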
integrations/acquisition/covid_hosp/state_timeseries/test_scenarios.py
@@ -3,12 +3,15 @@
# standard library
import unittest
from unittest.mock import MagicMock
from unittest.mock import patch

# first party
from delphi.epidata.acquisition.covid_hosp.common.database import Database
from delphi.epidata.acquisition.covid_hosp.state_timeseries.database import Database
from delphi.epidata.acquisition.covid_hosp.common.network import Network
from delphi.epidata.acquisition.covid_hosp.common.test_utils import UnitTestUtils
from delphi.epidata.acquisition.covid_hosp.common.utils import Utils
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covid_hosp.state_timeseries.update import Update
from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething
import delphi.operations.secrets as secrets

# third party
@@ -45,21 +48,16 @@ def setUp(self):
def test_acquire_dataset(self):
"""Acquire a new dataset."""

# only mock out network calls to external hosts
mock_network = MagicMock()
mock_network.fetch_metadata.return_value = \
self.test_utils.load_sample_metadata()
mock_network.fetch_dataset.return_value = \
self.test_utils.load_sample_dataset()

# make sure the data does not yet exist
with self.subTest(name='no data yet'):
response = Epidata.covid_hosp('MA', Epidata.range(20200101, 20210101))
self.assertEqual(response['result'], -2)

# acquire sample data into local database
with self.subTest(name='first acquisition'):
acquired = Update.run(network=mock_network)
with self.subTest(name='first acquisition'), \
patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \
patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()):
acquired = Utils.update_dataset(Database)
self.assertTrue(acquired)

# make sure the data now exists
Expand All @@ -78,12 +76,14 @@ def test_acquire_dataset(self):
self.assertAlmostEqual(actual, expected)
self.assertIsNone(row['critical_staffing_shortage_today_no'])

# expect 61 fields per row (62 database columns, except `id`) # TODO: ??? this is wrong!
self.assertEqual(len(row), 118)
# Expect len(row) to equal the number of dynamic columns + one extra issue column
self.assertEqual(len(row), len(list(CovidHospSomething().columns('state_timeseries'))) + 1)

# re-acquisition of the same dataset should be a no-op
with self.subTest(name='second acquisition'):
acquired = Update.run(network=mock_network)
with self.subTest(name='second acquisition'), \
patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \
patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()):
acquired = Utils.update_dataset(Database)
self.assertFalse(acquired)

# make sure the data still exists
@@ -93,13 +93,11 @@ def test_acquire_dataset(self):
self.assertEqual(len(response['epidata']), 1)

# acquire new data into local database
with self.subTest(name='first acquisition'):
with self.subTest(name='updated acquisition'), \
patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata("metadata2.csv")), \
patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset("dataset2.csv")):
# acquire new data with 3/16 issue date
mock_network.fetch_metadata.return_value = \
self.test_utils.load_sample_metadata("metadata2.csv")
mock_network.fetch_dataset.return_value = \
self.test_utils.load_sample_dataset("dataset2.csv")
acquired = Update.run(network=mock_network)
acquired = Utils.update_dataset(Database)
self.assertTrue(acquired)

with self.subTest(name='as_of checks'):
35 changes: 17 additions & 18 deletions src/acquisition/covid_hosp/common/database.py
@@ -1,7 +1,6 @@
"""Common database code used by multiple `covid_hosp` scrapers."""

# standard library
from collections import namedtuple
from contextlib import contextmanager
import math

@@ -12,17 +11,15 @@
# first party
import delphi.operations.secrets as secrets
from delphi.epidata.common.logger import get_structured_logger

Columndef = namedtuple("Columndef", "csv_name sql_name dtype")
from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething

class Database:

DATASET_NAME = None

def __init__(self,
connection,
table_name=None,
hhs_dataset_id=None,
columns_and_types=None,
key_columns=None):
chs = CovidHospSomething()):
"""Create a new Database object.

Parameters
@@ -39,15 +36,17 @@ def __init__(self,
"""

self.connection = connection
self.table_name = table_name
self.hhs_dataset_id = hhs_dataset_id
self.publication_col_name = "issue" if table_name == 'covid_hosp_state_timeseries' or table_name == "covid_hosp_state_daily" else \
'publication_date'
self.columns_and_types = {
c.csv_name: c
for c in (columns_and_types if columns_and_types is not None else [])
}
self.key_columns = key_columns if key_columns is not None else []

if self.DATASET_NAME is None:
raise NameError('no dataset given!') # Must be defined by subclasses

self.table_name = chs.get_ds_table_name(self.DATASET_NAME)
self.hhs_dataset_id = chs.get_ds_dataset_id(self.DATASET_NAME)
self.metadata_id = chs.get_ds_metadata_id(self.DATASET_NAME)
self.publication_col_name = chs.get_ds_issue_column(self.DATASET_NAME)
self.columns_and_types = {c.csv_name: c for c in chs.get_ds_ordered_csv_cols(self.DATASET_NAME)}
self.key_columns = chs.get_ds_key_cols(self.DATASET_NAME)
self.aggregate_key_columns = chs.get_ds_aggregate_key_cols(self.DATASET_NAME)

@classmethod
def logger(database_class):
@@ -212,10 +211,10 @@ def nan_safe_dtype(dtype, value):
cursor.executemany(sql, many_values)

# deal with non/seldomly updated columns used like a fk table (if this database needs it)
if hasattr(self, 'AGGREGATE_KEY_COLS'):
if len(self.aggregate_key_columns) > 0:
if logger:
logger.info('updating keys')
ak_cols = self.AGGREGATE_KEY_COLS
ak_cols = self.aggregate_key_columns

# restrict data to just the key columns and remove duplicate rows
# sort by key columns to ensure that the last ON DUPLICATE KEY overwrite
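With the constructor parameters removed, every piece of per-dataset configuration now comes from the schema object. A sketch of what the new __init__ resolves for a given DATASET_NAME, using the getter names from the diff (their return types are otherwise assumed):

    from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething

    chs = CovidHospSomething()
    name = 'covid_hosp_facility'  # any DATASET_NAME defined in the covid_hosp schema

    table_name   = chs.get_ds_table_name(name)          # SQL table to write to
    dataset_id   = chs.get_ds_dataset_id(name)          # HHS dataset id
    metadata_id  = chs.get_ds_metadata_id(name)         # healthdata.gov metadata id
    issue_column = chs.get_ds_issue_column(name)        # e.g. 'issue' or 'publication_date'
    csv_columns  = chs.get_ds_ordered_csv_cols(name)    # column defs exposing .csv_name
    key_columns  = chs.get_ds_key_cols(name)
    agg_key_cols = chs.get_ds_aggregate_key_cols(name)  # may be empty for some datasets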
2 changes: 1 addition & 1 deletion src/acquisition/covid_hosp/common/network.py
@@ -6,7 +6,7 @@ class Network:
METADATA_URL_TEMPLATE = \
'https://healthdata.gov/api/views/%s/rows.csv'

def fetch_metadata_for_dataset(dataset_id, logger=False):
def fetch_metadata(dataset_id, logger=False):
"""Download and return metadata.

Parameters
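The rename also changes the call shape: callers now pass the dataset's metadata id explicitly, as the utils.py hunk below does with db.metadata_id. A one-line call sketch (the id shown is a placeholder, not a real healthdata.gov id):

    from delphi.epidata.acquisition.covid_hosp.common.network import Network

    metadata = Network.fetch_metadata('xxxx-xxxx')  # placeholder dataset id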
13 changes: 4 additions & 9 deletions src/acquisition/covid_hosp/common/utils.py
@@ -6,6 +6,7 @@

import pandas as pd

from delphi.epidata.acquisition.covid_hosp.common.network import Network

class CovidHospException(Exception):
"""Exception raised exclusively by `covid_hosp` utilities."""
@@ -17,12 +18,6 @@ class Utils:
# example revision: "Mon, 11/16/2020 - 00:55"
REVISION_PATTERN = re.compile(r'^.*\s(\d+)/(\d+)/(\d+)\s.*$')

def launch_if_main(entrypoint, runtime_name):
"""Call the given function in the main entry point, otherwise no-op."""

if runtime_name == '__main__':
entrypoint()

def int_from_date(date):
"""Convert a YYYY/MM/DD date from a string to a YYYYMMDD int.

@@ -168,7 +163,7 @@ def merge_by_key_cols(dfs, key_cols, logger=False):
return result.reset_index(level=key_cols)

@staticmethod
def update_dataset(database, network, newer_than=None, older_than=None):
def update_dataset(database, network=Network, newer_than=None, older_than=None):
"""Acquire the most recent dataset, unless it was previously acquired.

Parameters
@@ -189,9 +184,9 @@ def update_dataset(database, network=Network, newer_than=None, older_than=None):
"""
logger = database.logger()

metadata = network.fetch_metadata(logger=logger)
datasets = []
with database.connect() as db:
metadata = network.fetch_metadata(db.metadata_id, logger=logger)
max_issue = db.get_max_issue(logger=logger)

older_than = datetime.datetime.today().date() if newer_than is None else older_than
@@ -204,7 +199,7 @@
issue_int = int(issue.strftime("%Y%m%d"))
# download the dataset and add it to the database
dataset = Utils.merge_by_key_cols([network.fetch_dataset(url, logger=logger) for url, _ in revisions],
db.KEY_COLS,
db.key_columns,
logger=logger)
# add metadata to the database
all_metadata = []
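With launch_if_main removed and network defaulting to the shared Network class, a dataset's command-line entry point can presumably shrink to a plain main guard. The per-dataset update.py files are not part of this diff, so the following is only a guess at the resulting shape:

    # hypothetical facility/update.py after this refactor
    from delphi.epidata.acquisition.covid_hosp.common.utils import Utils
    from delphi.epidata.acquisition.covid_hosp.facility.database import Database

    if __name__ == '__main__':
        Utils.update_dataset(Database)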
17 changes: 1 addition & 16 deletions src/acquisition/covid_hosp/facility/database.py
@@ -1,22 +1,7 @@
# first party
from delphi.epidata.acquisition.covid_hosp.common.database import Database as BaseDatabase
from delphi.epidata.acquisition.covid_hosp.facility.network import Network
from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething


class Database(BaseDatabase):

chs = CovidHospSomething()
TABLE_NAME = chs.get_ds_table_name('covid_hosp_facility')
KEY_COLS = chs.get_ds_key_cols('covid_hosp_facility')
AGGREGATE_KEY_COLS = chs.get_ds_aggregate_key_cols('covid_hosp_facility')
ORDERED_CSV_COLUMNS = chs.get_ds_ordered_csv_cols('covid_hosp_facility')

def __init__(self, *args, **kwargs):
super().__init__(
*args,
**kwargs,
table_name=Database.TABLE_NAME,
hhs_dataset_id=Network.DATASET_ID,
key_columns=Database.KEY_COLS,
columns_and_types=Database.ORDERED_CSV_COLUMNS)
DATASET_NAME = 'covid_hosp_facility'
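Under this scheme, adding another dataset should only require a schema entry plus a subclass shaped like the one above. A hypothetical sketch (the dataset name below is invented and would need a matching schema definition):

    from delphi.epidata.acquisition.covid_hosp.common.database import Database as BaseDatabase

    class Database(BaseDatabase):
        # CovidHospSomething resolves the table, ids, and columns for this
        # name from the schema, so nothing else needs to be defined here.
        DATASET_NAME = 'covid_hosp_some_new_dataset'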