diff --git a/integrations/acquisition/covid_hosp/facility/test_scenarios.py b/integrations/acquisition/covid_hosp/facility/test_scenarios.py index aaa3c5e3b..849c713f5 100644 --- a/integrations/acquisition/covid_hosp/facility/test_scenarios.py +++ b/integrations/acquisition/covid_hosp/facility/test_scenarios.py @@ -2,14 +2,14 @@ # standard library import unittest -from unittest.mock import MagicMock +from unittest.mock import patch # first party -from delphi.epidata.acquisition.covid_hosp.common.database import Database -from delphi.epidata.acquisition.covid_hosp.common.test_utils import UnitTestUtils +from delphi.epidata.acquisition.covid_hosp.facility.database import Database +from delphi.epidata.acquisition.covid_hosp.common.network import Network +from delphi.epidata.acquisition.covid_hosp.common.test_utils import CovidHospTestCase, UnitTestUtils from delphi.epidata.client.delphi_epidata import Epidata -from delphi.epidata.acquisition.covid_hosp.facility.update import Update -import delphi.operations.secrets as secrets +from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething # third party from freezegun import freeze_time @@ -19,39 +19,15 @@ NEWLINE="\n" -class AcquisitionTests(unittest.TestCase): +class AcquisitionTests(CovidHospTestCase): - def setUp(self): - """Perform per-test setup.""" - - # configure test data - self.test_utils = UnitTestUtils(__file__) - - # use the local instance of the Epidata API - Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php' - - # use the local instance of the epidata database - secrets.db.host = 'delphi_database_epidata' - secrets.db.epi = ('user', 'pass') - - # clear relevant tables - with Database.connect() as db: - with db.new_cursor() as cur: - cur.execute('truncate table covid_hosp_facility') - cur.execute('truncate table covid_hosp_facility_key') - cur.execute('truncate table covid_hosp_meta') + db_class = Database + test_util_context = __file__ @freeze_time("2021-03-16") def test_acquire_dataset(self): """Acquire a new dataset.""" - # only mock out network calls to external hosts - mock_network = MagicMock() - mock_network.fetch_metadata.return_value = \ - self.test_utils.load_sample_metadata() - mock_network.fetch_dataset.return_value = \ - self.test_utils.load_sample_dataset() - # make sure the data does not yet exist with self.subTest(name='no data yet'): response = Epidata.covid_hosp_facility( @@ -59,8 +35,10 @@ def test_acquire_dataset(self): self.assertEqual(response['result'], -2, response) # acquire sample data into local database - with self.subTest(name='first acquisition'): - acquired = Update.run(network=mock_network) + with self.subTest(name='first acquisition'), \ + patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \ + patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()): + acquired = Database().update_dataset() self.assertTrue(acquired) # make sure the data now exists @@ -89,12 +67,14 @@ def test_acquire_dataset(self): else: self.assertEqual(row[k], v, f"row[{k}] is {row[k]} not {v}") - # expect 113 fields per row (114 database columns, except `id`) - self.assertEqual(len(row), 113) + # Expect len(row) to equal the amount of dynamic columns + one extra issue column + self.assertEqual(len(row), len(list(CovidHospSomething().columns('covid_hosp_facility'))) + 1) # re-acquisition of the same dataset should be a no-op - with self.subTest(name='second acquisition'): - acquired = Update.run(network=mock_network) + 
with self.subTest(name='second acquisition'), \ + patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \ + patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()): + acquired = Database().update_dataset() self.assertFalse(acquired) # make sure the data still exists @@ -108,16 +88,11 @@ def test_acquire_dataset(self): def test_facility_lookup(self): """Lookup facilities using various filters.""" - # only mock out network calls to external hosts - mock_network = MagicMock() - mock_network.fetch_metadata.return_value = \ - self.test_utils.load_sample_metadata() - mock_network.fetch_dataset.return_value = \ - self.test_utils.load_sample_dataset() - # acquire sample data into local database - with self.subTest(name='first acquisition'): - acquired = Update.run(network=mock_network) + with self.subTest(name='first acquisition'), \ + patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \ + patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()): + acquired = Database().update_dataset() self.assertTrue(acquired) # texas ground truth, sorted by `hospital_pk` @@ -181,16 +156,11 @@ def test_facility_lookup(self): response = Epidata.covid_hosp_facility_lookup(state='not a state') self.assertEqual(response['result'], -2) - # update facility info - mock_network = MagicMock() - mock_network.fetch_metadata.return_value = \ - self.test_utils.load_sample_metadata('metadata_update_facility.csv') - mock_network.fetch_dataset.return_value = \ - self.test_utils.load_sample_dataset('dataset_update_facility.csv') - - # acquire sample data into local database - with self.subTest(name='second acquisition'): - acquired = Update.run(network=mock_network) + # acquire sample data into local database with updated facility info + with self.subTest(name='second acquisition'), \ + patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata('metadata_update_facility.csv')), \ + patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset('dataset_update_facility.csv')): + acquired = Database().update_dataset() self.assertTrue(acquired) texas_hospitals[1]['zip'] = '88888' diff --git a/integrations/acquisition/covid_hosp/state_daily/test_scenarios.py b/integrations/acquisition/covid_hosp/state_daily/test_scenarios.py index fd9b6c582..741cbb969 100644 --- a/integrations/acquisition/covid_hosp/state_daily/test_scenarios.py +++ b/integrations/acquisition/covid_hosp/state_daily/test_scenarios.py @@ -11,39 +11,20 @@ # first party from delphi.epidata.acquisition.covid_hosp.state_daily.database import Database -from delphi.epidata.acquisition.covid_hosp.common.test_utils import UnitTestUtils +from delphi.epidata.acquisition.covid_hosp.common.test_utils import CovidHospTestCase, UnitTestUtils from delphi.epidata.client.delphi_epidata import Epidata -from delphi.epidata.acquisition.covid_hosp.state_daily.update import Update -from delphi.epidata.acquisition.covid_hosp.state_daily.network import Network -from delphi.epidata.acquisition.covid_hosp.common.utils import Utils -import delphi.operations.secrets as secrets +from delphi.epidata.acquisition.covid_hosp.common.network import Network +from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething # py3tester coverage target (equivalent to `import *`) __test_target__ = \ 'delphi.epidata.acquisition.covid_hosp.state_daily.update' -class 
AcquisitionTests(unittest.TestCase): +class AcquisitionTests(CovidHospTestCase): - def setUp(self): - """Perform per-test setup.""" - - # configure test data - self.test_utils = UnitTestUtils(__file__) - - # use the local instance of the Epidata API - Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php' - - # use the local instance of the epidata database - secrets.db.host = 'delphi_database_epidata' - secrets.db.epi = ('user', 'pass') - - # clear relevant tables - with Database.connect() as db: - with db.new_cursor() as cur: - cur.execute('truncate table covid_hosp_state_daily') - cur.execute('truncate table covid_hosp_state_timeseries') - cur.execute('truncate table covid_hosp_meta') + db_class = Database + test_util_context = __file__ @freeze_time("2021-03-16") def test_acquire_dataset(self): @@ -61,8 +42,8 @@ def test_acquire_dataset(self): patch.object(Network, 'fetch_dataset', side_effect=[self.test_utils.load_sample_dataset("dataset0.csv"), # dataset for 3/13 self.test_utils.load_sample_dataset("dataset0.csv"), # first dataset for 3/15 self.test_utils.load_sample_dataset()] # second dataset for 3/15 - ) as mock_fetch: - acquired = Update.run() + ): + acquired = Database().update_dataset() self.assertTrue(acquired) self.assertEqual(mock_fetch_meta.call_count, 1) @@ -82,8 +63,8 @@ def test_acquire_dataset(self): self.assertAlmostEqual(actual, expected) self.assertIsNone(row['critical_staffing_shortage_today_no']) - # expect 61 fields per row (62 database columns, except `id`) # TODO: ??? this is wrong! - self.assertEqual(len(row), 118) + # Expect len(row) to equal the amount of dynamic columns + one extra issue column + self.assertEqual(len(row), len(list(CovidHospSomething().columns('state_daily'))) + 1) with self.subTest(name='all date batches acquired'): response = Epidata.covid_hosp('WY', Epidata.range(20200101, 20210101), issues=20210313) @@ -91,9 +72,9 @@ def test_acquire_dataset(self): # re-acquisition of the same dataset should be a no-op with self.subTest(name='second acquisition'), \ - patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()) as mock_fetch_meta, \ - patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()) as mock_fetch: - acquired = Update.run() + patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \ + patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()): + acquired = Database().update_dataset() self.assertFalse(acquired) # make sure the data still exists @@ -114,18 +95,14 @@ def test_acquire_specific_issue(self): # acquire sample data into local database # mock out network calls to external hosts - with Database.connect() as db: + with Database().connect() as db: pre_max_issue = db.get_max_issue() self.assertEqual(pre_max_issue, pd.Timestamp('1900-01-01 00:00:00')) with self.subTest(name='first acquisition'), \ - patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()) as mock_fetch_meta, \ - patch.object(Network, 'fetch_dataset', side_effect=[self.test_utils.load_sample_dataset("dataset0.csv")] - ) as mock_fetch: - acquired = Utils.update_dataset(Database, - Network, - date(2021, 3, 12), - date(2021, 3, 14)) - with Database.connect() as db: + patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \ + patch.object(Network, 'fetch_dataset', side_effect=[self.test_utils.load_sample_dataset("dataset0.csv")]): + acquired = 
Database().update_dataset(date(2021, 3, 12), date(2021, 3, 14)) + with Database().connect() as db: post_max_issue = db.get_max_issue() self.assertEqual(post_max_issue, pd.Timestamp('2021-03-13 00:00:00')) self.assertTrue(acquired) diff --git a/integrations/acquisition/covid_hosp/state_timeseries/test_scenarios.py b/integrations/acquisition/covid_hosp/state_timeseries/test_scenarios.py index b36831716..d68db6988 100644 --- a/integrations/acquisition/covid_hosp/state_timeseries/test_scenarios.py +++ b/integrations/acquisition/covid_hosp/state_timeseries/test_scenarios.py @@ -3,13 +3,14 @@ # standard library import unittest from unittest.mock import MagicMock +from unittest.mock import patch # first party -from delphi.epidata.acquisition.covid_hosp.common.database import Database -from delphi.epidata.acquisition.covid_hosp.common.test_utils import UnitTestUtils +from delphi.epidata.acquisition.covid_hosp.state_timeseries.database import Database +from delphi.epidata.acquisition.covid_hosp.common.network import Network +from delphi.epidata.acquisition.covid_hosp.common.test_utils import CovidHospTestCase, UnitTestUtils from delphi.epidata.client.delphi_epidata import Epidata -from delphi.epidata.acquisition.covid_hosp.state_timeseries.update import Update -import delphi.operations.secrets as secrets +from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething # third party from freezegun import freeze_time @@ -19,47 +20,28 @@ 'delphi.epidata.acquisition.covid_hosp.state_timeseries.update' -class AcquisitionTests(unittest.TestCase): +class AcquisitionTests(CovidHospTestCase): - def setUp(self): - """Perform per-test setup.""" - - # configure test data - self.test_utils = UnitTestUtils(__file__) - - # use the local instance of the Epidata API - Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php' - - # use the local instance of the epidata database - secrets.db.host = 'delphi_database_epidata' - secrets.db.epi = ('user', 'pass') - - # clear relevant tables - with Database.connect() as db: - with db.new_cursor() as cur: - cur.execute('truncate table covid_hosp_state_daily') - cur.execute('truncate table covid_hosp_state_timeseries') - cur.execute('truncate table covid_hosp_meta') + db_class = Database + test_util_context = __file__ + # TODO: no need for the following after covid_hosp table split is merged + # (in https://github.com/cmu-delphi/delphi-epidata/pull/1126) + extra_tables_used = ['covid_hosp_state_daily'] @freeze_time("2021-03-17") def test_acquire_dataset(self): """Acquire a new dataset.""" - # only mock out network calls to external hosts - mock_network = MagicMock() - mock_network.fetch_metadata.return_value = \ - self.test_utils.load_sample_metadata() - mock_network.fetch_dataset.return_value = \ - self.test_utils.load_sample_dataset() - # make sure the data does not yet exist with self.subTest(name='no data yet'): response = Epidata.covid_hosp('MA', Epidata.range(20200101, 20210101)) self.assertEqual(response['result'], -2) # acquire sample data into local database - with self.subTest(name='first acquisition'): - acquired = Update.run(network=mock_network) + with self.subTest(name='first acquisition'), \ + patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \ + patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()): + acquired = Database().update_dataset() self.assertTrue(acquired) # make sure the data now exists @@ -78,12 +60,14 @@ def test_acquire_dataset(self): 
self.assertAlmostEqual(actual, expected) self.assertIsNone(row['critical_staffing_shortage_today_no']) - # expect 61 fields per row (62 database columns, except `id`) # TODO: ??? this is wrong! - self.assertEqual(len(row), 118) + # Expect len(row) to equal the amount of dynamic columns + one extra issue column + self.assertEqual(len(row), len(list(CovidHospSomething().columns('state_timeseries'))) + 1) # re-acquisition of the same dataset should be a no-op - with self.subTest(name='second acquisition'): - acquired = Update.run(network=mock_network) + with self.subTest(name='second acquisition'), \ + patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \ + patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset()): + acquired = Database().update_dataset() self.assertFalse(acquired) # make sure the data still exists @@ -93,13 +77,11 @@ def test_acquire_dataset(self): self.assertEqual(len(response['epidata']), 1) # acquire new data into local database - with self.subTest(name='first acquisition'): + with self.subTest(name='updated acquisition'), \ + patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata("metadata2.csv")), \ + patch.object(Network, 'fetch_dataset', return_value=self.test_utils.load_sample_dataset("dataset2.csv")): # acquire new data with 3/16 issue date - mock_network.fetch_metadata.return_value = \ - self.test_utils.load_sample_metadata("metadata2.csv") - mock_network.fetch_dataset.return_value = \ - self.test_utils.load_sample_dataset("dataset2.csv") - acquired = Update.run(network=mock_network) + acquired = Database().update_dataset() self.assertTrue(acquired) with self.subTest(name='as_of checks'): diff --git a/integrations/server/test_covid_hosp.py b/integrations/server/test_covid_hosp.py index 7f53d6174..63c0de079 100644 --- a/integrations/server/test_covid_hosp.py +++ b/integrations/server/test_covid_hosp.py @@ -4,6 +4,8 @@ import unittest # first party +from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething +# use state_timeseries DB to get access to a SQL cursor from delphi.epidata.acquisition.covid_hosp.state_timeseries.database import Database from delphi.epidata.client.delphi_epidata import Epidata import delphi.operations.secrets as secrets @@ -23,7 +25,7 @@ def setUp(self): secrets.db.epi = ('user', 'pass') # clear relevant tables - with Database.connect() as db: + with Database().connect() as db: with db.new_cursor() as cur: cur.execute('truncate table covid_hosp_state_daily') cur.execute('truncate table covid_hosp_state_timeseries') @@ -31,13 +33,17 @@ def setUp(self): def insert_timeseries(self, cur, issue, value): - so_many_nulls = ', '.join(['null'] * 114) + # number of dynamic columns + one extra issue column - four columns already in the query + col_count = len(list(CovidHospSomething().columns('state_timeseries'))) - 3 + so_many_nulls = ', '.join(['null'] * col_count) cur.execute(f'''insert into covid_hosp_state_timeseries values ( 0, {issue}, 'PA', 20201118, {value}, {so_many_nulls} )''') def insert_daily(self, cur, issue, value): - so_many_nulls = ', '.join(['null'] * 114) + # number of dynamic columns + one extra issue column - four columns already in the query + col_count = len(list(CovidHospSomething().columns('state_daily'))) - 3 + so_many_nulls = ', '.join(['null'] * col_count) cur.execute(f'''insert into covid_hosp_state_daily values ( 0, {issue}, 'PA', 20201118, {value}, {so_many_nulls} )''') @@ -45,7 +51,7 @@ 
def insert_daily(self, cur, issue, value): def test_query_by_issue(self): """Query with and without specifying an issue.""" - with Database.connect() as db: + with Database().connect() as db: with db.new_cursor() as cur: # inserting out of order to test server-side order by # also inserting two for 20201201 to test tiebreaker. @@ -91,7 +97,7 @@ def test_query_by_issue(self): def test_query_by_as_of(self): - with Database.connect() as db: + with Database().connect() as db: with db.new_cursor() as cur: self.insert_timeseries(cur, 20201101, 0) self.insert_daily(cur, 20201102, 1) diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py index 173ae4a7a..637e64971 100644 --- a/src/acquisition/covid_hosp/common/database.py +++ b/src/acquisition/covid_hosp/common/database.py @@ -1,8 +1,8 @@ """Common database code used by multiple `covid_hosp` scrapers.""" # standard library -from collections import namedtuple from contextlib import contextmanager +import datetime import math # third party @@ -11,51 +11,39 @@ # first party import delphi.operations.secrets as secrets +from delphi.epidata.acquisition.covid_hosp.common.network import Network from delphi.epidata.common.logger import get_structured_logger - -Columndef = namedtuple("Columndef", "csv_name sql_name dtype") +from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething class Database: - def __init__(self, - connection, - table_name=None, - hhs_dataset_id=None, - columns_and_types=None, - key_columns=None): - """Create a new Database object. + DATASET_NAME = None - Parameters - ---------- - connection - An open connection to a database. - table_name : str - The name of the table which holds the dataset. - hhs_dataset_id : str - The 9-character healthdata.gov identifier for this dataset. - columns_and_types : tuple[str, str, Callable] - List of 3-tuples of (CSV header name, SQL column name, data type) for - all the columns in the CSV file. + def __init__(self): + """ + Create a new Database object. """ + self.logger = get_structured_logger(f"{self.__module__}") + # Make sure connection is reset - set this in connect() + self.connection = None + + if self.DATASET_NAME is None: + raise NameError('no dataset given!') # Must be defined by subclasses + + chs = CovidHospSomething() + # The name of the table which holds the dataset. + self.table_name = chs.get_ds_table_name(self.DATASET_NAME) + # The 9-character healthdata.gov identifier for this dataset. + self.hhs_dataset_id = chs.get_ds_dataset_id(self.DATASET_NAME) + self.metadata_id = chs.get_ds_metadata_id(self.DATASET_NAME) + self.issue_column = chs.get_ds_issue_column(self.DATASET_NAME) + # List of 3-tuples of (CSV header name, SQL column name, data type) for all the columns in the CSV file. 
+ self.columns_and_types = {c.csv_name: c for c in chs.get_ds_ordered_csv_cols(self.DATASET_NAME)} + self.key_columns = chs.get_ds_key_cols(self.DATASET_NAME) + self.aggregate_key_columns = chs.get_ds_aggregate_key_cols(self.DATASET_NAME) - self.connection = connection - self.table_name = table_name - self.hhs_dataset_id = hhs_dataset_id - self.publication_col_name = "issue" if table_name == 'covid_hosp_state_timeseries' or table_name == "covid_hosp_state_daily" else \ - 'publication_date' - self.columns_and_types = { - c.csv_name: c - for c in (columns_and_types if columns_and_types is not None else []) - } - self.key_columns = key_columns if key_columns is not None else [] - - @classmethod - def logger(database_class): - return get_structured_logger(f"{database_class.__module__}") - - @classmethod @contextmanager - def connect(database_class, mysql_connector_impl=mysql.connector): + def connect(self, mysql_connector_impl=mysql.connector): """Connect to a database and provide the connection as a context manager. As long as the context manager exits normally, the connection's transaction @@ -68,7 +56,7 @@ def connect(database_class, mysql_connector_impl=mysql.connector): # connect to the database user, password = secrets.db.epi - connection = mysql_connector_impl.connect( + self.connection = mysql_connector_impl.connect( host=secrets.db.host, user=user, password=password, @@ -76,14 +64,15 @@ def connect(database_class, mysql_connector_impl=mysql.connector): try: # provide the connection to the context manager - yield database_class(connection) + yield self # rollback by default; the following commit will only take place if no # exception was raised in calling code - connection.commit() + self.connection.commit() finally: - # close the connection in any case - connection.close() + # close the connection in any case and make sure self.connection is reset + self.connection.close() + self.connection = None @contextmanager def new_cursor(self): @@ -124,7 +113,58 @@ def contains_revision(self, revision): for (result,) in cursor: return bool(result) - def insert_metadata(self, publication_date, revision, meta_json, logger=False): + # TODO: this may need further changes once https://github.com/cmu-delphi/delphi-epidata/pull/1224 is merged + def update_dataset(self, newer_than=None, older_than=None): + """Acquire the most recent dataset, unless it was previously acquired. + + Parameters + ---------- + newer_than : date + Lower bound (exclusive) of days to get issues for. + older_than : date + Upper bound (exclusive) of days to get issues for + + Returns + ------- + bool + Whether a new dataset was acquired. 
+ """ + logger = self.logger + with self.connect() as db: + max_issue = db.get_max_issue() + + older_than = datetime.datetime.today().date() if newer_than is None else older_than + newer_than = max_issue if newer_than is None else newer_than + metadata = Network.fetch_metadata(self.metadata_id, logger=logger) + daily_issues = self.issues_to_fetch(metadata, newer_than, older_than) + if not daily_issues: + logger.info("no new issues; nothing to do") + return False + datasets = [] + for issue, revisions in daily_issues.items(): + issue_int = int(issue.strftime("%Y%m%d")) + # download the dataset and add it to the database + dataset = self.merge_by_key_cols([Network.fetch_dataset(url, logger=logger) for url, _ in revisions]) + # add metadata to the database + all_metadata = [] + for url, index in revisions: + all_metadata.append((url, metadata.loc[index].reset_index().to_json())) + datasets.append(( + issue_int, + dataset, + all_metadata + )) + with self.connect() as db: + for issue_int, dataset, all_metadata in datasets: + db.insert_dataset(issue_int, dataset) + for url, metadata_json in all_metadata: + db.insert_metadata(issue_int, url, metadata_json) + logger.info("acquired rows", count=len(dataset)) + + # note that the transaction is committed by exiting the `with` block + return True + + def insert_metadata(self, publication_date, revision, meta_json): """Add revision metadata to the database. Parameters @@ -135,8 +175,6 @@ def insert_metadata(self, publication_date, revision, meta_json, logger=False): Unique revision string. meta_json : str Metadata serialized as a JSON string. - logger structlog.Logger [optional; default False] - Logger to receive messages """ with self.new_cursor() as cursor: @@ -154,7 +192,7 @@ def insert_metadata(self, publication_date, revision, meta_json, logger=False): (%s, %s, %s, %s, %s, NOW()) ''', (self.table_name, self.hhs_dataset_id, publication_date, revision, meta_json)) - def insert_dataset(self, publication_date, dataframe, logger=False): + def insert_dataset(self, publication_date, dataframe): """Add a dataset to the database. Parameters @@ -163,9 +201,8 @@ def insert_dataset(self, publication_date, dataframe, logger=False): Date when the dataset was published in YYYYMMDD format. dataframe : pandas.DataFrame The dataset. - logger structlog.Logger [optional; default False] - Logger to receive messages. 
""" + logger = self.logger dataframe_columns_and_types = [ x for x in self.columns_and_types.values() if x.csv_name in dataframe.columns ] @@ -182,11 +219,10 @@ def nan_safe_dtype(dtype, value): num_columns = 2 + len(dataframe_columns_and_types) value_placeholders = ', '.join(['%s'] * num_columns) columns = ', '.join(f'`{i.sql_name}`' for i in dataframe_columns_and_types) - sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.publication_col_name}`, {columns}) ' \ + sql = f'INSERT INTO `{self.table_name}` (`id`, `{self.issue_column}`, {columns}) ' \ f'VALUES ({value_placeholders})' id_and_publication_date = (0, publication_date) - if logger: - logger.info('updating values', count=len(dataframe.index)) + logger.info('updating values', count=len(dataframe.index)) n = 0 many_values = [] with self.new_cursor() as cursor: @@ -204,18 +240,16 @@ def nan_safe_dtype(dtype, value): cursor.executemany(sql, many_values) many_values = [] except Exception as e: - if logger: - logger.error('error on insert', publ_date=publication_date, in_lines=(n-5_000, n), index=index, values=values, exception=e) + logger.error('error on insert', publ_date=publication_date, in_lines=(n-5_000, n), index=index, values=values, exception=e) raise e # insert final batch if many_values: cursor.executemany(sql, many_values) # deal with non/seldomly updated columns used like a fk table (if this database needs it) - if hasattr(self, 'AGGREGATE_KEY_COLS'): - if logger: - logger.info('updating keys') - ak_cols = self.AGGREGATE_KEY_COLS + if len(self.aggregate_key_columns) > 0: + logger.info('updating keys') + ak_cols = self.aggregate_key_columns # restrict data to just the key columns and remove duplicate rows # sort by key columns to ensure that the last ON DUPLICATE KEY overwrite @@ -241,15 +275,14 @@ def nan_safe_dtype(dtype, value): ak_table = self.table_name + '_key' # assemble full SQL statement ak_insert_sql = f'INSERT INTO `{ak_table}` ({ak_cols_str}) VALUES ({values_str}) AS v ON DUPLICATE KEY UPDATE {ak_updates_str}' - if logger: - logger.info("database query", sql=ak_insert_sql) + logger.info("database query", sql=ak_insert_sql) # commit the data with self.new_cursor() as cur: cur.executemany(ak_insert_sql, ak_data) - def get_max_issue(self, logger=False): + def get_max_issue(self): """Fetch the most recent issue. This is used to bookend what updates we pull in from the HHS metadata. @@ -266,6 +299,79 @@ def get_max_issue(self, logger=False): for (result,) in cursor: if result is not None: return pd.Timestamp(str(result)) - if logger: - logger.warn("get_max_issue", msg="no matching results in meta table; returning 1900/1/1 epoch") + self.logger.warn("get_max_issue", msg="no matching results in meta table; returning 1900/1/1 epoch") return pd.Timestamp("1900/1/1") + + def issues_to_fetch(self, metadata, newer_than, older_than): + """ + Construct all issue dates and URLs to be ingested based on metadata. + + Parameters + ---------- + metadata pd.DataFrame + HHS metadata indexed by issue date and with column "Archive Link" + newer_than Date + Lower bound (exclusive) of days to get issues for. 
+ older_than Date + Upper bound (exclusive) of days to get issues for + logger structlog.Logger [optional; default False] + Logger to receive messages + Returns + ------- + Dictionary of {issue day: list of (download urls, index)} + for issues after newer_than and before older_than + """ + daily_issues = {} + n_beyond = 0 + n_selected = 0 + for index in sorted(set(metadata.index)): + day = index.date() + if day > newer_than and day < older_than: + urls = metadata.loc[index, "Archive Link"] + urls_list = [(urls, index)] if isinstance(urls, str) else [(url, index) for url in urls] + if day not in daily_issues: + daily_issues[day] = urls_list + else: + daily_issues[day] += urls_list + n_selected += len(urls_list) + elif day >= older_than: + n_beyond += 1 + if n_beyond > 0: + self.logger.info("issues available beyond selection", on_or_newer=older_than, count=n_beyond) + self.logger.info("issues selected", newer_than=str(newer_than), older_than=str(older_than), count=n_selected) + return daily_issues + + def merge_by_key_cols(self, dfs): + """Merge a list of data frames as a series of updates. + + Parameters: + ----------- + dfs : list(pd.DataFrame) + Data frames to merge, ordered from earliest to latest. + key_cols: list(str) + Columns to use as the index. + + Returns a single data frame containing the most recent data for each state+date. + """ + + dfs = [df.set_index(self.key_columns) for df in dfs + if not all(k in df.index.names for k in self.key_columns)] + result = dfs[0] + if len(dfs) > 7: + self.logger.warning( + "expensive operation", + msg="concatenating more than 7 files may result in long running times", + count=len(dfs)) + for df in dfs[1:]: + # update values for existing keys + result.update(df) + # add any new keys. + ## repeated concatenation in pandas is expensive, but (1) we don't expect + ## batch sizes to be terribly large (7 files max) and (2) this way we can + ## more easily capture the next iteration's updates to any new keys + result_index_set = set(result.index.to_list()) + new_rows = df.loc[[i for i in df.index.to_list() if i not in result_index_set]] + result = pd.concat([result, new_rows]) + + # convert the index rows back to columns + return result.reset_index(level=self.key_columns) diff --git a/src/acquisition/covid_hosp/common/network.py b/src/acquisition/covid_hosp/common/network.py index 7b6228f16..bb719ba64 100644 --- a/src/acquisition/covid_hosp/common/network.py +++ b/src/acquisition/covid_hosp/common/network.py @@ -6,7 +6,7 @@ class Network: METADATA_URL_TEMPLATE = \ 'https://healthdata.gov/api/views/%s/rows.csv' - def fetch_metadata_for_dataset(dataset_id, logger=False): + def fetch_metadata(dataset_id, logger=False): """Download and return metadata. 
Parameters diff --git a/src/acquisition/covid_hosp/common/test_utils.py b/src/acquisition/covid_hosp/common/test_utils.py index 2a737b383..cd942e106 100644 --- a/src/acquisition/covid_hosp/common/test_utils.py +++ b/src/acquisition/covid_hosp/common/test_utils.py @@ -9,11 +9,70 @@ """ # standard library +import unittest from pathlib import Path +from unittest.mock import patch # third party import pandas +from delphi.epidata.acquisition.covid_hosp.common.database import Database +from delphi.epidata.client.delphi_epidata import Epidata +from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething +import delphi.operations.secrets as secrets + +class TestDatabase(Database): + DATASET_NAME = 'mock_dataset' + + @staticmethod + def create_mock_database(table_name=None, + dataset_id=None, + metadata_id=None, + issue_col=None, + csv_cols=[], + key_cols=[], + aggregate_cols=[]): + with patch.object(CovidHospSomething, 'get_ds_table_name', return_value=table_name), \ + patch.object(CovidHospSomething, 'get_ds_dataset_id', return_value=dataset_id), \ + patch.object(CovidHospSomething, 'get_ds_metadata_id', return_value=metadata_id), \ + patch.object(CovidHospSomething, 'get_ds_issue_column', return_value=issue_col), \ + patch.object(CovidHospSomething, 'get_ds_ordered_csv_cols', return_value=csv_cols), \ + patch.object(CovidHospSomething, 'get_ds_key_cols', return_value=key_cols), \ + patch.object(CovidHospSomething, 'get_ds_aggregate_key_cols', return_value=aggregate_cols): + return TestDatabase() + +class CovidHospTestCase(unittest.TestCase): + # assign these in subclasses: + db_class = None + test_util_context = None + + # optionally extend or recreate this in subclasses: + extra_tables_used = [] + + def setUp(self): + # use the local instance of the Epidata API + Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php' + + # use the local instance of the epidata database + secrets.db.host = 'delphi_database_epidata' + secrets.db.epi = ('user', 'pass') + + # configure test data + self.test_utils = UnitTestUtils(self.test_util_context) + + # create list of relevant tables + chs = CovidHospSomething() + ds_name = self.db_class.DATASET_NAME + tables_used = ['covid_hosp_meta', chs.get_ds_table_name(ds_name)] + if chs.get_ds_aggregate_key_cols(ds_name): + tables_used.append(chs.get_ds_table_name(ds_name)+'_key') + tables_used += self.extra_tables_used + + # clear all relevant tables + with self.db_class().connect() as db: + with db.new_cursor() as cur: + for table in tables_used: + cur.execute(f'truncate table {table}') class UnitTestUtils: diff --git a/src/acquisition/covid_hosp/common/utils.py b/src/acquisition/covid_hosp/common/utils.py deleted file mode 100644 index 5f718ad69..000000000 --- a/src/acquisition/covid_hosp/common/utils.py +++ /dev/null @@ -1,226 +0,0 @@ -"""Code shared among multiple `covid_hosp` scrapers.""" - -# standard library -import datetime -import re - -import pandas as pd - - -class CovidHospException(Exception): - """Exception raised exclusively by `covid_hosp` utilities.""" - - -class Utils: - - # regex to extract issue date from revision field - # example revision: "Mon, 11/16/2020 - 00:55" - REVISION_PATTERN = re.compile(r'^.*\s(\d+)/(\d+)/(\d+)\s.*$') - - def launch_if_main(entrypoint, runtime_name): - """Call the given function in the main entry point, otherwise no-op.""" - - if runtime_name == '__main__': - entrypoint() - - def int_from_date(date): - """Convert a YYYY/MM/DD date from a string to a YYYYMMDD int. 
- - Parameters - ---------- - date : str - Date in "YYYY/MM/DD.*" format. - - Returns - ------- - int - Date in YYYYMMDD format. - """ - if isinstance(date, str): - return int(date[:10].replace('/', '').replace('-', '')) - return date - - def parse_bool(value): - """Convert a string to a boolean. - - Parameters - ---------- - value : str - Boolean-like value, like "true" or "false". - - Returns - ------- - bool - If the string contains some version of "true" or "false". - None - If the string is None or empty. - - Raises - ------ - CovidHospException - If the string constains something other than a version of "true" or - "false". - """ - - if not value: - return None - if value.lower() == 'true': - return True - if value.lower() == 'false': - return False - raise CovidHospException(f'cannot convert "{value}" to bool') - - def limited_string_fn(length): - def limited_string(value): - value = str(value) - if len(value) > length: - raise CovidHospException(f"Value '{value}':{len(value)} longer than max {length}") - return value - return limited_string - - GEOCODE_LENGTH = 32 - GEOCODE_PATTERN = re.compile(r'POINT \((-?[0-9.]+) (-?[0-9.]+)\)') - def limited_geocode(value): - if len(value) < Utils.GEOCODE_LENGTH: - return value - # otherwise parse and set precision to 6 decimal places - m = Utils.GEOCODE_PATTERN.match(value) - if not m: - raise CovidHospException(f"Couldn't parse geocode '{value}'") - return f'POINT ({" ".join(f"{float(x):.6f}" for x in m.groups())})' - - def issues_to_fetch(metadata, newer_than, older_than, logger=False): - """ - Construct all issue dates and URLs to be ingested based on metadata. - - Parameters - ---------- - metadata pd.DataFrame - HHS metadata indexed by issue date and with column "Archive Link" - newer_than Date - Lower bound (exclusive) of days to get issues for. - older_than Date - Upper bound (exclusive) of days to get issues for - logger structlog.Logger [optional; default False] - Logger to receive messages - Returns - ------- - Dictionary of {issue day: list of (download urls, index)} - for issues after newer_than and before older_than - """ - daily_issues = {} - n_beyond = 0 - n_selected = 0 - for index in sorted(set(metadata.index)): - day = index.date() - if day > newer_than and day < older_than: - urls = metadata.loc[index, "Archive Link"] - urls_list = [(urls, index)] if isinstance(urls, str) else [(url, index) for url in urls] - if day not in daily_issues: - daily_issues[day] = urls_list - else: - daily_issues[day] += urls_list - n_selected += len(urls_list) - elif day >= older_than: - n_beyond += 1 - if logger: - if n_beyond > 0: - logger.info("issues available beyond selection", on_or_newer=older_than, count=n_beyond) - logger.info("issues selected", newer_than=str(newer_than), older_than=str(older_than), count=n_selected) - return daily_issues - - @staticmethod - def merge_by_key_cols(dfs, key_cols, logger=False): - """Merge a list of data frames as a series of updates. - - Parameters: - ----------- - dfs : list(pd.DataFrame) - Data frames to merge, ordered from earliest to latest. - key_cols: list(str) - Columns to use as the index. - logger structlog.Logger [optional; default False] - Logger to receive messages - - Returns a single data frame containing the most recent data for each state+date. 
- """ - - dfs = [df.set_index(key_cols) for df in dfs - if not all(k in df.index.names for k in key_cols)] - result = dfs[0] - if logger and len(dfs) > 7: - logger.warning( - "expensive operation", - msg="concatenating more than 7 files may result in long running times", - count=len(dfs)) - for df in dfs[1:]: - # update values for existing keys - result.update(df) - # add any new keys. - ## repeated concatenation in pandas is expensive, but (1) we don't expect - ## batch sizes to be terribly large (7 files max) and (2) this way we can - ## more easily capture the next iteration's updates to any new keys - result_index_set = set(result.index.to_list()) - new_rows = df.loc[[i for i in df.index.to_list() if i not in result_index_set]] - result = pd.concat([result, new_rows]) - - # convert the index rows back to columns - return result.reset_index(level=key_cols) - - @staticmethod - def update_dataset(database, network, newer_than=None, older_than=None): - """Acquire the most recent dataset, unless it was previously acquired. - - Parameters - ---------- - database : delphi.epidata.acquisition.covid_hosp.common.database.Database - A `Database` subclass for a particular dataset. - network : delphi.epidata.acquisition.covid_hosp.common.network.Network - A `Network` subclass for a particular dataset. - newer_than : date - Lower bound (exclusive) of days to get issues for. - older_than : date - Upper bound (exclusive) of days to get issues for - - Returns - ------- - bool - Whether a new dataset was acquired. - """ - logger = database.logger() - - metadata = network.fetch_metadata(logger=logger) - datasets = [] - with database.connect() as db: - max_issue = db.get_max_issue(logger=logger) - - older_than = datetime.datetime.today().date() if newer_than is None else older_than - newer_than = max_issue if newer_than is None else newer_than - daily_issues = Utils.issues_to_fetch(metadata, newer_than, older_than, logger=logger) - if not daily_issues: - logger.info("no new issues; nothing to do") - return False - for issue, revisions in daily_issues.items(): - issue_int = int(issue.strftime("%Y%m%d")) - # download the dataset and add it to the database - dataset = Utils.merge_by_key_cols([network.fetch_dataset(url, logger=logger) for url, _ in revisions], - db.KEY_COLS, - logger=logger) - # add metadata to the database - all_metadata = [] - for url, index in revisions: - all_metadata.append((url, metadata.loc[index].reset_index().to_json())) - datasets.append(( - issue_int, - dataset, - all_metadata - )) - with database.connect() as db: - for issue_int, dataset, all_metadata in datasets: - db.insert_dataset(issue_int, dataset, logger=logger) - for url, metadata_json in all_metadata: - db.insert_metadata(issue_int, url, metadata_json, logger=logger) - logger.info("acquired rows", count=len(dataset)) - - # note that the transaction is committed by exiting the `with` block - return True diff --git a/src/acquisition/covid_hosp/facility/database.py b/src/acquisition/covid_hosp/facility/database.py index 0ca583642..5efca7e3b 100644 --- a/src/acquisition/covid_hosp/facility/database.py +++ b/src/acquisition/covid_hosp/facility/database.py @@ -1,22 +1,7 @@ # first party from delphi.epidata.acquisition.covid_hosp.common.database import Database as BaseDatabase -from delphi.epidata.acquisition.covid_hosp.facility.network import Network -from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething class Database(BaseDatabase): - chs = CovidHospSomething() - TABLE_NAME = 
chs.get_ds_table_name('covid_hosp_facility') - KEY_COLS = chs.get_ds_key_cols('covid_hosp_facility') - AGGREGATE_KEY_COLS = chs.get_ds_aggregate_key_cols('covid_hosp_facility') - ORDERED_CSV_COLUMNS = chs.get_ds_ordered_csv_cols('covid_hosp_facility') - - def __init__(self, *args, **kwargs): - super().__init__( - *args, - **kwargs, - table_name=Database.TABLE_NAME, - hhs_dataset_id=Network.DATASET_ID, - key_columns=Database.KEY_COLS, - columns_and_types=Database.ORDERED_CSV_COLUMNS) + DATASET_NAME = 'covid_hosp_facility' diff --git a/src/acquisition/covid_hosp/facility/network.py b/src/acquisition/covid_hosp/facility/network.py deleted file mode 100644 index 6a0092c7f..000000000 --- a/src/acquisition/covid_hosp/facility/network.py +++ /dev/null @@ -1,17 +0,0 @@ -# first party -from delphi.epidata.acquisition.covid_hosp.common.network import Network as BaseNetwork - - -class Network(BaseNetwork): - - DATASET_ID = 'anag-cw7u' - METADATA_ID = 'j4ip-wfsv' - - def fetch_metadata(*args, **kwags): - """Download and return metadata. - - See `fetch_metadata_for_dataset`. - """ - - return Network.fetch_metadata_for_dataset( - *args, **kwags, dataset_id=Network.METADATA_ID) diff --git a/src/acquisition/covid_hosp/facility/update.py b/src/acquisition/covid_hosp/facility/update.py index b2b96c2e3..b178c3d29 100644 --- a/src/acquisition/covid_hosp/facility/update.py +++ b/src/acquisition/covid_hosp/facility/update.py @@ -3,26 +3,8 @@ Facility" dataset provided by the US Department of Health & Human Services via healthdata.gov. """ - -# first party -from delphi.epidata.acquisition.covid_hosp.common.utils import Utils from delphi.epidata.acquisition.covid_hosp.facility.database import Database -from delphi.epidata.acquisition.covid_hosp.facility.network import Network - - -class Update: - - def run(network=Network): - """Acquire the most recent dataset, unless it was previously acquired. - - Returns - ------- - bool - Whether a new dataset was acquired. 
- """ - - return Utils.update_dataset(Database, network) -# main entry point -Utils.launch_if_main(Update.run, __name__) +if __name__ == '__main__': + Database().update_dataset() diff --git a/src/acquisition/covid_hosp/state_daily/database.py b/src/acquisition/covid_hosp/state_daily/database.py index 7dd142b47..65bc1ff47 100644 --- a/src/acquisition/covid_hosp/state_daily/database.py +++ b/src/acquisition/covid_hosp/state_daily/database.py @@ -1,22 +1,7 @@ # first party from delphi.epidata.acquisition.covid_hosp.common.database import Database as BaseDatabase -from delphi.epidata.acquisition.covid_hosp.common.database import Columndef -from delphi.epidata.acquisition.covid_hosp.state_daily.network import Network -from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething class Database(BaseDatabase): - chs = CovidHospSomething() - TABLE_NAME = chs.get_ds_table_name('state_daily') - KEY_COLS = chs.get_ds_key_cols('state_daily') - ORDERED_CSV_COLUMNS = chs.get_ds_ordered_csv_cols('state_daily') - - def __init__(self, *args, **kwargs): - super().__init__( - *args, - **kwargs, - table_name=Database.TABLE_NAME, - hhs_dataset_id=Network.DATASET_ID, - columns_and_types=Database.ORDERED_CSV_COLUMNS, - key_columns=Database.KEY_COLS) + DATASET_NAME = 'state_daily' diff --git a/src/acquisition/covid_hosp/state_daily/network.py b/src/acquisition/covid_hosp/state_daily/network.py deleted file mode 100644 index f4678cc9b..000000000 --- a/src/acquisition/covid_hosp/state_daily/network.py +++ /dev/null @@ -1,36 +0,0 @@ -# first party -from delphi.epidata.acquisition.covid_hosp.common.network import Network as BaseNetwork - -class Network(BaseNetwork): - - DATASET_ID = '6xf2-c3ie' - METADATA_ID = '4cnb-m4rz' - - @staticmethod - def fetch_metadata(*args, **kwags): - """Download and return metadata. - - See `fetch_metadata_for_dataset`. - """ - - return Network.fetch_metadata_for_dataset( - *args, **kwags, dataset_id=Network.METADATA_ID) - - @staticmethod - def fetch_revisions(metadata, newer_than): - """ - Extract all dataset URLs from metadata for issues after newer_than. - - Parameters - ---------- - metadata DataFrame - Metadata DF containing all rows of metadata from data source page. - - newer_than Timestamp or datetime - Date and time of issue to use as lower bound for new URLs. - - Returns - ------- - List of URLs of issues after newer_than - """ - return list(metadata.loc[metadata.index > newer_than, "Archive Link"]) diff --git a/src/acquisition/covid_hosp/state_daily/update.py b/src/acquisition/covid_hosp/state_daily/update.py index 12a51e6c3..b36f6ea98 100644 --- a/src/acquisition/covid_hosp/state_daily/update.py +++ b/src/acquisition/covid_hosp/state_daily/update.py @@ -3,26 +3,8 @@ dataset provided by the US Department of Health & Human Services via healthdata.gov. """ -# first party -from delphi.epidata.acquisition.covid_hosp.common.utils import Utils from delphi.epidata.acquisition.covid_hosp.state_daily.database import Database -from delphi.epidata.acquisition.covid_hosp.state_daily.network import Network -class Update: - - @staticmethod - def run(network=Network): - """Acquire the most recent dataset, unless it was previously acquired. - - Returns - ------- - bool - Whether a new dataset was acquired. 
- """ - - return Utils.update_dataset(Database, network) - - -# main entry point -Utils.launch_if_main(Update.run, __name__) +if __name__ == '__main__': + Database().update_dataset() diff --git a/src/acquisition/covid_hosp/state_timeseries/database.py b/src/acquisition/covid_hosp/state_timeseries/database.py index 58837565b..aff9cb2a2 100644 --- a/src/acquisition/covid_hosp/state_timeseries/database.py +++ b/src/acquisition/covid_hosp/state_timeseries/database.py @@ -1,22 +1,7 @@ # first party from delphi.epidata.acquisition.covid_hosp.common.database import Database as BaseDatabase -from delphi.epidata.acquisition.covid_hosp.common.database import Columndef -from delphi.epidata.acquisition.covid_hosp.state_timeseries.network import Network -from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething class Database(BaseDatabase): - chs = CovidHospSomething() - TABLE_NAME = chs.get_ds_table_name('state_timeseries') - KEY_COLS = chs.get_ds_key_cols('state_timeseries') - ORDERED_CSV_COLUMNS = chs.get_ds_ordered_csv_cols('state_timeseries') - - def __init__(self, *args, **kwargs): - super().__init__( - *args, - **kwargs, - table_name=Database.TABLE_NAME, - hhs_dataset_id=Network.DATASET_ID, - columns_and_types=Database.ORDERED_CSV_COLUMNS, - key_columns=Database.KEY_COLS) + DATASET_NAME = 'state_timeseries' diff --git a/src/acquisition/covid_hosp/state_timeseries/network.py b/src/acquisition/covid_hosp/state_timeseries/network.py deleted file mode 100644 index 7bd5082a8..000000000 --- a/src/acquisition/covid_hosp/state_timeseries/network.py +++ /dev/null @@ -1,17 +0,0 @@ -# first party -from delphi.epidata.acquisition.covid_hosp.common.network import Network as BaseNetwork - - -class Network(BaseNetwork): - - DATASET_ID = 'g62h-syeh' - METADATA_ID = 'qqte-vkut' - - def fetch_metadata(*args, **kwags): - """Download and return metadata. - - See `fetch_metadata_for_dataset`. - """ - - return Network.fetch_metadata_for_dataset( - *args, **kwags, dataset_id=Network.METADATA_ID) diff --git a/src/acquisition/covid_hosp/state_timeseries/update.py b/src/acquisition/covid_hosp/state_timeseries/update.py index 7c8e79941..6b463d738 100644 --- a/src/acquisition/covid_hosp/state_timeseries/update.py +++ b/src/acquisition/covid_hosp/state_timeseries/update.py @@ -3,26 +3,8 @@ Timeseries" dataset provided by the US Department of Health & Human Services via healthdata.gov. """ - -# first party -from delphi.epidata.acquisition.covid_hosp.common.utils import Utils from delphi.epidata.acquisition.covid_hosp.state_timeseries.database import Database -from delphi.epidata.acquisition.covid_hosp.state_timeseries.network import Network - - -class Update: - - def run(network=Network): - """Acquire the most recent dataset, unless it was previously acquired. - - Returns - ------- - bool - Whether a new dataset was acquired. 
- """ - - return Utils.update_dataset(Database, network) -# main entry point -Utils.launch_if_main(Update.run, __name__) +if __name__ == '__main__': + Database().update_dataset() diff --git a/src/common/covid_hosp/covid_hosp_schema_io.py b/src/common/covid_hosp/covid_hosp_schema_io.py index 7f2ed0989..ce251afa8 100644 --- a/src/common/covid_hosp/covid_hosp_schema_io.py +++ b/src/common/covid_hosp/covid_hosp_schema_io.py @@ -1,10 +1,10 @@ +from collections import namedtuple from datetime import datetime from pathlib import Path import re import sys -from delphi.epidata.acquisition.covid_hosp.common.utils import Utils -from delphi.epidata.acquisition.covid_hosp.common.database import Columndef +from delphi.epidata.common.covid_hosp.utils import TypeUtils # ruamel preserves key ordering, comments, and some formatting for a "round trip" of a yaml file import-->export from ruamel.yaml.main import ( @@ -19,6 +19,8 @@ # print(yaml_dump(yaml_load('NULL: ~'))) # ==> "~: ~\n" +Columndef = namedtuple("Columndef", "csv_name sql_name dtype") + class CovidHospSomething: PYTHON_TYPE_MAPPING = { @@ -26,9 +28,9 @@ class CovidHospSomething: 'float': float, 'str': str, 'fixedstr': str, - 'bool': Utils.parse_bool, - 'intdate': Utils.int_from_date, - 'geocode': Utils.limited_geocode, + 'bool': TypeUtils.parse_bool, + 'intdate': TypeUtils.int_from_date, + 'geocode': TypeUtils.limited_geocode, } SQL_TYPE_MAPPING = { @@ -128,7 +130,19 @@ def get_ds_key_cols(self, ds_name): def get_ds_aggregate_key_cols(self, ds_name): - return self.dataset(ds_name).get('AGGREGATE_KEY_COLS', None) + return self.dataset(ds_name).get('AGGREGATE_KEY_COLS', []) + + + def get_ds_dataset_id(self, ds_name): + return self.dataset(ds_name)['DATASET_ID'] + + + def get_ds_metadata_id(self, ds_name): + return self.dataset(ds_name)['METADATA_ID'] + + + def get_ds_issue_column(self, ds_name): + return self.dataset(ds_name).get('ISSUE_COLUMN', 'issue') def get_ds_ordered_csv_cols(self, ds_name): diff --git a/src/common/covid_hosp/covid_hosp_schemadefs.yaml b/src/common/covid_hosp/covid_hosp_schemadefs.yaml index 42dae5516..ee9dbbe1c 100644 --- a/src/common/covid_hosp/covid_hosp_schemadefs.yaml +++ b/src/common/covid_hosp/covid_hosp_schemadefs.yaml @@ -4,6 +4,7 @@ covid_hosp_facility: DATASET_ID: anag-cw7u METADATA_ID: j4ip-wfsv KEY_COLS: [hospital_pk, collection_week] + ISSUE_COLUMN: publication_date UNIQUE_INDEXES: hospital_pk: [hospital_pk, collection_week, publication_date] INDEXES: @@ -133,6 +134,7 @@ state_timeseries: DATASET_ID: g62h-syeh METADATA_ID: qqte-vkut KEY_COLS: [state, date] + ISSUE_COLUMN: issue UNIQUE_INDEXES: issue_by_state_and_date: [state, date, issue] INDEXES: @@ -262,6 +264,7 @@ state_daily: DATASET_ID: 6xf2-c3ie METADATA_ID: 4cnb-m4rz KEY_COLS: [state, reporting_cutoff_start] + ISSUE_COLUMN: issue UNIQUE_INDEXES: issue_by_state_and_date: [state, date, issue] INDEXES: diff --git a/src/common/covid_hosp/utils.py b/src/common/covid_hosp/utils.py new file mode 100644 index 000000000..cb76a37f9 --- /dev/null +++ b/src/common/covid_hosp/utils.py @@ -0,0 +1,81 @@ +"""Code shared among multiple `covid_hosp` scrapers.""" + +# standard library +import datetime +import re + +import pandas as pd + +from delphi.epidata.acquisition.covid_hosp.common.network import Network + +class CovidHospException(Exception): + """Exception raised exclusively by `covid_hosp` utilities.""" + + +class TypeUtils: + + def int_from_date(date): + """Convert a YYYY/MM/DD date from a string to a YYYYMMDD int. 
+ + Parameters + ---------- + date : str + Date in "YYYY/MM/DD.*" format. + + Returns + ------- + int + Date in YYYYMMDD format. + """ + if isinstance(date, str): + return int(date[:10].replace('/', '').replace('-', '')) + return date + + def parse_bool(value): + """Convert a string to a boolean. + + Parameters + ---------- + value : str + Boolean-like value, like "true" or "false". + + Returns + ------- + bool + If the string contains some version of "true" or "false". + None + If the string is None or empty. + + Raises + ------ + CovidHospException + If the string constains something other than a version of "true" or + "false". + """ + + if not value: + return None + if value.lower() == 'true': + return True + if value.lower() == 'false': + return False + raise CovidHospException(f'cannot convert "{value}" to bool') + + def limited_string_fn(length): + def limited_string(value): + value = str(value) + if len(value) > length: + raise CovidHospException(f"Value '{value}':{len(value)} longer than max {length}") + return value + return limited_string + + GEOCODE_LENGTH = 32 + GEOCODE_PATTERN = re.compile(r'POINT \((-?[0-9.]+) (-?[0-9.]+)\)') + def limited_geocode(value): + if len(value) < TypeUtils.GEOCODE_LENGTH: + return value + # otherwise parse and set precision to 6 decimal places + m = TypeUtils.GEOCODE_PATTERN.match(value) + if not m: + raise CovidHospException(f"Couldn't parse geocode '{value}'") + return f'POINT ({" ".join(f"{float(x):.6f}" for x in m.groups())})' diff --git a/tests/acquisition/covid_hosp/common/test_database.py b/tests/acquisition/covid_hosp/common/test_database.py index c070a00ae..1487f36e3 100644 --- a/tests/acquisition/covid_hosp/common/test_database.py +++ b/tests/acquisition/covid_hosp/common/test_database.py @@ -1,15 +1,19 @@ """Unit tests for database.py.""" # standard library +from datetime import date import math import unittest -from unittest.mock import MagicMock -from unittest.mock import sentinel +from unittest.mock import MagicMock, patch, sentinel # third party import pandas as pd +import mysql.connector -from delphi.epidata.acquisition.covid_hosp.common.database import Database, Columndef +# first party +from delphi.epidata.acquisition.covid_hosp.common.network import Network +from delphi.epidata.acquisition.covid_hosp.common.test_utils import TestDatabase, UnitTestUtils +from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import Columndef # py3tester coverage target __test_target__ = 'delphi.epidata.acquisition.covid_hosp.common.database' @@ -17,12 +21,18 @@ class DatabaseTests(unittest.TestCase): + def setUp(self): + """Perform per-test setup.""" + + # configure test data + self.test_utils = UnitTestUtils(__file__) + def test_commit_and_close_on_success(self): """Commit and close the connection after success.""" mock_connector = MagicMock() - with Database.connect(mysql_connector_impl=mock_connector) as database: + with TestDatabase.create_mock_database().connect(mysql_connector_impl=mock_connector) as database: connection = database.connection mock_connector.connect.assert_called_once() @@ -35,7 +45,7 @@ def test_rollback_and_close_on_failure(self): mock_connector = MagicMock() try: - with Database.connect(mysql_connector_impl=mock_connector) as database: + with TestDatabase.create_mock_database().connect(mysql_connector_impl=mock_connector) as database: connection = database.connection raise Exception('intentional test of exception handling') except Exception: @@ -50,11 +60,13 @@ def test_new_cursor_cleanup(self): mock_connection = 
MagicMock() mock_cursor = mock_connection.cursor() - database = Database(mock_connection) + mock_database = TestDatabase.create_mock_database() try: - with database.new_cursor() as cursor: - raise Exception('intentional test of exception handling') + with patch.object(mysql.connector, 'connect', return_value=mock_connection), \ + mock_database.connect() as database: + with database.new_cursor() as cursor: + raise Exception('intentional test of exception handling') except Exception: pass @@ -68,12 +80,15 @@ def test_contains_revision(self): mock_connection = MagicMock() mock_cursor = mock_connection.cursor() - database = Database(mock_connection, table_name=sentinel.table_name, hhs_dataset_id=sentinel.hhs_dataset_id) + + mock_database = TestDatabase.create_mock_database(table_name = sentinel.table_name, dataset_id = sentinel.hhs_dataset_id) with self.subTest(name='new revision'): mock_cursor.__iter__.return_value = [(0,)] - result = database.contains_revision(sentinel.revision) + with patch.object(mysql.connector, 'connect', return_value=mock_connection), \ + mock_database.connect() as database: + result = database.contains_revision(sentinel.revision) # compare with boolean literal to test the type cast self.assertIs(result, False) @@ -83,7 +98,9 @@ def test_contains_revision(self): with self.subTest(name='old revision'): mock_cursor.__iter__.return_value = [(1,)] - result = database.contains_revision(sentinel.revision) + with patch.object(mysql.connector, 'connect', return_value=mock_connection), \ + mock_database.connect() as database: + result = database.contains_revision(sentinel.revision) # compare with boolean literal to test the type cast self.assertIs(result, True) @@ -98,12 +115,15 @@ def test_insert_metadata(self): mock_connection = MagicMock() mock_cursor = mock_connection.cursor() - database = Database(mock_connection, table_name=sentinel.table_name, hhs_dataset_id=sentinel.hhs_dataset_id) - result = database.insert_metadata( - sentinel.publication_date, - sentinel.revision, - sentinel.meta_json) + mock_database = TestDatabase.create_mock_database(table_name = sentinel.table_name, dataset_id = sentinel.hhs_dataset_id) + + with patch.object(mysql.connector, 'connect', return_value=mock_connection), \ + mock_database.connect() as database: + result = database.insert_metadata( + sentinel.publication_date, + sentinel.revision, + sentinel.meta_json) self.assertIsNone(result) actual_values = mock_cursor.execute.call_args[0][-1] @@ -130,10 +150,8 @@ def test_insert_dataset(self): ] mock_connection = MagicMock() mock_cursor = mock_connection.cursor() - database = Database( - mock_connection, - table_name=table_name, - columns_and_types=columns_and_types) + + mock_database = TestDatabase.create_mock_database(table_name = table_name, csv_cols=columns_and_types, issue_col='issue') dataset = pd.DataFrame.from_dict({ 'str_col': ['a', 'b', 'c', math.nan, 'e', 'f'], @@ -141,14 +159,16 @@ def test_insert_dataset(self): 'float_col': ['0.1', '0.2', '0.3', '0.4', '0.5', math.nan], }) - result = database.insert_dataset(sentinel.publication_date, dataset) + with patch.object(mysql.connector, 'connect', return_value=mock_connection), \ + mock_database.connect() as database: + result = database.insert_dataset(sentinel.publication_date, dataset) self.assertIsNone(result) self.assertEqual(mock_cursor.executemany.call_count, 1) actual_sql = mock_cursor.executemany.call_args[0][0] self.assertIn( - 'INSERT INTO `test_table` (`id`, `publication_date`, `sql_str_col`, `sql_int_col`, `sql_float_col`)', + 
'INSERT INTO `test_table` (`id`, `issue`, `sql_str_col`, `sql_int_col`, `sql_float_col`)', actual_sql) expected_values = [ @@ -168,3 +188,88 @@ def test_insert_dataset(self): # [i]: the ith row inserted in the executemany actual = mock_cursor.executemany.call_args_list[0][0][-1][i] self.assertEqual(actual, (0, sentinel.publication_date) + expected) + + def test_issues_to_fetch(self): + test_metadata = pd.DataFrame({ + "date": [pd.Timestamp("2021-03-13 00:00:00"), + pd.Timestamp("2021-03-14 00:00:00"), + pd.Timestamp("2021-03-15 00:00:01"), + pd.Timestamp("2021-03-15 00:00:00"), + pd.Timestamp("2021-03-16 00:00:00") + ], + "Archive Link": ["a", "b", "d", "c", "e"] + }).set_index("date") + + issues = TestDatabase.create_mock_database().issues_to_fetch(test_metadata, pd.Timestamp("2021-3-13"), pd.Timestamp("2021-3-16")) + self.assertEqual(issues, + {date(2021, 3, 14): [("b", pd.Timestamp("2021-03-14 00:00:00"))], + date(2021, 3, 15): [("c", pd.Timestamp("2021-03-15 00:00:00")), + ("d", pd.Timestamp("2021-03-15 00:00:01"))] + } + ) + + def test_max_issue(self): + """Get the most recent issue added to the database""" + + # Note that query logic is tested separately by integration tests. This + # test just checks that the function maps inputs to outputs as expected. + + mock_connection = MagicMock() + mock_cursor = mock_connection.cursor() + + mock_database = TestDatabase.create_mock_database() + + with patch.object(mysql.connector, 'connect', return_value=mock_connection), \ + mock_database.connect() as database: + result = database.get_max_issue() + + self.assertEqual(mock_cursor.execute.call_count, 1) + self.assertEqual(result, pd.Timestamp("1900/1/1"), "max issue when db is empty") + + def test_run_skip_old_dataset(self): + """Don't re-acquire an old dataset.""" + + mock_connection = MagicMock() + with patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \ + patch.object(Network, 'fetch_dataset', return_value=None) as fetch_dataset, \ + patch.object(mysql.connector, 'connect', return_value=mock_connection), \ + patch.object(TestDatabase, 'get_max_issue', return_value=pd.Timestamp("2200/1/1")), \ + patch.object(TestDatabase, 'insert_metadata', return_value=None) as insert_metadata, \ + patch.object(TestDatabase, 'insert_dataset', return_value=None) as insert_dataset: + result = TestDatabase.create_mock_database().update_dataset() + + self.assertFalse(result) + fetch_dataset.assert_not_called() + insert_metadata.assert_not_called() + insert_dataset.assert_not_called() + + def test_run_acquire_new_dataset(self): + """Acquire a new dataset.""" + + mock_connection = MagicMock() + fake_dataset = pd.DataFrame({"date": [pd.Timestamp("2020/1/1")], "state": ["ca"]}) + fake_issues = {pd.Timestamp("2021/3/15"): [("url1", pd.Timestamp("2021-03-15 00:00:00")), + ("url2", pd.Timestamp("2021-03-15 00:00:00"))]} + + with patch.object(Network, 'fetch_metadata', return_value=self.test_utils.load_sample_metadata()), \ + patch.object(Network, 'fetch_dataset', return_value=fake_dataset), \ + patch.object(mysql.connector, 'connect', return_value=mock_connection), \ + patch.object(TestDatabase, 'get_max_issue', return_value=pd.Timestamp("1900/1/1")), \ + patch.object(TestDatabase, 'issues_to_fetch', return_value=fake_issues), \ + patch.object(TestDatabase, 'insert_metadata', return_value=None) as insert_metadata, \ + patch.object(TestDatabase, 'insert_dataset', return_value=None) as insert_dataset: + result = TestDatabase.create_mock_database(key_cols=["state", 
"date"]).update_dataset() + + self.assertTrue(result) + + # should have been called twice + insert_metadata.assert_called() + assert insert_metadata.call_count == 2 + # most recent call should be for the final revision at url2 + args = insert_metadata.call_args[0] + self.assertEqual(args[:2], (20210315, "url2")) + pd.testing.assert_frame_equal( + insert_dataset.call_args[0][1], + pd.DataFrame({"state": ["ca"], "date": [pd.Timestamp("2020/1/1")]}) + ) + self.assertEqual(insert_dataset.call_args[0][0], 20210315) diff --git a/tests/acquisition/covid_hosp/common/test_network.py b/tests/acquisition/covid_hosp/common/test_network.py index bf294f303..981f994f5 100644 --- a/tests/acquisition/covid_hosp/common/test_network.py +++ b/tests/acquisition/covid_hosp/common/test_network.py @@ -14,7 +14,7 @@ class NetworkTests(unittest.TestCase): - def test_fetch_metadata_for_dataset(self): + def test_fetch_metadata(self): """Fetch metadata as JSON.""" with patch.object(pd, "read_csv") as func: @@ -22,7 +22,7 @@ def test_fetch_metadata_for_dataset(self): {"Archive Link": ["test2", "test1", "test3"], "Update Date": ["2020/1/2", "2020/1/1", "2020/1/3"]} ) - result = Network.fetch_metadata_for_dataset("test") + result = Network.fetch_metadata("test") pd.testing.assert_frame_equal( result, pd.DataFrame( diff --git a/tests/acquisition/covid_hosp/state_daily/test_update.py b/tests/acquisition/covid_hosp/common/test_update.py similarity index 65% rename from tests/acquisition/covid_hosp/state_daily/test_update.py rename to tests/acquisition/covid_hosp/common/test_update.py index b716e6888..6253c176e 100644 --- a/tests/acquisition/covid_hosp/state_daily/test_update.py +++ b/tests/acquisition/covid_hosp/common/test_update.py @@ -2,7 +2,6 @@ # standard library import unittest -from unittest.mock import MagicMock from unittest.mock import patch from unittest.mock import sentinel @@ -11,14 +10,13 @@ # first party from delphi.epidata.acquisition.covid_hosp.common.test_utils import UnitTestUtils -from delphi.epidata.acquisition.covid_hosp.common.utils import Utils -from delphi.epidata.acquisition.covid_hosp.state_daily.update import Update -from delphi.epidata.acquisition.covid_hosp.state_daily.network import Network + +# Using state_daily as an example - the update files are identical across datasets from delphi.epidata.acquisition.covid_hosp.state_daily.database import Database # py3tester coverage target __test_target__ = \ - 'delphi.epidata.acquisition.covid_hosp.state_daily.update' + 'delphi.epidata.acquisition.covid_hosp.common.database' class UpdateTests(unittest.TestCase): @@ -28,21 +26,11 @@ def setUp(self): # configure test data self.test_utils = UnitTestUtils(__file__) - def test_run(self): - """Acquire a new dataset.""" - - with patch.object(Utils, 'update_dataset') as mock_update_dataset: - mock_update_dataset.return_value = sentinel.result - - result = Update.run() - - self.assertEqual(result, sentinel.result) - mock_update_dataset.assert_called_once() - - def test_merge(self): """Merging the set of updates in each batch is pretty tricky""" # Generate a set of synthetic updates with overlapping keys + db = Database() + keys = list(db.columns_and_types.keys()) N = 10 dfs = [] for i in range(5): @@ -50,13 +38,13 @@ def test_merge(self): dfs.append(pd.DataFrame(dict( state=range(1, N, i+1), reporting_cutoff_start=range(N+1, 2*N, i+1), - **{spec[0]: i+1 for spec in Database.ORDERED_CSV_COLUMNS[2:]} + **{spec[0]: i+1 for spec in keys[2:]} ))) # add a data frame with unseen keys dfs.append(pd.DataFrame(dict( 
state=[-1], reporting_cutoff_start=[-1], - **{spec[0]: -1 for spec in Database.ORDERED_CSV_COLUMNS[2:]} + **{spec[0]: -1 for spec in keys[2:]} ))) # now we need to know which data frame was used as the final value. the @@ -73,10 +61,10 @@ def test_merge(self): expected = pd.DataFrame(dict( state=states, reporting_cutoff_start=dates, - **{spec[0]: value_from for spec in Database.ORDERED_CSV_COLUMNS[2:]} - )).astype({spec[0]: 'float64' for spec in Database.ORDERED_CSV_COLUMNS[2:]} + **{spec[0]: value_from for spec in keys[2:]} + )).astype({spec[0]: 'float64' for spec in keys[2:]} ) - result = Utils.merge_by_key_cols(dfs, Database.KEY_COLS) + result = db.merge_by_key_cols(dfs) try: pd.testing.assert_frame_equal(result, expected) except: diff --git a/tests/acquisition/covid_hosp/common/test_utils.py b/tests/acquisition/covid_hosp/common/test_utils.py deleted file mode 100644 index 85dbd110c..000000000 --- a/tests/acquisition/covid_hosp/common/test_utils.py +++ /dev/null @@ -1,142 +0,0 @@ -"""Unit tests for utils.py.""" - -# standard library -from datetime import date -import unittest -from unittest.mock import MagicMock, PropertyMock, patch - -# first party -from delphi.epidata.acquisition.covid_hosp.common.test_utils import UnitTestUtils -from delphi.epidata.acquisition.covid_hosp.common.utils import Utils, CovidHospException - -#third party -import pandas as pd - -# py3tester coverage target -__test_target__ = 'delphi.epidata.acquisition.covid_hosp.common.utils' - - -class UtilsTests(unittest.TestCase): - - def setUp(self): - """Perform per-test setup.""" - - # configure test data - self.test_utils = UnitTestUtils(__file__) - - def test_launch_if_main_when_main(self): - """Launch the main entry point.""" - - mock_entry = MagicMock() - - Utils.launch_if_main(mock_entry, '__main__') - - mock_entry.assert_called_once() - - def test_launch_if_main_when_not_main(self): - """Don't launch the main entry point.""" - - mock_entry = MagicMock() - - Utils.launch_if_main(mock_entry, '__test__') - - mock_entry.assert_not_called() - - def test_int_from_date(self): - """Convert a YYY-MM-DD date to a YYYYMMDD int.""" - - self.assertEqual(Utils.int_from_date('2020-11-17'), 20201117) - self.assertEqual(Utils.int_from_date('2020/11/17'), 20201117) - self.assertEqual(Utils.int_from_date('2020/11/17 10:00:00'), 20201117) - - def test_parse_bool(self): - """Parse a boolean value from a string.""" - - with self.subTest(name='None'): - self.assertIsNone(Utils.parse_bool(None)) - - with self.subTest(name='empty'): - self.assertIsNone(Utils.parse_bool('')) - - with self.subTest(name='true'): - self.assertTrue(Utils.parse_bool('true')) - self.assertTrue(Utils.parse_bool('True')) - self.assertTrue(Utils.parse_bool('tRuE')) - - with self.subTest(name='false'): - self.assertFalse(Utils.parse_bool('false')) - self.assertFalse(Utils.parse_bool('False')) - self.assertFalse(Utils.parse_bool('fAlSe')) - - with self.subTest(name='exception'): - with self.assertRaises(CovidHospException): - Utils.parse_bool('maybe') - - def test_issues_to_fetch(self): - test_metadata = pd.DataFrame({ - "date": [pd.Timestamp("2021-03-13 00:00:00"), - pd.Timestamp("2021-03-14 00:00:00"), - pd.Timestamp("2021-03-15 00:00:01"), - pd.Timestamp("2021-03-15 00:00:00"), - pd.Timestamp("2021-03-16 00:00:00") - ], - "Archive Link": ["a", "b", "d", "c", "e"] - }).set_index("date") - - issues = Utils.issues_to_fetch(test_metadata, pd.Timestamp("2021-3-13"), pd.Timestamp("2021-3-16")) - self.assertEqual(issues, - {date(2021, 3, 14): [("b", 
pd.Timestamp("2021-03-14 00:00:00"))], - date(2021, 3, 15): [("c", pd.Timestamp("2021-03-15 00:00:00")), - ("d", pd.Timestamp("2021-03-15 00:00:01"))] - } - ) - - def test_run_skip_old_dataset(self): - """Don't re-acquire an old dataset.""" - - mock_network = MagicMock() - mock_network.fetch_metadata.return_value = \ - self.test_utils.load_sample_metadata() - mock_database = MagicMock() - with mock_database.connect() as mock_connection: - pass - mock_connection.get_max_issue.return_value = pd.Timestamp("2200/1/1") - - result = Utils.update_dataset(database=mock_database, network=mock_network) - - self.assertFalse(result) - mock_network.fetch_dataset.assert_not_called() - mock_connection.insert_metadata.assert_not_called() - mock_connection.insert_dataset.assert_not_called() - - def test_run_acquire_new_dataset(self): - """Acquire a new dataset.""" - - mock_network = MagicMock() - mock_network.fetch_metadata.return_value = \ - self.test_utils.load_sample_metadata() - fake_dataset = pd.DataFrame({"date": [pd.Timestamp("2020/1/1")], "state": ["ca"]}) - mock_network.fetch_dataset.return_value = fake_dataset - mock_database = MagicMock() - with mock_database.connect() as mock_connection: - pass - type(mock_connection).KEY_COLS = PropertyMock(return_value=["state", "date"]) - mock_connection.get_max_issue.return_value = pd.Timestamp("1900/1/1") - with patch.object(Utils, 'issues_to_fetch') as mock_issues: - mock_issues.return_value = {pd.Timestamp("2021/3/15"): [("url1", pd.Timestamp("2021-03-15 00:00:00")), - ("url2", pd.Timestamp("2021-03-15 00:00:00"))]} - result = Utils.update_dataset(database=mock_database, network=mock_network) - - self.assertTrue(result) - - # should have been called twice - mock_connection.insert_metadata.assert_called() - assert mock_connection.insert_metadata.call_count == 2 - # most recent call should be for the final revision at url2 - args = mock_connection.insert_metadata.call_args[0] - self.assertEqual(args[:2], (20210315, "url2")) - pd.testing.assert_frame_equal( - mock_connection.insert_dataset.call_args[0][1], - pd.DataFrame({"state": ["ca"], "date": [pd.Timestamp("2020/1/1")]}) - ) - self.assertEqual(mock_connection.insert_dataset.call_args[0][0], 20210315) diff --git a/tests/acquisition/covid_hosp/facility/test_database.py b/tests/acquisition/covid_hosp/facility/test_database.py index 2e1ee29fe..2140e355d 100644 --- a/tests/acquisition/covid_hosp/facility/test_database.py +++ b/tests/acquisition/covid_hosp/facility/test_database.py @@ -3,7 +3,10 @@ # standard library import unittest from unittest.mock import MagicMock -from unittest.mock import sentinel +from unittest.mock import patch, sentinel + +# third party +import mysql.connector # first party from delphi.epidata.acquisition.covid_hosp.common.test_utils import UnitTestUtils @@ -29,34 +32,35 @@ def test_insert_dataset(self): mock_connection = MagicMock() mock_cursor = mock_connection.cursor() - database = Database(mock_connection) - dataset = self.test_utils.load_sample_dataset() - - result = database.insert_dataset(sentinel.publication_date, dataset) - - self.assertIsNone(result) - # once for the values, once for the keys - self.assertEqual(mock_cursor.executemany.call_count, 2) - - # [0]: the first call() object - # [0]: get the positional args out of the call() object - # [-1]: the last arg of the executemany call - # [-1]: the last row inserted in the executemany - last_query_values = mock_cursor.executemany.call_args_list[0][0][-1][-1] - expected_query_values = ( - 0, sentinel.publication_date, 
'450822', 20201130, - '6800 N MACARTHUR BLVD', 61.1, 7, 428, 60.9, 7, 426, 61.1, 7, 428, - '450822', 'IRVING', '48113', '15', '6', 'MEDICAL CITY LAS COLINAS', - 'Short Term', 14.0, 7, 98, -999999.0, 7, -999999, 69.3, 7, 485, 69.0, - 7, 483, True, True, -999999, -999999, -999999, -999999, -999999, - -999999, -999999, 7, 11, -999999, -999999, -999999, -999999, -999999, - -999999, -999999, -999999, -999999, 16, -999999, -999999, -999999, - -999999, 2, -999999, 11, -999999, 58, 536, 3, 8, 5, 17, 13, 10, 5.9, - 7, 41, -999999.0, 7, 16, -999999.0, 7, 14, 'TX', 6.9, 7, 48, 6.1, 7, - 43, 69.3, 7, 485, 14.3, 7, 100, -999999.0, 7, -999999, -999999.0, 7, - -999999, -999999.0, 7, -999999, -999999.0, 7, -999999, 9, 18, 14, 0, - 1, 4, 6.1, 7, 43, '77777') - self.assertEqual(len(last_query_values), len(expected_query_values)) - - for actual, expected in zip(last_query_values, expected_query_values): - self.assertAlmostEqual(actual, expected) + with patch.object(mysql.connector, 'connect', return_value=mock_connection), \ + Database().connect() as database: + dataset = self.test_utils.load_sample_dataset() + + result = database.insert_dataset(sentinel.publication_date, dataset) + + self.assertIsNone(result) + # once for the values, once for the keys + self.assertEqual(mock_cursor.executemany.call_count, 2) + + # [0]: the first call() object + # [0]: get the positional args out of the call() object + # [-1]: the last arg of the executemany call + # [-1]: the last row inserted in the executemany + last_query_values = mock_cursor.executemany.call_args_list[0][0][-1][-1] + expected_query_values = ( + 0, sentinel.publication_date, '450822', 20201130, + '6800 N MACARTHUR BLVD', 61.1, 7, 428, 60.9, 7, 426, 61.1, 7, 428, + '450822', 'IRVING', '48113', '15', '6', 'MEDICAL CITY LAS COLINAS', + 'Short Term', 14.0, 7, 98, -999999.0, 7, -999999, 69.3, 7, 485, 69.0, + 7, 483, True, True, -999999, -999999, -999999, -999999, -999999, + -999999, -999999, 7, 11, -999999, -999999, -999999, -999999, -999999, + -999999, -999999, -999999, -999999, 16, -999999, -999999, -999999, + -999999, 2, -999999, 11, -999999, 58, 536, 3, 8, 5, 17, 13, 10, 5.9, + 7, 41, -999999.0, 7, 16, -999999.0, 7, 14, 'TX', 6.9, 7, 48, 6.1, 7, + 43, 69.3, 7, 485, 14.3, 7, 100, -999999.0, 7, -999999, -999999.0, 7, + -999999, -999999.0, 7, -999999, -999999.0, 7, -999999, 9, 18, 14, 0, + 1, 4, 6.1, 7, 43, '77777') + self.assertEqual(len(last_query_values), len(expected_query_values)) + + for actual, expected in zip(last_query_values, expected_query_values): + self.assertAlmostEqual(actual, expected) diff --git a/tests/acquisition/covid_hosp/facility/test_network.py b/tests/acquisition/covid_hosp/facility/test_network.py deleted file mode 100644 index 3872a206b..000000000 --- a/tests/acquisition/covid_hosp/facility/test_network.py +++ /dev/null @@ -1,25 +0,0 @@ -"""Unit tests for network.py.""" - -# standard library -import unittest -from unittest.mock import patch -from unittest.mock import sentinel - -from delphi.epidata.acquisition.covid_hosp.facility.network import Network - -# py3tester coverage target -__test_target__ = 'delphi.epidata.acquisition.covid_hosp.facility.network' - - -class NetworkTests(unittest.TestCase): - - def test_fetch_metadata(self): - """Fetch metadata as JSON.""" - - with patch.object(Network, 'fetch_metadata_for_dataset') as func: - func.return_value = sentinel.json - - result = Network.fetch_metadata() - - self.assertEqual(result, sentinel.json) - func.assert_called_once_with(dataset_id=Network.METADATA_ID) diff --git 
a/tests/acquisition/covid_hosp/facility/test_update.py b/tests/acquisition/covid_hosp/facility/test_update.py deleted file mode 100644 index 01fb049b8..000000000 --- a/tests/acquisition/covid_hosp/facility/test_update.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Unit tests for update.py.""" - -# standard library -import unittest -from unittest.mock import patch -from unittest.mock import sentinel - -# first party -from delphi.epidata.acquisition.covid_hosp.common.utils import Utils -from delphi.epidata.acquisition.covid_hosp.facility.update import Update - -# py3tester coverage target -__test_target__ = 'delphi.epidata.acquisition.covid_hosp.facility.update' - - -class UpdateTests(unittest.TestCase): - - def test_run(self): - """Acquire a new dataset.""" - - with patch.object(Utils, 'update_dataset') as mock_update_dataset: - mock_update_dataset.return_value = sentinel.result - - result = Update.run() - - self.assertEqual(result, sentinel.result) - mock_update_dataset.assert_called_once() diff --git a/tests/acquisition/covid_hosp/state_daily/test_database.py b/tests/acquisition/covid_hosp/state_daily/test_database.py index ae9acd098..104fcdfb6 100644 --- a/tests/acquisition/covid_hosp/state_daily/test_database.py +++ b/tests/acquisition/covid_hosp/state_daily/test_database.py @@ -3,7 +3,10 @@ # standard library import unittest from unittest.mock import MagicMock -from unittest.mock import sentinel +from unittest.mock import patch, sentinel + +# third party +import mysql.connector # first party from delphi.epidata.acquisition.covid_hosp.state_daily.database import Database @@ -32,44 +35,30 @@ def test_insert_dataset(self): mock_connection = MagicMock() mock_cursor = mock_connection.cursor() - database = Database(mock_connection) - dataset = self.test_utils.load_sample_dataset() - - result = database.insert_dataset(sentinel.issue, dataset) - - self.assertIsNone(result) - self.assertEqual(mock_cursor.executemany.call_count, 1) - - last_query_values = mock_cursor.executemany.call_args[0][-1][-1] - expected_query_values = ( - 0, sentinel.issue, 'WY', 20201209, - 0.2519685039370078, 29, 127, 32, 0.4233576642335766, 31, 137, 58, 22, 2, - 7, None, 2, 8, 0, 1, '2', 5, 29, 3, 4, 0.1172985781990521, 29, 1688, 198, - 1729, 31, 856, 31, 198, 29, 0.4950838635049161, 31, 1729, 856, 5, 6, 7, - 0.2362768496420047, 29, 838, 198, 26, 8, 9, 10, 11, 12, 13, 14, 15, 16, - 17, 18, 19, 20, 21, 22, 23, 31, 24, 25, 15, 26, 27, 28, 29, 30, 31, 32, - 33, 34, 35, 36, 37, 38, 39, 40, 41, 29, 42, 43, 44, 45, 0, 29, 0, 29, - 46, 47, 48, 49, 50, 51, 52, 58, 31, 32, 29, 32, 31, 196, 29, 189, 31, - 53, 54, 55, 56, 2, 29, 2, 29, 137, 31) - self.assertEqual(len(last_query_values), len(expected_query_values)) - - for actual, expected in zip(last_query_values, expected_query_values): - if isinstance(expected, float): - self.assertAlmostEqual(actual, expected) - else: - self.assertEqual(actual, expected) - - def test_max_issue(self): - """Get the most recent issue added to the database""" - - # Note that query logic is tested separately by integration tests. This - # test just checks that the function maps inputs to outputs as expected. 
- - mock_connection = MagicMock() - mock_cursor = mock_connection.cursor() - database = Database(mock_connection) - - result = database.get_max_issue() - - self.assertEqual(mock_cursor.execute.call_count, 1) - self.assertEqual(result, pd.Timestamp("1900/1/1"), "max issue when db is empty") + with patch.object(mysql.connector, 'connect', return_value=mock_connection), \ + Database().connect() as database: + dataset = self.test_utils.load_sample_dataset() + + result = database.insert_dataset(sentinel.issue, dataset) + + self.assertIsNone(result) + self.assertEqual(mock_cursor.executemany.call_count, 1) + + last_query_values = mock_cursor.executemany.call_args[0][-1][-1] + expected_query_values = ( + 0, sentinel.issue, 'WY', 20201209, + 0.2519685039370078, 29, 127, 32, 0.4233576642335766, 31, 137, 58, 22, 2, + 7, None, 2, 8, 0, 1, '2', 5, 29, 3, 4, 0.1172985781990521, 29, 1688, 198, + 1729, 31, 856, 31, 198, 29, 0.4950838635049161, 31, 1729, 856, 5, 6, 7, + 0.2362768496420047, 29, 838, 198, 26, 8, 9, 10, 11, 12, 13, 14, 15, 16, + 17, 18, 19, 20, 21, 22, 23, 31, 24, 25, 15, 26, 27, 28, 29, 30, 31, 32, + 33, 34, 35, 36, 37, 38, 39, 40, 41, 29, 42, 43, 44, 45, 0, 29, 0, 29, + 46, 47, 48, 49, 50, 51, 52, 58, 31, 32, 29, 32, 31, 196, 29, 189, 31, + 53, 54, 55, 56, 2, 29, 2, 29, 137, 31) + self.assertEqual(len(last_query_values), len(expected_query_values)) + + for actual, expected in zip(last_query_values, expected_query_values): + if isinstance(expected, float): + self.assertAlmostEqual(actual, expected) + else: + self.assertEqual(actual, expected) diff --git a/tests/acquisition/covid_hosp/state_daily/test_network.py b/tests/acquisition/covid_hosp/state_daily/test_network.py deleted file mode 100644 index d0b03df75..000000000 --- a/tests/acquisition/covid_hosp/state_daily/test_network.py +++ /dev/null @@ -1,46 +0,0 @@ -"""Unit tests for network.py.""" - -# standard library -import requests -import unittest -from unittest.mock import patch -from unittest.mock import sentinel - -# first party -from delphi.epidata.acquisition.covid_hosp.common.test_utils import UnitTestUtils -from delphi.epidata.acquisition.covid_hosp.state_daily.network import Network - -# third party -import pandas as pd - -# py3tester coverage target -__test_target__ = \ - 'delphi.epidata.acquisition.covid_hosp.state_daily.network' - - -class NetworkTests(unittest.TestCase): - def setUp(self): - """Perform per-test setup.""" - - # configure test data - self.test_utils = UnitTestUtils(__file__) - - def test_fetch_metadata(self): - """Fetch metadata as JSON.""" - - with patch.object(Network, 'fetch_metadata_for_dataset') as func: - func.return_value = sentinel.json - - result = Network.fetch_metadata() - - self.assertEqual(result, sentinel.json) - func.assert_called_once_with(dataset_id=Network.METADATA_ID) - - def test_fetch_revisions(self): - """Scrape CSV files from revision pages""" - - test_metadata = pd.DataFrame( - {"Archive Link": ["test1", "test2", "test3"]}, - index=pd.date_range("2020/1/1", "2020/1/3") - ) - assert Network.fetch_revisions(test_metadata, pd.Timestamp("2020/1/1")) == ["test2", "test3"] \ No newline at end of file diff --git a/tests/acquisition/covid_hosp/state_timeseries/test_database.py b/tests/acquisition/covid_hosp/state_timeseries/test_database.py index ecea27f59..b6de0d5ce 100644 --- a/tests/acquisition/covid_hosp/state_timeseries/test_database.py +++ b/tests/acquisition/covid_hosp/state_timeseries/test_database.py @@ -3,7 +3,10 @@ # standard library import unittest from unittest.mock import MagicMock 
-from unittest.mock import sentinel +from unittest.mock import patch, sentinel + +# third party +import mysql.connector # first party from delphi.epidata.acquisition.covid_hosp.common.test_utils import UnitTestUtils @@ -30,29 +33,30 @@ def test_insert_dataset(self): mock_connection = MagicMock() mock_cursor = mock_connection.cursor() - database = Database(mock_connection) - dataset = self.test_utils.load_sample_dataset() - - result = database.insert_dataset(sentinel.issue, dataset) - - self.assertIsNone(result) - self.assertEqual(mock_cursor.executemany.call_count, 1) - - last_query_values = mock_cursor.executemany.call_args[0][-1][-1] - expected_query_values = ( - 0, sentinel.issue, 'WY', 20200826, 0.0934579439252336, 26, 107, 10, - 0.4298245614035088, 28, 114, 49, 19, 7, 2, None, 4, 2, 0, 1, '2', 0, 26, - 3, 4, 0.0119465917076598, 26, 1423, 17, 1464, 28, 629, 28, 17, 26, - 0.4296448087431694, 28, 1464, 629, 5, 6, 7, 0.0275974025974025, 26, 616, - 17, 2, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 28, - 24, 25, 13, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, - 41, 26, 42, 43, 44, 45, 0, 21, 0, 22, 46, 47, 48, 49, 50, 51, 52, 49, - 28, 10, 26, 7, 28, 17, 26, 14, 28, 53, 54, 55, 56, 0, 26, 0, 26, - 114, 28) - self.assertEqual(len(last_query_values), len(expected_query_values)) - - for actual, expected in zip(last_query_values, expected_query_values): - if isinstance(expected, float): - self.assertAlmostEqual(actual, expected) - else: - self.assertEqual(actual, expected) + with patch.object(mysql.connector, 'connect', return_value=mock_connection), \ + Database().connect() as database: + dataset = self.test_utils.load_sample_dataset() + + result = database.insert_dataset(sentinel.issue, dataset) + + self.assertIsNone(result) + self.assertEqual(mock_cursor.executemany.call_count, 1) + + last_query_values = mock_cursor.executemany.call_args[0][-1][-1] + expected_query_values = ( + 0, sentinel.issue, 'WY', 20200826, 0.0934579439252336, 26, 107, 10, + 0.4298245614035088, 28, 114, 49, 19, 7, 2, None, 4, 2, 0, 1, '2', 0, 26, + 3, 4, 0.0119465917076598, 26, 1423, 17, 1464, 28, 629, 28, 17, 26, + 0.4296448087431694, 28, 1464, 629, 5, 6, 7, 0.0275974025974025, 26, 616, + 17, 2, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 28, + 24, 25, 13, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, + 41, 26, 42, 43, 44, 45, 0, 21, 0, 22, 46, 47, 48, 49, 50, 51, 52, 49, + 28, 10, 26, 7, 28, 17, 26, 14, 28, 53, 54, 55, 56, 0, 26, 0, 26, + 114, 28) + self.assertEqual(len(last_query_values), len(expected_query_values)) + + for actual, expected in zip(last_query_values, expected_query_values): + if isinstance(expected, float): + self.assertAlmostEqual(actual, expected) + else: + self.assertEqual(actual, expected) diff --git a/tests/acquisition/covid_hosp/state_timeseries/test_network.py b/tests/acquisition/covid_hosp/state_timeseries/test_network.py deleted file mode 100644 index 420826e83..000000000 --- a/tests/acquisition/covid_hosp/state_timeseries/test_network.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Unit tests for network.py.""" - -# standard library -import unittest -from unittest.mock import patch -from unittest.mock import sentinel - -from delphi.epidata.acquisition.covid_hosp.state_timeseries.network import Network - - -# py3tester coverage target -__test_target__ = \ - 'delphi.epidata.acquisition.covid_hosp.state_timeseries.network' - - -class NetworkTests(unittest.TestCase): - - def test_fetch_metadata(self): - """Fetch metadata as JSON.""" - - 
with patch.object(Network, 'fetch_metadata_for_dataset') as func: - func.return_value = sentinel.json - - result = Network.fetch_metadata() - - self.assertEqual(result, sentinel.json) - func.assert_called_once_with(dataset_id=Network.METADATA_ID) diff --git a/tests/acquisition/covid_hosp/state_timeseries/test_update.py b/tests/acquisition/covid_hosp/state_timeseries/test_update.py deleted file mode 100644 index 57f6c2b54..000000000 --- a/tests/acquisition/covid_hosp/state_timeseries/test_update.py +++ /dev/null @@ -1,28 +0,0 @@ -"""Unit tests for update.py.""" - -# standard library -import unittest -from unittest.mock import patch -from unittest.mock import sentinel - -# first party -from delphi.epidata.acquisition.covid_hosp.common.utils import Utils -from delphi.epidata.acquisition.covid_hosp.state_timeseries.update import Update - -# py3tester coverage target -__test_target__ = \ - 'delphi.epidata.acquisition.covid_hosp.state_timeseries.update' - - -class UpdateTests(unittest.TestCase): - - def test_run(self): - """Acquire a new dataset.""" - - with patch.object(Utils, 'update_dataset') as mock_update_dataset: - mock_update_dataset.return_value = sentinel.result - - result = Update.run() - - self.assertEqual(result, sentinel.result) - mock_update_dataset.assert_called_once() diff --git a/tests/common/covid_hosp/test_covid_hosp_schema_io.py b/tests/common/covid_hosp/test_covid_hosp_schema_io.py index dae06d817..775f3b881 100644 --- a/tests/common/covid_hosp/test_covid_hosp_schema_io.py +++ b/tests/common/covid_hosp/test_covid_hosp_schema_io.py @@ -1,9 +1,8 @@ from pathlib import Path import unittest -from delphi.epidata.acquisition.covid_hosp.common.database import Columndef -from delphi.epidata.acquisition.covid_hosp.common.utils import Utils -from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething +from delphi.epidata.common.covid_hosp.utils import TypeUtils +from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import Columndef, CovidHospSomething # py3tester coverage target (equivalent to `import *`) @@ -24,11 +23,11 @@ def test_get_ds_info(self): assert AGGREGATE_KEY_COLS == ["address", "ccn", "city", "fips_code", "geocoded_hospital_address", "hhs_ids", "hospital_name", "hospital_pk", "hospital_subtype", "is_metro_micro", "state", "zip"] assert ORDERED_CSV_COLUMNS == [ Columndef('hospital_pk', 'hospital_pk', str), - Columndef('collection_week', 'collection_week', Utils.int_from_date), - Columndef('reporting_cutoff_start', 'date', Utils.int_from_date), + Columndef('collection_week', 'collection_week', TypeUtils.int_from_date), + Columndef('reporting_cutoff_start', 'date', TypeUtils.int_from_date), Columndef('all_adult_hospital_beds_7_day_avg', 'all_adult_hospital_beds_7_day_avg', float), Columndef('all_adult_hospital_beds_7_day_coverage', 'all_adult_hospital_beds_7_day_coverage', int), Columndef('fips_code', 'fips_code', str), - Columndef('geocoded_hospital_address', 'geocoded_hospital_address', Utils.limited_geocode), - Columndef('is_corrected', 'is_corrected', Utils.parse_bool) + Columndef('geocoded_hospital_address', 'geocoded_hospital_address', TypeUtils.limited_geocode), + Columndef('is_corrected', 'is_corrected', TypeUtils.parse_bool) ] diff --git a/tests/common/covid_hosp/test_utils.py b/tests/common/covid_hosp/test_utils.py new file mode 100644 index 000000000..f9b494447 --- /dev/null +++ b/tests/common/covid_hosp/test_utils.py @@ -0,0 +1,42 @@ +"""Unit tests for utils.py.""" + +# standard library +import unittest + +# first party +from 
delphi.epidata.common.covid_hosp.utils import TypeUtils, CovidHospException
+
+# py3tester coverage target
+__test_target__ = 'delphi.epidata.common.covid_hosp.utils'
+
+class UtilsTests(unittest.TestCase):
+
+  def test_int_from_date(self):
+    """Convert a YYYY-MM-DD date to a YYYYMMDD int."""
+
+    self.assertEqual(TypeUtils.int_from_date('2020-11-17'), 20201117)
+    self.assertEqual(TypeUtils.int_from_date('2020/11/17'), 20201117)
+    self.assertEqual(TypeUtils.int_from_date('2020/11/17 10:00:00'), 20201117)
+
+  def test_parse_bool(self):
+    """Parse a boolean value from a string."""
+
+    with self.subTest(name='None'):
+      self.assertIsNone(TypeUtils.parse_bool(None))
+
+    with self.subTest(name='empty'):
+      self.assertIsNone(TypeUtils.parse_bool(''))
+
+    with self.subTest(name='true'):
+      self.assertTrue(TypeUtils.parse_bool('true'))
+      self.assertTrue(TypeUtils.parse_bool('True'))
+      self.assertTrue(TypeUtils.parse_bool('tRuE'))
+
+    with self.subTest(name='false'):
+      self.assertFalse(TypeUtils.parse_bool('false'))
+      self.assertFalse(TypeUtils.parse_bool('False'))
+      self.assertFalse(TypeUtils.parse_bool('fAlSe'))
+
+    with self.subTest(name='exception'):
+      with self.assertRaises(CovidHospException):
+        TypeUtils.parse_bool('maybe')
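
Reviewer note, not part of the patch: a minimal sketch of how the YAML-driven accessors added to CovidHospSomething are expected to behave, using only dataset names and values visible in covid_hosp_schemadefs.yaml above. The no-argument constructor is assumed from its use elsewhere in this PR; this is an illustration of the schema lookup, not verified output.

from delphi.epidata.common.covid_hosp.covid_hosp_schema_io import CovidHospSomething

# assumed: CovidHospSomething() can be constructed with no arguments
chs = CovidHospSomething()

# dataset and metadata identifiers come straight from covid_hosp_schemadefs.yaml
assert chs.get_ds_dataset_id('state_daily') == '6xf2-c3ie'
assert chs.get_ds_metadata_id('state_daily') == '4cnb-m4rz'

# ISSUE_COLUMN is new: facility rows are versioned by publication_date,
# while the state datasets keep the conventional 'issue' column
assert chs.get_ds_issue_column('covid_hosp_facility') == 'publication_date'
assert chs.get_ds_issue_column('state_daily') == 'issue'
assert chs.get_ds_issue_column('state_timeseries') == 'issue'

# get_ds_aggregate_key_cols now falls back to [] rather than None when a
# dataset defines no AGGREGATE_KEY_COLS, so callers can iterate it directly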