Put a Stop to Data Swamps with Event-Driven Data Testing

Ensure data quality in your S3 data lake using Python, AWS Lambda, SNS, and Great Expectations.

Data lakes have long had a bad reputation when it comes to data quality. In contrast to data warehouses, data doesn’t need to adhere to any predefined schema before it can be loaded. Without proper testing and governance, your data lake can easily turn into a data swamp. In this article, we’ll look at how to build automated data tests that are executed any time new data is loaded into a data lake. We’ll also configure SNS-based alerting to get notified about data that deviates from our expectations.


Python libraries for data quality

There are many tools for data profiling and data testing out there. To list just a few of them:

  • Pandas Profiling allows generating an HTML report showing quantile statistics, histograms, correlations, NULL value distribution, text analysis, categorical variables with high cardinality, and much more (see the short snippet after this list),
  • dbt tests let us validate uniqueness, accepted values, and NULL values, and build any custom data test to detect anomalies using SQL queries,
  • Bulwark provides decorators for functions that return Pandas dataframes, e.g. @dc.HasNoNans(),
  • mobyDQ, a tool from Ubisoft, generates a GraphQL-based web application for data validation,
  • TensorFlow Data Validation detects anomalies in training and model-serving data.
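
For instance, generating a profiling report with Pandas Profiling takes only a few lines. This is a minimal sketch, assuming the pandas-profiling package is installed; the file path is a placeholder:

import pandas as pd
from pandas_profiling import ProfileReport

df = pd.read_csv("your_dataset.csv")  # placeholder path to any tabular dataset
profile = ProfileReport(df, title="Data profiling report")
profile.to_file("profiling_report.html")  # writes an interactive HTML report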

In this article, we’ll focus on Great Expectations, an open-source Python library for validating and profiling data.


Using Great Expectations

The recommended way of using Great Expectations is to:

  • install the package: pip install great_expectations,
  • initialize a project (great_expectations --v3-api init),
  • configure a connection to your data source, e.g. your data warehouse, or flat files for Pandas or PySpark validation (great_expectations --v3-api datasource new),
  • create an initial expectations suite either manually, interactively using a batch of data, or automatically using a built-in profiler (great_expectations --v3-api suite new),
  • edit this expectations suite in a Jupyter notebook (great_expectations --v3-api suite edit suite_name) or by directly modifying a JSON file (great_expectations/expectations/<suite_name>.json),
  • create a checkpoint mapping the expectations suite to a data_asset_name, which is the actual data that you want to test (great_expectations --v3-api checkpoint new checkpoint_name),
  • run the validation process on a new batch of data (great_expectations --v3-api checkpoint run checkpoint_name),
  • finally, figure out how to deploy it and run it on a schedule, e.g. by creating a Python script (great_expectations --v3-api checkpoint script suite_name).

If you want to implement all these steps, you can follow this official tutorial. And here is the most important page of the entire documentation, listing all expectations you can use.
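
To give a flavor of what a single expectation looks like, here is a minimal sketch using the Pandas-backed API (the same style we will use later in this article); the dataframe and column name are made up for illustration:

import great_expectations as ge
import pandas as pd

df = pd.DataFrame({"value": [10, 42, 99]})
ge_df = ge.from_pandas(df)  # wrap a regular dataframe into a Great Expectations dataset

# each expect_* call validates immediately and returns a validation result
result = ge_df.expect_column_values_to_be_between(column="value", min_value=0, max_value=100)
print(result.success)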

If you look at all the above bullet points, you may notice that this setup is quite involved. And it doesn’t even cover how to package and deploy the code to make it production-ready, how to set up alerts on failure, how to host and share the data docs, and how to build a repeatable process around it for data testing in data pipelines.


Using Great Expectations for event-driven data testing

Let’s try to approach it in a more “Pythonic” way. We want to use Great Expectations on data stored in AWS S3. We want data tests to run automatically any time a new file arrives in S3. While Great Expectations provides a data source connector for Athena, it would require running the validation on an entire Athena table rather than validating only a specific batch of data loaded to a data lake. Thus, it would be harder to track down which S3 PUT operation caused anomalies in the respective data source. 

As an alternative to the Athena data source, we could configure the expectations suite using Pandas and flat files as a data source, but even then the entire process seems a bit cumbersome as mentioned in the last section. 

Let’s look at how we can approach it only using Python — no configuration files. The ultimate goal is to deploy this script to AWS Lambda and configure the function to be triggered on each S3 PUT operation to the desired path in our S3 data lake.


Demo: generating time series data for testing

We’ll start by generating an hourly time series with a deliberately chosen range of values. Generating a synthetic dataset will allow us to conveniently insert additional “bad values” and see if our data tests detect those anomalies. Here is an example dataset we will be using:

Example dataset — image by author
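
The TimeseriesGenerator class used throughout the following snippets is a small helper from this project, not part of Great Expectations. Its original implementation is not reproduced here, but a minimal sketch matching how it is called later (start_date, end_date, min_value, max_value, and a get_timeseries() method) could look like this:

import numpy as np
import pandas as pd


class TimeseriesGenerator:
    """Generates an hourly time series with random integer values."""

    def __init__(
        self,
        start_date: str,
        end_date: str,
        min_value: int = 0,
        max_value: int = 100,
    ):
        self.start_date = start_date
        self.end_date = end_date
        self.min_value = min_value
        self.max_value = max_value

    def get_timeseries(self) -> pd.DataFrame:
        # one row per hour between start_date and end_date
        timestamps = pd.date_range(self.start_date, self.end_date, freq="H")
        # int64 dtype to match the data type expected by the data tests
        values = np.random.randint(
            self.min_value, self.max_value + 1, size=len(timestamps), dtype="int64"
        )
        return pd.DataFrame({"timestamp": timestamps, "value": values})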

Implementing data tests using Great Expectations

Which tests can we run for this data?

By and large, any potential anomalies can (and should) be tested. For instance, we can validate:

  • the order of columns,
  • the row count, i.e. the granularity of our time series → since we are dealing with hourly time series data, we can expect 24 rows per day provided that no data is missing (and that there is no Daylight Saving Time!),
  • the presence of any potential NULL values,
  • data types → the timestamp should be a datetime column, while value is an integer column,
  • whether the range of values matches our expectations → in this example, it must be between 0 and 100.

How to implement data tests?

Here is an example implementation of those tests in a single class. It allows running each data test individually, as well as running all tests at once. Additionally, the method parse_data_test_result() sends an SNS email alert on any failed data test.

import json
import boto3
import logging
import pandas as pd
from great_expectations import from_pandas
from great_expectations.dataset.pandas_dataset import PandasDataset
from great_expectations.core.expectation_validation_result import (
    ExpectationValidationResult,
)


class TimeseriesDataTestRunner:
    def __init__(
        self,
        s3_path: str,
        aws_region: str = "eu-central-1",
        sns_topic_arn: str = "arn:aws:sns:eu-central-1:338306982838:ge_timeseries_data_test",
        dt_column: str = "timestamp",
        dt_column_dtype: str = "datetime64[ns]",
        nr_column: str = "value",
        nr_column_dtype: str = "int64",
        min_value: int = 0,
        max_value: int = 100,
        min_total_row_count: int = 672,
        max_total_row_count: int = 744,
    ):
        self.s3_path = s3_path
        self.aws_region = aws_region
        self.sns_topic_arn = sns_topic_arn
        self.dt_column = dt_column
        self.dt_column_dtype = dt_column_dtype
        self.nr_column = nr_column
        self.nr_column_dtype = nr_column_dtype
        self.min_value = min_value
        self.max_value = max_value
        self.min_total_row_count = min_total_row_count
        self.max_total_row_count = max_total_row_count
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)

    def send_email_alert(self, message: str) -> None:
        sns = boto3.client("sns", region_name=self.aws_region)
        sns_result = sns.publish(TopicArn=self.sns_topic_arn, Message=message)
        self.logger.info("SNS email sent. Result: %s", sns_result)

    def parse_data_test_result(
        self, validation_result: ExpectationValidationResult,
    ) -> None:
        test_result = validation_result.to_json_dict()
        if test_result["success"]:
            self.logger.info("Data test passed for dataset %s.", self.s3_path)
        else:
            alert = (
                f"Data test failed for dataset: {self.s3_path}."
                f" VALIDATION RESULT: {json.dumps(test_result)}"
            )
            self.logger.error(alert)
            self.send_email_alert(alert)

    def test_order_of_columns(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        return ge_df.expect_table_columns_to_match_ordered_list(
            column_list=[self.dt_column, self.nr_column]
        )

    def test_row_count(self, ge_df: PandasDataset) -> ExpectationValidationResult:
        """Hourly time series in monthly batches:
                31 days * 24 h = 744 rows,
                28 days * 24 h = 672 rows"""
        return ge_df.expect_table_row_count_to_be_between(
            max_value=self.max_total_row_count, min_value=self.min_total_row_count
        )

    def test_null_values_nr_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        return ge_df.expect_column_values_to_not_be_null(column=self.nr_column)

    def test_null_values_dt_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        return ge_df.expect_column_values_to_not_be_null(column=self.dt_column)

    def test_data_type_nr_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        return ge_df.expect_column_values_to_be_of_type(
            column=self.nr_column, type_=self.nr_column_dtype
        )

    def test_data_type_dt_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        return ge_df.expect_column_values_to_be_of_type(
            column=self.dt_column, type_=self.dt_column_dtype
        )

    def test_value_range(self, ge_df: PandasDataset) -> ExpectationValidationResult:
        return ge_df.expect_column_values_to_be_between(
            column=self.nr_column, min_value=self.min_value, max_value=self.max_value
        )

    def run_data_tests(self, dataframe: pd.DataFrame,) -> None:
        ge_df = from_pandas(dataframe)

        tests_to_run = [
            self.test_order_of_columns(ge_df),
            self.test_row_count(ge_df),
            self.test_null_values_nr_column(ge_df),
            self.test_null_values_dt_column(ge_df),
            self.test_data_type_nr_column(ge_df),
            self.test_data_type_dt_column(ge_df),
            self.test_value_range(ge_df),
        ]
        for data_test in tests_to_run:
            self.parse_data_test_result(data_test)

To create an SNS topic for email alerts, you can run the code below. Then, follow the link from the AWS email to confirm your SNS email subscription.

import boto3

sns = boto3.client("sns", region_name="eu-central-1")

# CREATE TOPIC
topic_name = "ge_timeseries_data_test"
create_response = sns.create_topic(Name=topic_name)
topic_arn = create_response.get("TopicArn")
print("Create topic response: %s", create_response)

# CREATE SUBSCRIPTIONS
email_sub = sns.subscribe(
    TopicArn=topic_arn, Protocol="email", Endpoint="youremail@gmail.com"
)

How to run data tests locally?

You probably want to test your data locally before moving on to production. The data tests from TimeseriesDataTestRunner can be executed on a local development machine. The code snippet below implements:

  • one happy-path test that will succeed because it generates data that matches our expectations,
  • seven failing tests corresponding to the above test cases → by deliberately generating skewed data, we can ensure that our tests are working correctly and detect data that deviates from our expectations.

import logging
import pandas as pd

from src.timeseries_data_generator import TimeseriesGenerator
from src.timeseries_data_test_runner import TimeseriesDataTestRunner

DATASET = "local_test_without_s3"
DEFAULT_START_DATE = "2021-07-01"
DEFAULT_END_DATE = "2021-07-31 23:59"
SKEWED_END_DATE = "2021-07-25 23:59"


def _run_data_tests(df: pd.DataFrame) -> None:
    TimeseriesDataTestRunner(s3_path=DATASET).run_data_tests(df)


def test_happy_path() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    _run_data_tests(df)


def test_incorrect_order_of_columns() -> None:
    """Order of columns here is the opposite of what we expect in a time series"""
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    df = df[["value", "timestamp"]]
    _run_data_tests(df)


def test_incomplete_data() -> None:
    """Data doesn't contain the full month of July"""
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=SKEWED_END_DATE
    ).get_timeseries()
    _run_data_tests(df)


def test_missing_nr_values() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    df.at[0, "value"] = None
    _run_data_tests(df)


def test_missing_timestamp() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    df.at[0, "timestamp"] = None
    _run_data_tests(df)


def test_incorrect_data_type_nr_column() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    df["value"] = df["value"].astype(float)
    _run_data_tests(df)


def test_incorrect_data_type_dt_column() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    df["timestamp"] = df["timestamp"].astype(str)
    _run_data_tests(df)


def test_incorrect_value_range() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE,
        end_date=DEFAULT_END_DATE,
        min_value=0,
        max_value=120,
    ).get_timeseries()
    _run_data_tests(df)


if __name__ == "__main__":
    logging.basicConfig(
        format="[%(levelname)s] [%(name)s] [%(asctime)s]: %(message)s", level="INFO"
    )
    test_happy_path()
    test_incorrect_order_of_columns()
    test_incomplete_data()
    test_missing_nr_values()
    test_missing_timestamp()
    test_incorrect_data_type_nr_column()
    test_incorrect_data_type_dt_column()
    test_incorrect_value_range()

When we run this locally, we should receive seven emails similar to this one:

Email about failed data test — image by author

How to run data tests on AWS Lambda?

The most efficient way to run those tests automatically is to build a Lambda function with an S3 PUT object event trigger. This way, any time a new file gets uploaded to the specified S3 location, the Lambda function will be automatically triggered to test our data. 

To accomplish that, we need a Lambda function that will read the S3 key from the event metadata, read the uploaded file into a Pandas dataframe, and finally run the tests. Here is a simple implementation of that:

import json
import logging
from urllib.parse import unquote_plus

import awswrangler as wr
from timeseries_data_test_runner import TimeseriesDataTestRunner

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    logger.info("Received event: " + json.dumps(event))
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    s3_path = f"s3://{bucket}/{key}"
    df = wr.s3.read_parquet(s3_path)
    TimeseriesDataTestRunner(s3_path=s3_path).run_data_tests(df)
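
Before deploying, you can sanity-check the handler locally by feeding it a hand-crafted S3 event, for example by appending the block below to lambda.py and running the module from the src folder with AWS credentials configured. The bucket and key are placeholders for a Parquet file you have already uploaded:

if __name__ == "__main__":
    fake_s3_event = {
        "Records": [
            {
                "s3": {
                    "bucket": {"name": "your-data-lake-bucket"},  # placeholder bucket
                    "object": {"key": "timeseries/2021-07.parquet"},  # placeholder key
                }
            }
        ]
    }
    handler(fake_s3_event, context=None)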

To build this Lambda function, we need to package lambda.py together with its dependencies from requirements.txt, for example as a container image built from a Dockerfile and pushed to Amazon ECR, from which the Lambda function can then be created.

Note that timeseries_data_test_runner.py, timeseries_data_generator.py, and lambda.py are located in the src folder. This matters if you package the function using the project’s Dockerfile. The project structure looks as follows (the last file will be explained in the next section):

|-- Dockerfile
|-- requirements.txt
|-- src
|   |-- lambda.py
|   |-- timeseries_data_generator.py
|   `-- timeseries_data_test_runner.py
`-- upload_new_timeseries_to_s3.py

Now all that is left is to:

  • #1 Create your Lambda function, 
  • #2 Adjust the memory and timeout settings based on your use case (the defaults are too small for data testing; you can allocate up to 10 GB of memory to your Lambda function and set the timeout to up to 15 minutes, see the boto3 snippet after the screenshots below),
  • #3 Configure the S3 trigger, 
  • #4 Set the IAM permissions for the Lambda function so that it can read the files from S3 and trigger the SNS alert,
  • #5 Test the process by uploading new files to your data lake.
#1 Create your Lambda function — image by author
#2 Change the memory and timeout settings — image by author
#3 Set the S3 trigger — image by author
#4 Set the IAM permissions — image by author
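
If you prefer to script step #2 rather than clicking through the console, the memory and timeout can also be adjusted with a short boto3 call; the function name below is a placeholder:

import boto3

lambda_client = boto3.client("lambda", region_name="eu-central-1")

# Lambda allows up to 10,240 MB of memory and a timeout of up to 900 seconds (15 minutes)
lambda_client.update_function_configuration(
    FunctionName="ge-timeseries-data-test",  # placeholder function name
    MemorySize=1024,
    Timeout=900,
)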


Testing the AWS process by uploading new files to a data lake

Similar to how we ran data tests locally, we can now start uploading our data to S3 and see the alerts being triggered due to failed data tests on AWS.
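
The upload_new_timeseries_to_s3.py script from the project tree is not reproduced in the original snippets; a minimal sketch of such an upload, with the bucket name and keys as placeholders, could look like this:

import awswrangler as wr

from src.timeseries_data_generator import TimeseriesGenerator

# happy-path dataset: a full month of hourly data within the expected value range
df = TimeseriesGenerator(
    start_date="2021-07-01", end_date="2021-07-31 23:59"
).get_timeseries()
wr.s3.to_parquet(df=df, path="s3://your-data-lake-bucket/timeseries/happy_path.parquet")

# skewed dataset: values above 100 should trigger an SNS alert from the Lambda function
df_skewed = TimeseriesGenerator(
    start_date="2021-07-01", end_date="2021-07-31 23:59", min_value=0, max_value=120
).get_timeseries()
wr.s3.to_parquet(
    df=df_skewed,
    path="s3://your-data-lake-bucket/timeseries/incorrect_value_range.parquet",
)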

If you execute all the tests above, you should receive seven emails from AWS, similar to these:

Results of failed data tests being triggered automatically on each upload to S3 — image by author

The biggest advantage of our custom alerts is that they show exactly which S3 file upload caused the specific data test to fail. This is something that seems to be hard to accomplish when using the default implementation of Great Expectations. 

Additionally, using purely Pandas-based expectations makes testing easier and seems to be more “Pythonic” than working with configuration files and Jupyter notebooks. However, the downside of the presented approach is the lack of data docs. If you care about those, have a look at this official tutorial.


How to monitor this process?

As you can imagine, if you build data tests in Lambda for a large number of S3 datasets, monitoring can quickly become overwhelming. Dashbird is a platform that addresses this problem by providing dashboards and well-formatted logs for monitoring, alerting, and observability of serverless resources on AWS.

The image below shows how Dashbird can help detect issues and bottlenecks in your serverless data tests.

Monitoring of serverless data tests in Dashbird — image by the author

Conclusion

In this article, we looked at various Python libraries for data profiling and testing. We examined two ways of using Great Expectations: the traditional configuration-file-based method, as well as a more “Pythonic” do-it-yourself approach using a custom test runner and email alerts. We then investigated how to execute data tests locally, as well as how to run them automatically whenever a new file gets uploaded to S3.


Further reading:

How to test serverless applications?

How can a shared Slack channel improve your data quality?

Can data lakes accelerate building ML data pipelines?
