Ensure data quality in your S3 data lake using Python, AWS Lambda, SNS, and Great Expectations.
Data lakes used to have a bad reputation when it comes to data quality. In contrast to data warehouses, data doesn’t need to adhere to any predefined schema before we can load it in. Without proper testing and governance, your data lake can easily turn into a data swamp. In this article, we’ll look at how to build automated data tests that will be executed any time new data is loaded to a data lake. We’ll also configure SNS-based alerting to get notified about data that deviates from our expectations.
There are many tools for data profiling and data testing out there. We'll focus on Great Expectations, an open-source Python library for validating and profiling data.
The recommended way of using Great Expectations is to:
- create and configure a Data Context (great_expectations init),
- configure a Datasource that connects to your data,
- create an Expectation Suite, typically via CLI-generated Jupyter notebooks,
- validate batches of your data by creating and running a Checkpoint.
If you want to implement all these steps, you can follow this official tutorial. And here is the most important page of the entire documentation, listing all expectations you can use.
If you look at all the above bullet points, you may notice that this setup is quite involved. And it doesn’t even cover how to package and deploy the code to make it production-ready, how to set up alerts on failure, how to host and share the data docs, and how to build a repeatable process around it for data testing in data pipelines.
Let’s try to approach it in a more “Pythonic” way. We want to use Great Expectations on data stored in AWS S3. We want data tests to run automatically any time a new file arrives in S3. While Great Expectations provides a data source connector for Athena, it would require running the validation on an entire Athena table rather than validating only a specific batch of data loaded to a data lake. Thus, it would be harder to track down which S3 PUT operation caused anomalies in the respective data source.
As an alternative to the Athena data source, we could configure the expectations suite using Pandas and flat files as a data source, but even then the entire process seems a bit cumbersome as mentioned in the last section.
Let’s look at how we can approach it only using Python — no configuration files. The ultimate goal is to deploy this script to AWS Lambda and configure the function to be triggered on each S3 PUT operation to the desired path in our S3 data lake.
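Before we build the full solution, here is a minimal sketch of that idea: wrap a Pandas dataframe with Great Expectations and call expectations directly as Python methods, with no configuration files. The dataframe and the single expectation below are purely illustrative.

import pandas as pd
from great_expectations import from_pandas

# Illustrative dataframe; the real data is generated in the next section
df = pd.DataFrame({"value": [1, 5, 7]})
ge_df = from_pandas(df)

result = ge_df.expect_column_values_to_be_between(
    column="value", min_value=0, max_value=10
)
print(result.to_json_dict()["success"])  # True: every value lies within the range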
We’ll start by generating an hourly time series with a deliberately chosen range of values. A synthetic dataset lets us conveniently insert “bad values” and check whether our data tests detect those anomalies. The example dataset we’ll be using is an hourly time series with a timestamp column and an integer value column ranging from 0 to 100.
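The generator itself lives in src/timeseries_data_generator.py (see the project structure further down) and is not reproduced in the article. The snippet below is a minimal sketch that matches the interface used by the test code later on (start_date, end_date, min_value, max_value, and a get_timeseries() method); its internals are an assumption rather than the original implementation.

import numpy as np
import pandas as pd


class TimeseriesGenerator:
    def __init__(
        self,
        start_date: str,
        end_date: str,
        min_value: int = 0,
        max_value: int = 100,
    ):
        self.start_date = start_date
        self.end_date = end_date
        self.min_value = min_value
        self.max_value = max_value

    def get_timeseries(self) -> pd.DataFrame:
        # Hourly timestamps between start_date and end_date (inclusive)
        timestamps = pd.date_range(self.start_date, self.end_date, freq="H")
        # Random integers in the configured range; cast to int64 so the
        # data type expectation passes on every platform
        values = np.random.randint(
            self.min_value, self.max_value + 1, size=len(timestamps)
        ).astype("int64")
        return pd.DataFrame({"timestamp": timestamps, "value": values})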
By and large, any potential anomalies can (and should) be tested. For instance, we can validate:
- the order of columns (timestamp first, then value),
- the total row count (a monthly batch of hourly data should contain between 672 and 744 rows),
- null values in the timestamp and value columns,
- the data types of both columns (datetime64[ns] and int64),
- the value range (all values should fall between 0 and 100).
Here is an example implementation of those tests in a single class. It allows running each data test individually, as well as running all tests at once. Additionally, the method parse_data_test_result() sends an SNS email alert on any failed data test.
import json
import boto3
import logging
import pandas as pd
from great_expectations import from_pandas
from great_expectations.dataset.pandas_dataset import PandasDataset
from great_expectations.core.expectation_validation_result import (
    ExpectationValidationResult,
)


class TimeseriesDataTestRunner:
    def __init__(
        self,
        s3_path: str,
        aws_region: str = "eu-central-1",
        sns_topic_arn: str = "arn:aws:sns:eu-central-1:338306982838:ge_timeseries_data_test",
        dt_column: str = "timestamp",
        dt_column_dtype: str = "datetime64[ns]",
        nr_column: str = "value",
        nr_column_dtype: str = "int64",
        min_value: int = 0,
        max_value: int = 100,
        min_total_row_count: int = 672,
        max_total_row_count: int = 744,
    ):
        self.s3_path = s3_path
        self.aws_region = aws_region
        self.sns_topic_arn = sns_topic_arn
        self.dt_column = dt_column
        self.dt_column_dtype = dt_column_dtype
        self.nr_column = nr_column
        self.nr_column_dtype = nr_column_dtype
        self.min_value = min_value
        self.max_value = max_value
        self.min_total_row_count = min_total_row_count
        self.max_total_row_count = max_total_row_count
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)

    def send_email_alert(self, message: str) -> None:
        sns = boto3.client("sns", region_name=self.aws_region)
        sns_result = sns.publish(TopicArn=self.sns_topic_arn, Message=message)
        self.logger.info("SNS email sent. Result: %s", sns_result)

    def parse_data_test_result(
        self, validation_result: ExpectationValidationResult
    ) -> None:
        test_result = validation_result.to_json_dict()
        if test_result["success"]:
            self.logger.info("Data test passed for dataset %s.", self.s3_path)
        else:
            alert = (
                f"Data test failed for dataset: {self.s3_path}."
                f" VALIDATION RESULT: {json.dumps(test_result)}"
            )
            self.logger.error(alert)
            self.send_email_alert(alert)

    def test_order_of_columns(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        return ge_df.expect_table_columns_to_match_ordered_list(
            column_list=[self.dt_column, self.nr_column]
        )

    def test_row_count(self, ge_df: PandasDataset) -> ExpectationValidationResult:
        """Hourly time series in monthly batches:
        31 days * 24 h = 744 rows, 28 days * 24 h = 672 rows"""
        return ge_df.expect_table_row_count_to_be_between(
            max_value=self.max_total_row_count, min_value=self.min_total_row_count
        )

    def test_null_values_nr_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        return ge_df.expect_column_values_to_not_be_null(column=self.nr_column)

    def test_null_values_dt_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        return ge_df.expect_column_values_to_not_be_null(column=self.dt_column)

    def test_data_type_nr_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        return ge_df.expect_column_values_to_be_of_type(
            column=self.nr_column, type_=self.nr_column_dtype
        )

    def test_data_type_dt_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        return ge_df.expect_column_values_to_be_of_type(
            column=self.dt_column, type_=self.dt_column_dtype
        )

    def test_value_range(self, ge_df: PandasDataset) -> ExpectationValidationResult:
        return ge_df.expect_column_values_to_be_between(
            column=self.nr_column, min_value=self.min_value, max_value=self.max_value
        )

    def run_data_tests(self, dataframe: pd.DataFrame) -> None:
        ge_df = from_pandas(dataframe)
        tests_to_run = [
            self.test_order_of_columns(ge_df),
            self.test_row_count(ge_df),
            self.test_null_values_nr_column(ge_df),
            self.test_null_values_dt_column(ge_df),
            self.test_data_type_nr_column(ge_df),
            self.test_data_type_dt_column(ge_df),
            self.test_value_range(ge_df),
        ]
        for data_test in tests_to_run:
            self.parse_data_test_result(data_test)
To create an SNS topic for email alerts, you can run the code below. Then, follow the link from the AWS email to confirm your SNS email subscription.
import boto3

sns = boto3.client("sns", region_name="eu-central-1")

# CREATE TOPIC
topic_name = "ge_timeseries_data_test"
create_response = sns.create_topic(Name=topic_name)
topic_arn = create_response.get("TopicArn")
print(f"Create topic response: {create_response}")

# CREATE SUBSCRIPTIONS
email_sub = sns.subscribe(
    TopicArn=topic_arn, Protocol="email", Endpoint="youremail@gmail.com"
)
You probably want to test your data locally before moving on to production. The data tests from TimeseriesDataTestRunner can be executed on a local development machine. The code snippet below implements a happy-path test plus seven test cases that each inject a specific anomaly: the wrong column order, an incomplete month of data, missing values, missing timestamps, a wrong data type in either column, and values outside the expected range.
import logging
import pandas as pd

from src.timeseries_data_generator import TimeseriesGenerator
from src.timeseries_data_test_runner import TimeseriesDataTestRunner

DATASET = "local_test_without_s3"
DEFAULT_START_DATE = "2021-07-01"
DEFAULT_END_DATE = "2021-07-31 23:59"
SKEWED_END_DATE = "2021-07-25 23:59"


def _run_data_tests(df: pd.DataFrame) -> None:
    TimeseriesDataTestRunner(s3_path=DATASET).run_data_tests(df)


def test_happy_path() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    _run_data_tests(df)


def test_incorrect_order_of_columns() -> None:
    """Order of columns here is the opposite of what we expect in a time series"""
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    df = df[["value", "timestamp"]]
    _run_data_tests(df)


def test_incomplete_data() -> None:
    """Data doesn't contain the full month of July"""
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=SKEWED_END_DATE
    ).get_timeseries()
    _run_data_tests(df)


def test_missing_nr_values() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    df.at[0, "value"] = None
    _run_data_tests(df)


def test_missing_timestamp() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    df.at[0, "timestamp"] = None
    _run_data_tests(df)


def test_incorrect_data_type_nr_column() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    df["value"] = df["value"].astype(float)
    _run_data_tests(df)


def test_incorrect_data_type_dt_column() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE, end_date=DEFAULT_END_DATE
    ).get_timeseries()
    df["timestamp"] = df["timestamp"].astype(str)
    _run_data_tests(df)


def test_incorrect_value_range() -> None:
    df = TimeseriesGenerator(
        start_date=DEFAULT_START_DATE,
        end_date=DEFAULT_END_DATE,
        min_value=0,
        max_value=120,
    ).get_timeseries()
    _run_data_tests(df)


if __name__ == "__main__":
    logging.basicConfig(
        format="[%(levelname)s] [%(name)s] [%(asctime)s]: %(message)s", level="INFO"
    )
    test_happy_path()
    test_incorrect_order_of_columns()
    test_incomplete_data()
    test_missing_nr_values()
    test_missing_timestamp()
    test_incorrect_data_type_nr_column()
    test_incorrect_data_type_dt_column()
    test_incorrect_value_range()
When we run this locally, we should receive seven email alerts, one for each deliberately broken dataset.
The most efficient way to run those tests automatically is to build a Lambda function with an S3 PUT object event trigger. This way, any time a new file gets uploaded to the specified S3 location, the Lambda function will be automatically triggered to test our data.
To accomplish that, we need a Lambda function that will read the S3 key from the event metadata, read the uploaded file into a Pandas dataframe, and finally run the tests. Here is a simple implementation of that:
import json
import logging
import awswrangler as wr

from timeseries_data_test_runner import TimeseriesDataTestRunner

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def handler(event, context):
    logger.info("Received event: " + json.dumps(event))
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    s3_path = f"s3://{bucket}/{key}"
    df = wr.s3.read_parquet(s3_path)
    TimeseriesDataTestRunner(s3_path=s3_path).run_data_tests(df)
To build this Lambda function, we need a Dockerfile based on an AWS Lambda Python base image, a requirements.txt listing the dependencies (great_expectations, awswrangler, pandas), and a container image built from them that we push to Amazon ECR.
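The Dockerfile is not reproduced in this article, so here is a minimal sketch of what it could look like. The base image tag and the handler reference are assumptions based on the project structure and the Lambda code above.

# Sketch of a container image for the data-test Lambda function
FROM public.ecr.aws/lambda/python:3.8

# Install great_expectations, awswrangler, pandas, and the other dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

# Copy the application code from the src folder into the Lambda task root
COPY src/ ${LAMBDA_TASK_ROOT}/

# lambda.py exposes the handler() function shown above
CMD ["lambda.handler"]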
Note that timeseries_data_test_runner.py, timeseries_data_generator.py and lambda.py are located in the src folder. This is important if you want to use the Dockerfile shown above. The project structure looks as follows (the last file will be explained in the next section):
├── Dockerfile
├── requirements.txt
├── src
│   ├── lambda.py
│   ├── timeseries_data_generator.py
│   └── timeseries_data_test_runner.py
└── upload_new_timeseries_to_s3.py
Now all that is left is to build the Docker image, push it to Amazon ECR, create a Lambda function from that container image, and configure the S3 PUT event trigger for the desired S3 path (a sketch of the trigger configuration follows below).
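The trigger can be set up in the S3 console, but it can also be configured programmatically. The snippet below is a sketch of that wiring with boto3; the bucket name, function ARN, and S3 prefix are placeholders, not values from the original setup.

import boto3

AWS_REGION = "eu-central-1"
BUCKET = "your-data-lake-bucket"  # placeholder bucket name
LAMBDA_ARN = "arn:aws:lambda:eu-central-1:123456789012:function:ge-data-tests"  # placeholder ARN

lambda_client = boto3.client("lambda", region_name=AWS_REGION)
s3 = boto3.client("s3", region_name=AWS_REGION)

# Allow S3 to invoke the Lambda function
lambda_client.add_permission(
    FunctionName=LAMBDA_ARN,
    StatementId="s3-invoke-ge-data-tests",
    Action="lambda:InvokeFunction",
    Principal="s3.amazonaws.com",
    SourceArn=f"arn:aws:s3:::{BUCKET}",
)

# Fire the function on every PUT under the chosen prefix
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": LAMBDA_ARN,
                "Events": ["s3:ObjectCreated:Put"],
                "Filter": {
                    "Key": {
                        "FilterRules": [{"Name": "prefix", "Value": "timeseries/"}]
                    }
                },
            }
        ]
    },
)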
Similar to how we ran data tests locally, we can now start uploading our data to S3 and see the alerts being triggered due to failed data tests on AWS.
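The uploads are handled by upload_new_timeseries_to_s3.py from the project structure above. Its exact contents are not shown in the article; the sketch below illustrates the idea using awswrangler, with a placeholder bucket name and only a few of the test scenarios. Each Parquet write fires the S3 trigger and therefore one Lambda data-test run.

import awswrangler as wr

from src.timeseries_data_generator import TimeseriesGenerator

BUCKET = "your-data-lake-bucket"  # placeholder bucket name
S3_PREFIX = f"s3://{BUCKET}/timeseries"

datasets = {
    "happy_path": TimeseriesGenerator(
        start_date="2021-07-01", end_date="2021-07-31 23:59"
    ).get_timeseries(),
    "incomplete_data": TimeseriesGenerator(
        start_date="2021-07-01", end_date="2021-07-25 23:59"
    ).get_timeseries(),
    "incorrect_value_range": TimeseriesGenerator(
        start_date="2021-07-01", end_date="2021-07-31 23:59", min_value=0, max_value=120
    ).get_timeseries(),
}

for name, df in datasets.items():
    wr.s3.to_parquet(df=df, path=f"{S3_PREFIX}/{name}.parquet")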
If you upload all the test datasets, you should again receive seven emails from AWS, one for each failed data test.
The biggest advantage of our custom alerts is that they show exactly which S3 file upload caused the specific data test to fail. This is something that seems to be hard to accomplish when using the default implementation of Great Expectations.
Additionally, using purely Pandas-based expectations makes testing easier and seems to be more “Pythonic” than working with configuration files and Jupyter notebooks. However, the downside of the presented approach is the lack of data docs. If you care about those, have a look at this official tutorial.
As you can imagine, if you build data tests in Lambda for a large number of S3 datasets, monitoring them can become overwhelming. Dashbird is a platform that addresses this problem by providing dashboards and well-formatted logs for monitoring, alerting, and observability of serverless resources on AWS.
The image below shows how Dashbird can help detect issues and bottlenecks in your serverless data tests.
In this article, we looked at various Python libraries for data profiling and testing. We examined two ways of using Great Expectations: the traditional config-file-based method as well as a more “Pythonic” do-it-yourself approach using a custom test runner and email alerts. We then investigated how to execute data tests locally, as well as how to ensure automated test runs after any new file gets uploaded to S3.
Further reading:
How to test serverless applications?
How can a shared Slack channel improve your data quality?
Can data lakes accelerate building ML data pipelines?