Data Quality Checks
Hamilton now comes with data quality validations included! This is implemented through a simple decorator: check_output.

Adding DQ to Your DataFlow

Let's take a look at ye old marketing spend example...
```python
import pandas as pd

def avg_3wk_spend(spend: pd.Series) -> pd.Series:
    """Rolling 3 week average spend."""
    return spend.rolling(3, min_periods=1).mean()

def acquisition_cost(avg_3wk_spend: pd.Series, signups: pd.Series) -> pd.Series:
    """The cost per signup in relation to a rolling average of spend."""
    return avg_3wk_spend / signups
```
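To make the math concrete, here's what these two functions produce on a small sample input (the values here are illustrative, not from the example pipeline):

```python
import pandas as pd

# Sample inputs: three weeks of spend and signups.
spend = pd.Series([10.0, 10.0, 20.0])
signups = pd.Series([1, 10, 50])

# min_periods=1 means partial windows are allowed, so the first two
# entries average over 1 and 2 weeks respectively.
avg = spend.rolling(3, min_periods=1).mean()
cost = avg / signups

print(avg.tolist())   # [10.0, 10.0, (10+10+20)/3 ~= 13.33]
print(cost.tolist())  # [10.0, 1.0, ~0.267]
```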
Now, let's say we want to assure a few things about acquisition_cost...
  1. That it consists of floats (should be obvious from the code, but we want to be sure).
  2. That it is greater than 0 (highly unlikely that a customer pays you for advertising).
  3. That it is less than $1000 (anything this high likely means a data issue).
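Written out by hand, those three checks amount to something like the following sketch (plain pandas/numpy, no decorator -- this is roughly the work that check_output automates for you):

```python
import numpy as np
import pandas as pd

def validate_acquisition_cost(series: pd.Series) -> dict:
    """Hand-rolled version of the three checks above."""
    is_float = np.issubdtype(series.dtype, np.floating)    # check 1
    in_range = int(series.between(0, 1000).sum())          # checks 2 and 3
    return {
        "is_float": is_float,
        "in_range": in_range,
        "out_range": len(series) - in_range,
        "passes": bool(is_float and in_range == len(series)),
    }

# 5000.5 falls outside (0, 1000), so this validation fails.
result = validate_acquisition_cost(pd.Series([10.0, 5000.5, 0.5]))
print(result)
```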
Furthermore, let's say we want the pipeline to log a warning (as opposed to just failing out) if any of the above conditions aren't met.
This is easy with the check_output decorator!
```python
import pandas as pd
import numpy as np

from hamilton import function_modifiers

def avg_3wk_spend(spend: pd.Series) -> pd.Series:
    """Rolling 3 week average spend."""
    return spend.rolling(3, min_periods=1).mean()

@function_modifiers.check_output(
    range=(0, 1000),
    data_type=np.float64,
    importance="warn",
)
def acquisition_cost(avg_3wk_spend: pd.Series, signups: pd.Series) -> pd.Series:
    """The cost per signup in relation to a rolling average of spend."""
    return avg_3wk_spend / signups
```
The decorator takes in a series of arguments -- you can discover them by exploring the default validators in the code (more documentation of the specific arguments is to follow): https://github.com/stitchfix/hamilton/blob/main/hamilton/data_quality/default_validators.py.
You can also utilize pandera to the same effect, simply by providing a schema argument:
```python
import pandas as pd
import pandera as pa

from hamilton import function_modifiers

def avg_3wk_spend(spend: pd.Series) -> pd.Series:
    """Rolling 3 week average spend."""
    return spend.rolling(3, min_periods=1).mean()

@function_modifiers.check_output(
    schema=pa.SeriesSchema(
        float,
        checks=pa.Check.in_range(0, 1000)),
    importance="warn",
)
def acquisition_cost(avg_3wk_spend: pd.Series, signups: pd.Series) -> pd.Series:
    """The cost per signup in relation to a rolling average of spend."""
    return avg_3wk_spend / signups
```
Finally, if you want to implement your own checks, you can explore the DataValidator class and the check_output_custom decorator.
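The core shape of a custom validator is "take a node's output, return a pass/fail plus diagnostics". The class below is only a conceptual sketch of that shape in plain Python -- the name MaxFractionNanValidator and the dict return type are invented here for illustration; the real interface to subclass is the DataValidator class in Hamilton's source:

```python
import pandas as pd

class MaxFractionNanValidator:
    """Conceptual sketch: fail if too large a fraction of values are NaN.

    Real Hamilton validators subclass DataValidator -- see the source for
    the actual interface; this only illustrates the pass/fail + diagnostics idea.
    """
    def __init__(self, max_fraction_nan: float):
        self.max_fraction_nan = max_fraction_nan

    def validate(self, data: pd.Series) -> dict:
        fraction_nan = float(data.isna().mean())
        return {
            "passes": fraction_nan <= self.max_fraction_nan,
            "message": f"{fraction_nan:.0%} of values are NaN "
                       f"(allowed: {self.max_fraction_nan:.0%})",
            "diagnostics": {"fraction_nan": fraction_nan},
        }

validator = MaxFractionNanValidator(max_fraction_nan=0.25)
result = validator.validate(pd.Series([1.0, None, 3.0, 4.0]))
print(result)  # 1 of 4 values is NaN -> exactly at the 25% threshold, passes
```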

Running your Pipelines and Examining the Results

Let's run the pipeline above to get the result of acquisition_cost, injecting some bad data along the way...
```python
import importlib
import logging
import sys

import pandas as pd

from hamilton import driver

logging.basicConfig(stream=sys.stdout)
initial_columns = {  # load from actuals or wherever -- this is our initial data we use as input.
    # Note: these values don't have to be all series, they could be a scalar.
    'signups': pd.Series([1, 10, 50, 100, 200, 400]),
    'spend': pd.Series([10, 100000, 20, 40, 40, 500]),  # note the injected bad value of 100000
}
# we need to tell hamilton where to load function definitions from
module_name = 'my_functions'
module = importlib.import_module(module_name)
dr = driver.Driver(initial_columns, module)
df = dr.execute(['acquisition_cost'])
print(df.to_string())
```
When we run this, we get the following:
```
$ python my_script.py
WARNING:hamilton.data_quality.base:[acquisition_cost:range_validator] validator failed. Message was: Series contains 5 values in range (0,1000), and 1 outside.. Diagnostic information is: {'range': (0, 1000), 'in_range': 5, 'out_range': 1, 'data_size': 6}.
   acquisition_cost
0         10.000000
1       5000.500000
2        666.866667
3        333.533333
4          0.166667
5          0.483333
```
Note that it completed successfully, yet printed out the warning. When we set it to fail, we get an error (as expected). Let's go back to look at the results, and see what we can learn...
If we modify our script to return the data quality results as well, we can capture the results for later use!
```python
import importlib
import logging
import sys
import dataclasses
import pprint

import pandas as pd

from hamilton import base
from hamilton import driver

logging.basicConfig(stream=sys.stdout)
initial_columns = {  # load from actuals or wherever -- this is our initial data we use as input.
    # Note: these values don't have to be all series, they could be a scalar.
    'signups': pd.Series([1, 10, 50, 100, 200, 400]),
    'spend': pd.Series([10, 100000, 20, 40, 40, 500]),
}
# we need to tell hamilton where to load function definitions from
module_name = 'my_functions'
module = importlib.import_module(module_name)
adapter = base.SimplePythonGraphAdapter(base.DictResult())
dr = driver.Driver(initial_columns, module, adapter=adapter)
all_validator_variables = [
    var.name for var in dr.list_available_variables()
    if var.tags.get('hamilton.data_quality.contains_dq_results')
]
data = dr.execute(['acquisition_cost'] + all_validator_variables)
pprint.pprint(dataclasses.asdict(data['acquisition_cost_range_validator']))
pprint.pprint(dataclasses.asdict(data['acquisition_cost_data_type_validator']))
```
Note there's some magic above -- we're working on improving querying and reporting. As always, if you have ideas for the API, let us know. Running the above yields the following:
```
WARNING:hamilton.data_quality.base:[acquisition_cost:range_validator] validator failed. Message was: Series contains 5 values in range (0,1000), and 1 outside.. Diagnostic information is: {'range': (0, 1000), 'in_range': 5, 'out_range': 1, 'data_size': 6}.
{'diagnostics': {'data_size': 6,
                 'in_range': 5,
                 'out_range': 1,
                 'range': (0, 1000)},
 'message': 'Series contains 5 values in range (0,1000), and 1 outside.',
 'passes': False}
{'diagnostics': {'actual_dtype': dtype('float64'),
                 'required_dtype': <class 'numpy.float64'>},
 'message': "Requires subclass of datatype: <class 'numpy.float64'>. Got "
            'datatype: float64. This is a match.',
 'passes': True}
```
Knowing that data quality produces a series of nodes makes this very powerful -- you can query and grab the results, enabling you to programmatically react to them down the line.
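Once those result dicts are in hand, reacting to them is plain Python. For example, here's a sketch that fails a batch job (or could page someone) when any validator failed -- the dict shapes mirror the output printed above, hard-coded here so the example stands alone:

```python
# Captured DQ results, keyed by validator node name (shapes mirror the
# pprint output above; hard-coded here for illustration).
results = {
    "acquisition_cost_range_validator": {
        "passes": False,
        "message": "Series contains 5 values in range (0,1000), and 1 outside.",
        "diagnostics": {"in_range": 5, "out_range": 1, "data_size": 6},
    },
    "acquisition_cost_data_type_validator": {
        "passes": True,
        "message": "Requires subclass of datatype: numpy.float64. This is a match.",
        "diagnostics": {},
    },
}

# Collect the validators that failed, then act on them however you like.
failures = [name for name, result in results.items() if not result["passes"]]
if failures:
    print(f"Data quality failures: {failures}")
```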
This concludes our brief lessons on managing data quality in Hamilton -- we hope you spin it up and give it a try.
In the meantime, we have more detailed documentation here -- https://github.com/stitchfix/hamilton/blob/main/data_quality.md -- and some more examples (including distributed-systems support) here -- https://github.com/stitchfix/hamilton/tree/main/examples/data_quality.