Skip to content

Commit 890e8b8

Browse files
committed
moving over to geomapper's generic function
1 parent 07c6c90 commit 890e8b8

File tree

3 files changed

+21
-160
lines changed

3 files changed

+21
-160
lines changed

_delphi_utils_python/DEVELOP.md

+2
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,5 @@ When you are finished, the virtual environment can be deactivated and
5454
deactivate
5555
rm -r env
5656
```
57+
## Releasing the module
58+
If you have made enough changes that it warrants updating [the PyPi project](https://pypi.org/project/delphi-utils/), currently this is done as part of merging from `main` to `prod`.

nwss_wastewater/delphi_nwss/run.py

+18-56
Original file line numberDiff line numberDiff line change
@@ -26,62 +26,18 @@
2626
from datetime import datetime
2727

2828
import numpy as np
29-
import pandas as pd
30-
from delphi_utils import S3ArchiveDiffer, get_structured_logger, create_export_csv
29+
from delphi_utils import (
30+
GeoMapper,
31+
S3ArchiveDiffer,
32+
get_structured_logger,
33+
create_export_csv,
34+
)
3135
from delphi_utils.nancodes import add_default_nancodes
3236

3337
from .constants import GEOS, METRIC_SIGNALS, PROVIDER_NORMS, SIGNALS
3438
from .pull import pull_nwss_data
3539

3640

37-
def sum_all_nan(x):
38-
"""Return a normal sum unless everything is NaN, then return that."""
39-
all_nan = np.isnan(x).all()
40-
if all_nan:
41-
return np.nan
42-
return np.nansum(x)
43-
44-
45-
def generate_weights(df, column_aggregating="pcr_conc_smoothed"):
46-
"""
47-
Weigh column_aggregating by population.
48-
49-
generate the relevant population amounts, and create a weighted but
50-
unnormalized column, derived from `column_aggregating`
51-
"""
52-
# set the weight of places with na's to zero
53-
df[f"relevant_pop_{column_aggregating}"] = (
54-
df["population_served"] * np.abs(df[column_aggregating]).notna()
55-
)
56-
# generate the weighted version
57-
df[f"weighted_{column_aggregating}"] = (
58-
df[column_aggregating] * df[f"relevant_pop_{column_aggregating}"]
59-
)
60-
return df
61-
62-
63-
def weighted_state_sum(df: pd.DataFrame, geo: str, sensor: str):
64-
"""Sum sensor, weighted by population for non NA's, grouped by state."""
65-
agg_df = df.groupby(["timestamp", geo]).agg(
66-
{f"relevant_pop_{sensor}": "sum", f"weighted_{sensor}": sum_all_nan}
67-
)
68-
agg_df["val"] = agg_df[f"weighted_{sensor}"] / agg_df[f"relevant_pop_{sensor}"]
69-
agg_df = agg_df.reset_index()
70-
agg_df = agg_df.rename(columns={"state": "geo_id"})
71-
return agg_df
72-
73-
74-
def weighted_nation_sum(df: pd.DataFrame, sensor: str):
75-
"""Sum sensor, weighted by population for non NA's."""
76-
agg_df = df.groupby("timestamp").agg(
77-
{f"relevant_pop_{sensor}": "sum", f"weighted_{sensor}": sum_all_nan}
78-
)
79-
agg_df["val"] = agg_df[f"weighted_{sensor}"] / agg_df[f"relevant_pop_{sensor}"]
80-
agg_df = agg_df.reset_index()
81-
agg_df["geo_id"] = "us"
82-
return agg_df
83-
84-
8541
def add_needed_columns(df, col_names=None):
8642
"""Short util to add expected columns not found in the dataset."""
8743
if col_names is None:
@@ -140,7 +96,7 @@ def run_module(params):
14096
## build the base version of the signal at the most detailed geo level you can get.
14197
## compute stuff here or farm out to another function or file
14298
df_pull = pull_nwss_data(socrata_token, logger)
143-
## aggregate
99+
geomapper = GeoMapper()
144100
# iterate over the providers and the normalizations that they specifically provide
145101
for provider, normalization in zip(
146102
PROVIDER_NORMS["provider"], PROVIDER_NORMS["normalization"]
@@ -153,16 +109,22 @@ def run_module(params):
153109
for sensor in [*SIGNALS, *METRIC_SIGNALS]:
154110
full_sensor_name = sensor + "_" + provider + "_" + normalization
155111
df_prov_norm = df_prov_norm.rename(columns={sensor: full_sensor_name})
156-
# add weighed column
157-
df = generate_weights(df_prov_norm, full_sensor_name)
158112
for geo in GEOS:
159113
logger.info(
160114
"Generating signal and exporting to CSV", metric=full_sensor_name
161115
)
162116
if geo == "nation":
163-
agg_df = weighted_nation_sum(df, full_sensor_name)
164-
else:
165-
agg_df = weighted_state_sum(df, geo, full_sensor_name)
117+
df_prov_norm["nation"] = "us"
118+
agg_df = geomapper.aggregate_by_weighted_sum(
119+
df_prov_norm,
120+
geo,
121+
full_sensor_name,
122+
"timestamp",
123+
"population_served",
124+
)
125+
agg_df = agg_df.rename(
126+
columns={geo: "geo_id", f"weighted_{full_sensor_name}": "val"}
127+
)
166128
# add se, sample_size, and na codes
167129
agg_df = add_needed_columns(agg_df)
168130
# actual export

nwss_wastewater/tests/test_run.py

+1-104
Original file line numberDiff line numberDiff line change
@@ -2,110 +2,7 @@
22
import pandas as pd
33
from pandas.testing import assert_frame_equal
44

5-
from delphi_nwss.run import (
6-
add_needed_columns,
7-
generate_weights,
8-
sum_all_nan,
9-
weighted_state_sum,
10-
weighted_nation_sum,
11-
)
12-
13-
14-
def test_sum_all_nan():
15-
"""Check that sum_all_nan returns NaN iff everything is a NaN"""
16-
assert sum_all_nan(np.array([3, 5])) == 8
17-
assert np.isclose(sum_all_nan([np.nan, 3, 5]), 8)
18-
assert np.isnan(np.array([np.nan, np.nan])).all()
19-
20-
21-
def test_weight_generation():
22-
dataFrame = pd.DataFrame(
23-
{
24-
"a": [1, 2, 3, 4, np.nan],
25-
"b": [5, 6, 7, 8, 9],
26-
"population_served": [10, 5, 8, 1, 3],
27-
}
28-
)
29-
weighted = generate_weights(dataFrame, column_aggregating="a")
30-
weighted_by_hand = pd.DataFrame(
31-
{
32-
"a": [1, 2, 3, 4, np.nan],
33-
"b": [5, 6, 7, 8, 9],
34-
"population_served": [10, 5, 8, 1, 3],
35-
"relevant_pop_a": [10, 5, 8, 1, 0],
36-
"weighted_a": [10.0, 2 * 5.0, 3 * 8, 4.0 * 1, np.nan * 0],
37-
}
38-
)
39-
assert_frame_equal(weighted, weighted_by_hand)
40-
# operations are in-place
41-
assert_frame_equal(weighted, dataFrame)
42-
43-
44-
def test_weighted_state_sum():
45-
dataFrame = pd.DataFrame(
46-
{
47-
"state": ["al", "al", "ca", "ca", "nd", "me", "me"],
48-
"timestamp": np.zeros(7),
49-
"a": [1, 2, 3, 4, 12, -2, 2],
50-
"b": [5, 6, 7, np.nan, np.nan, -1, -2],
51-
"population_served": [10, 5, 8, 1, 3, 1, 2],
52-
}
53-
)
54-
weighted = generate_weights(dataFrame, column_aggregating="b")
55-
agg = weighted_state_sum(weighted, "state", "b")
56-
expected_agg = pd.DataFrame(
57-
{
58-
"timestamp": np.zeros(4),
59-
"geo_id": ["al", "ca", "me", "nd"],
60-
"relevant_pop_b": [10 + 5, 8 + 0, 1 + 2, 0],
61-
"weighted_b": [5 * 10 + 6 * 5, 7 * 8 + 0, 1 * -1 + -2 * 2, np.nan],
62-
"val": [80 / 15, 56 / 8, -5 / 3, np.nan],
63-
}
64-
)
65-
assert_frame_equal(agg, expected_agg)
66-
67-
weighted = generate_weights(dataFrame, column_aggregating="a")
68-
agg_a = weighted_state_sum(weighted, "state", "a")
69-
expected_agg_a = pd.DataFrame(
70-
{
71-
"timestamp": np.zeros(4),
72-
"geo_id": ["al", "ca", "me", "nd"],
73-
"relevant_pop_a": [10 + 5, 8 + 1, 1 + 2, 3],
74-
"weighted_a": [1 * 10 + 2 * 5, 3 * 8 + 1 * 4, -2 * 1 + 2 * 2, 12 * 3],
75-
"val": [20 / 15, 28 / 9, (-2 * 1 + 2 * 2) / 3, 36 / 3],
76-
}
77-
)
78-
assert_frame_equal(agg_a, expected_agg_a)
79-
80-
81-
def test_weighted_nation_sum():
82-
dataFrame = pd.DataFrame(
83-
{
84-
"state": [
85-
"al",
86-
"al",
87-
"ca",
88-
"ca",
89-
"nd",
90-
],
91-
"timestamp": np.hstack((np.zeros(3), np.ones(2))),
92-
"a": [1, 2, 3, 4, 12],
93-
"b": [5, 6, 7, np.nan, np.nan],
94-
"population_served": [10, 5, 8, 1, 3],
95-
}
96-
)
97-
weighted = generate_weights(dataFrame, column_aggregating="a")
98-
agg = weighted_nation_sum(weighted, "a")
99-
expected_agg = pd.DataFrame(
100-
{
101-
"timestamp": [0.0, 1],
102-
"relevant_pop_a": [10 + 5 + 8, 1 + 3],
103-
"weighted_a": [1 * 10 + 2 * 5 + 3 * 8, 1 * 4 + 3 * 12],
104-
"val": [44 / 23, 40 / 4],
105-
"geo_id": ["us", "us"],
106-
}
107-
)
108-
assert_frame_equal(agg, expected_agg)
5+
from delphi_nwss.run import add_needed_columns
1096

1107

1118
def test_adding_cols():

0 commit comments

Comments
 (0)