Pluggable Rule Driven Data Validation with Spark
Data validation is an essential component of any ETL data pipeline. As we all know, most data engineers and data scientists spend most of their time cleaning and preparing their data before they can even get to the core processing.
In this post we will go over a pluggable, rule driven data validation solution implemented on Spark. Earlier I posted about the same solution implemented on Hadoop, and this post can be considered a sequel to it. The solution is available in my open source project chombo on GitHub.
Anatomy of a Validator
A validator is essentially a simple predicate that expresses some constraint on the data. Here are some examples:
- Value of a numeric field should be less than xxx
- A date field should have the format xxx
- A date should be later than a specified date
- A text field size should be xxx
- A text field should comply with email format
- A categorical field value should be a member of a predefined set of values
There are more than 30 such validators available out of the box in chombo. You can get the complete list of validators with descriptions from my earlier post.
A validator can be thought of as a UDF that takes a field value as input and returns true if the field is valid and false otherwise. It is characterized as follows:
- It has a unique tag
- It’s a Java class that extends a base class
- Custom validator classes can easily be created
- Most validators require some configuration parameters to be defined
- Multiple validators can be applied to a column
When multiple validators are applied to a field, the field is considered to have failed validation if at least one of them fails. For a given field, even after one validator has failed, the rest of the validators in the chain are still executed, so that we get a complete diagnostic of the field that failed validation.
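The chain behaviour described above can be sketched in a few lines of Python. This is only an illustration of the idea, not chombo's Java implementation: the function names (`not_missing`, `make_max`, `make_membership`, `failed_tags`) and the category set are assumptions made up for the example.

```python
from typing import Callable, Dict, List, Set

# A validator is a predicate on a single field value: True means valid.
Validator = Callable[[str], bool]

def not_missing(value: str) -> bool:
    """Valid when the field is not empty."""
    return value.strip() != ""

def make_max(limit: float) -> Validator:
    """Build a validator that accepts numeric values up to limit."""
    def check(value: str) -> bool:
        try:
            return float(value) <= limit
        except ValueError:
            return False  # a non-numeric value cannot satisfy a max constraint
    return check

def make_membership(allowed: Set[str]) -> Validator:
    """Build a validator that accepts only members of a predefined set."""
    return lambda value: value in allowed

def failed_tags(value: str, chain: Dict[str, Validator]) -> List[str]:
    """Run every validator in the chain (no short circuit) and
    return the tags of all validators that failed."""
    return [tag for tag, validator in chain.items() if not validator(value)]

# Hypothetical chain for the product category field, keyed by validator tag.
category_chain = {
    "notMissing": not_missing,
    "membership": make_membership({"general", "bakery", "produce"}),
}
```

With this chain, an empty category fails both `notMissing` and `membership`, so both tags are reported instead of only the first failure.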
There are some validators that apply to the whole row. At run time these validators get passed the whole row.
Validator Configuration
As in most of my projects, I rely heavily on metadata defined in a JSON file, for example the metadata file for the retail order data that we will be using in this post. It provides metadata on the fields in the data set being used.
A lot of validator configuration can be gleaned from the metadata file. This is natural, as part of the metadata consists of various constraints on the data. The metadata file has a list of validator tags for each field in the data set that needs validation. Additional configuration is also available there. For example, if we are enforcing a max value for some numeric field, we can use the max metadata for that field.
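To make the idea concrete, here is a minimal Python sketch of reading validator tags and their parameters from field metadata. The JSON layout, the key names (`fields`, `ordinal`, `validators`) and the min and max values are assumptions for illustration only; the actual chombo metadata file may be organized differently.

```python
import json

# Hypothetical metadata layout and values; not the real chombo schema.
METADATA = """
{
  "fields": [
    {"name": "quantity", "ordinal": 6, "dataType": "int",
     "validators": ["ensureInt", "min"], "min": 1},
    {"name": "amount", "ordinal": 7, "dataType": "double",
     "validators": ["max"], "max": 1000.0}
  ]
}
"""

def validators_by_ordinal(metadata_json: str) -> dict:
    """Map each field ordinal to its validator tags and their parameters."""
    result = {}
    for field in json.loads(metadata_json)["fields"]:
        tags = field.get("validators", [])
        # Assumption: a tag such as "max" takes its parameter from the
        # metadata key of the same name; tags without one map to None.
        result[field["ordinal"]] = {tag: field.get(tag) for tag in tags}
    return result

config = validators_by_ordinal(METADATA)
```

Here `config[7]` carries the max constraint for the monetary amount, while a tag like `ensureInt` needs no parameter at all.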
There are cases where a validator requires more complex configuration, or where the metadata file is not the natural home for the kind of configuration required. For example, our use case has a membership check for product ID.
When the cardinality of the set is small, the members can be defined in the metadata file. That’s not the case for product IDs. In our case we get the set of valid product IDs by reading a certain column of a product file. The configuration related to this is defined in the configuration file, which is based on the Typesafe configuration format.
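Building a membership set from an external file is simple to sketch. The Python below assumes a comma separated product file with the product ID in the first column; the file layout, the sample rows and the function name `load_member_set` are illustrative assumptions, not taken from chombo.

```python
import csv
import io

def load_member_set(lines, column: int, delimiter: str = ",") -> set:
    """Collect the distinct values of one column from delimited text lines."""
    reader = csv.reader(lines, delimiter=delimiter)
    # Skip rows that are too short to contain the requested column.
    return {row[column].strip() for row in reader if len(row) > column}

# Stand-in for the product file; in practice this would be an open file.
product_file = io.StringIO(
    "4E0B1WFTK4,general,6.54\n"
    "KQJ6X2LD08,bakery,6.97\n"
)
valid_ids = load_member_set(product_file, column=0)
```

The resulting set can then back a membership validator, so that any product ID not present in the file fails validation.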
Retail Data Analytic Hub
Let’s talk about our use case. Imagine a retailer that has multiple brick and mortar stores. Every few days each store uploads its sales data since the last upload to the company data center, where it gets collected, consolidated, cleaned and analyzed.
One of the tasks in the pipeline is data validation. Here is the list of fields, along with the validators configured for each field in the data set.
- Date of order – (ensureDate, min)
- Store ID – (exactLength)
- Store zip code – (notMissing, membership)
- Order ID
- Product ID – (notMissing, membershipExtSrc)
- Product category – (notMissing, membership)
- Order quantity – (ensureInt, min)
- Monetary amount – (max)
The list of validators for each field is not exhaustive. There could be many other applicable validators. Here is some sample input data:
2018-07-29,DU7PIQ80,94509,4T16WXSXU3DP,4E0B1WFTK4,general,4,26.16
2018-07-29,DU7PIQ80,94509,4T16WXSXU3DP,KQJ6X2LD08,bakery,3,20.91
2018-07-29,DU7PIQ80,94509,BDI8Q9548ZMV,5Q78LFUM12,frozen food,1,4.16
2018-07-29,DU7PIQ80,94509,BDI8Q9548ZMV,8X4U3K5IX3,paper goods,5,9.25
2018-07-29,DU7PIQ80,94509,BDI8Q9548ZMV,2VJ4IF6S5F,produce,3,16.05
2018-07-29,DU7PIQ80,94509,BDI8Q9548ZMV,GMM4848K54,canned food,4,5.92
2018-07-29,DU7PIQ80,94509,BDI8Q9548ZMV,TA9M4BLKUW,canned food,1,1.92
Our retailer is tech savvy and has deployed teams of Data Scientists for performing various Data Science tasks. Here are some example Data Science tasks that could be highly beneficial for the business.
- Association rule mining of products bought together, to help improve merchandise layout in stores
- Demand forecasting for various products, to help make better purchase and inventory management decisions for the retailer warehouse
- Analyzing the weekly cycle of the number of items sold or the number of orders, and using the insight gained for proper staffing at the stores
All these lofty initiatives are doomed if the data quality is poor. For example, if some product IDs are incorrect, the output from association rule mining will be useless.
Data Validation Spark Job
The data validator Spark job is implemented in the Scala object DataValidator. The output can be configured in multiple ways, and all the output modes are controlled through configuration.
All the output, including the invalid records, could go to the same directory. The invalid data could also be separated and written to a separate directory. In this case the main output directory will contain only valid data.
The invalid fields could be annotated in two ways. Each field in each record could be annotated with all the validators for which the field failed. Alternatively, the invalid fields could be overwritten with a mask. Here are some sample invalid records.
2018-07-29,DU7PIQ80,94509,R1N29FU8GI32,5H1758IPCY,GeaM#membership,0#min,6.02
2018-07-29,DU7PIQ80,94509,R1N29FU8GI32,W8I61V5I6G,bakery,4,#max
2018-07-29,DU7PIQ80H75#exactLength,94509,4M04TT2PB7VM,8S331G92T7,bKkBry#membership,1,2.82
2018/07/29#ensureDate#min,DU7PIQ80,94509,8K0H0LH9X6TI,59PO8KQEP4,general,0#min,992.32
2018-07-29,DU7PIQ80,94509,6692CU1RY735,JIS3ASRD36,KaperWgoods#membership,3,7.89
2018-07-29,DU7PIQ806#exactLength,94509,6692CU1RY735,335Q54KCYS,cleaner,1,2.76
We can see the validator tags separated by a configurable delimiter and appended to the field in the output. These tags correspond to the validators that failed on the field. The invalid data could be used in many ways. For example, the invalid data could be quarantined until root causes have been identified and appropriate remedial actions have been taken.
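The two annotation modes can be illustrated with a short Python sketch. It is not the DataValidator code: the function names and the mask string `XXXX` are assumptions, and only the `#` tag delimiter is taken from the sample output above.

```python
from typing import Dict, List

def annotate_record(fields: List[str], failures: Dict[int, List[str]],
                    tag_delim: str = "#", field_delim: str = ",") -> str:
    """Append the tags of all failed validators to each invalid field."""
    out = []
    for i, value in enumerate(fields):
        tags = failures.get(i, [])
        out.append(value + "".join(tag_delim + tag for tag in tags))
    return field_delim.join(out)

def mask_record(fields: List[str], failures: Dict[int, List[str]],
                mask: str = "XXXX", field_delim: str = ",") -> str:
    """Overwrite each invalid field with a mask (the mask value is hypothetical)."""
    return field_delim.join(
        mask if failures.get(i) else value for i, value in enumerate(fields)
    )

# Record with a missing monetary amount, which fails the max validator.
fields = "2018-07-29,DU7PIQ80,94509,R1N29FU8GI32,W8I61V5I6G,bakery,4,".split(",")
failures = {7: ["max"]}  # field index -> tags of failed validators
annotated = annotate_record(fields, failures)
masked = mask_record(fields, failures)
```

The annotated form keeps the original value next to its diagnostics, which helps root cause analysis, while the masked form only signals that the field cannot be trusted.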
Record Wise Validation
So far, we have applied validators independently on a per column basis. But sometimes the validation logic is more complex and involves multiple fields. For example, if the value of field a is x, then the value of field b should be greater than y.
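A rule of that shape is easy to express as a predicate over the whole row. The Python below is a sketch only and does not reflect any chombo feature; the function name `implies_greater` and the example rule (a bakery order must have a monetary amount above 1.0) are made up for illustration.

```python
from typing import Callable, List

# A row validator is a predicate on the whole record: True means valid.
RowValidator = Callable[[List[str]], bool]

def implies_greater(cond_index: int, cond_value: str,
                    target_index: int, threshold: float) -> RowValidator:
    """If field cond_index equals cond_value, then field target_index
    must be numerically greater than threshold."""
    def check(row: List[str]) -> bool:
        if row[cond_index] != cond_value:
            return True  # the condition does not hold, so the rule is satisfied
        try:
            return float(row[target_index]) > threshold
        except ValueError:
            return False  # a non-numeric target cannot satisfy the rule
    return check

# Hypothetical rule on product category (index 5) and monetary amount (index 7).
rule = implies_greater(5, "bakery", 7, 1.0)
```

Note that the rule is an implication: rows where the condition field has a different value always pass.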
Currently there is very limited support for native record wise validation. One should be able to write boolean expressions for validation in terms of multiple fields. There should be run time support to parse the expression, build a parse tree and execute it. I have implemented something along those lines, but it still needs to be integrated with the Spark validation job and tested.
Piped Record Wise Validation
Piping to an external script is another option. Record wise validation by piping to an external script is supported in chombo. In the current example we are doing record wise validation involving the product ID and monetary amount fields.
A file provides the maximum monetary amount for each product, obtained by performing statistical modeling of historical data and finding a max threshold for each product based on z score. A Python script checks whether the monetary amount exceeds the max threshold.
Here are the steps for configuring row wise validators.
- Define a tag for your validator, which has to start with piped. The tag for our validator is pipedMaxMonetaryAmount
- Define all row validators in the schema file. Look for the rowValidators element in the JSON file
- Define the configuration for the validator in the configuration file. The parameter script defines the script you want to run. In our case it’s a Python script. The parameter config defines a list of configuration parameters needed by the script. The parameter workingDir contains the working directory for the script
- All scripts and any files referred to by the scripts should be copied to the workingDir on each node of the Spark cluster.
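As a rough idea of what such a script could look like, here is a Python sketch of a max monetary amount check. It is not the script shipped with chombo. How the Spark job passes the row to the script and reads back the result is not described in this post, so the sketch assumes the threshold file path and the row arrive as command line arguments and that the script prints valid or invalid. The threshold file layout (product ID, max amount) is also an assumption.

```python
import sys

PRODUCT_ID_INDEX = 4   # position of product ID in the order record
AMOUNT_INDEX = 7       # position of monetary amount in the order record

def load_thresholds(path: str, delimiter: str = ",") -> dict:
    """Read 'productID,maxAmount' lines (assumed layout) into a dict."""
    thresholds = {}
    with open(path) as handle:
        for line in handle:
            line = line.strip()
            if line:
                product_id, max_amount = line.split(delimiter)[:2]
                thresholds[product_id] = float(max_amount)
    return thresholds

def is_row_valid(row: str, thresholds: dict, delimiter: str = ",") -> bool:
    """Valid when the amount does not exceed the product's max threshold."""
    fields = row.split(delimiter)
    limit = thresholds.get(fields[PRODUCT_ID_INDEX])
    if limit is None:
        return True  # no threshold known; leave it to the field validators
    try:
        return float(fields[AMOUNT_INDEX]) <= limit
    except ValueError:
        return False  # a missing or non-numeric amount fails the check

# Assumed invocation: script.py <threshold_file> <row>
if __name__ == "__main__" and len(sys.argv) == 3:
    limits = load_thresholds(sys.argv[1])
    print("valid" if is_row_valid(sys.argv[2], limits) else "invalid")
```

Keeping the check in a plain function such as `is_row_valid` makes it easy to test the logic without going through the piping mechanism.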
Here are some records that failed validation. Notice that the failed row validators are tagged at the end of the 2nd and 3rd records, separated from the record by ##.
2018-07-29,DU7PIQ80ID#exactLength,#notMissing#membership,XE2BI1NHI6NC,JW9FDE906N,cFeanJr#membership,2,7.78
2018-07-29,DU7PIQ80,94509,AE43VPQ39XU7,8S4666972K,cleaner,0#min,736.91##pipedMaxMonetaryAmount
2018/07/29#ensureDate#min,DU7PIQ80O#exactLength,94509,AE43VPQ39XU7,0KSL332LAT,baVeAy#membership,2,852.31##pipedMaxMonetaryAmount
2018-07-29,DU7PIQ80Q#exactLength,94509,EX3T97U5SGC0,8ABHZY1689,cleUneU#membership,3,3.15
2018-07-29,DU7PIQ80,94509,XTJ3FJAET93N,CFE3H9T4RB,pouBtrK#membership,1,3.77
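Downstream tools often need to take this annotated output apart again. The Python sketch below splits a record into its original values, the failed field validators and the failed row validators. It assumes the delimiters seen in the samples (`,` between fields, `#` before a field validator tag, `##` before a row validator tag) and that field values themselves never contain `#`; the function name is hypothetical.

```python
from typing import Dict, List, Tuple

def parse_annotated(line: str, field_delim: str = ",", tag_delim: str = "#",
                    row_tag_delim: str = "##"
                    ) -> Tuple[List[str], Dict[int, List[str]], List[str]]:
    """Split an annotated record into values, per field failures and row failures."""
    # Row validator tags trail the record, each preceded by the row delimiter.
    record, *row_tags = line.split(row_tag_delim)
    values = []
    field_failures = {}
    for index, item in enumerate(record.split(field_delim)):
        # The first piece is the field value, the rest are failed validator tags.
        value, *tags = item.split(tag_delim)
        values.append(value)
        if tags:
            field_failures[index] = tags
    return values, field_failures, row_tags

sample = ("2018/07/29#ensureDate#min,DU7PIQ80O#exactLength,94509,AE43VPQ39XU7,"
          "0KSL332LAT,baVeAy#membership,2,852.31##pipedMaxMonetaryAmount")
values, field_failures, row_tags = parse_annotated(sample)
```

From here it is straightforward to count failures per validator tag, or to route quarantined records by the kind of problem they have.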
A word of caution about the piping. You will find a significant slowdown of your Spark job, because a process is spawned for each row validator and each row. It is recommended to wrap all row level validation in one script and have only one row level validator.
Statistical Modeling in Data Validation
Statistical modeling can play an important role in data validation. All the validation rules have one or more parameters. In some cases the parameter values are static and known. For example, it may be known that the product ID field is supposed to be exactly 12 characters long.
However, in many other cases the parameters are dynamic and statistical models could be used to estimate them. For example, how do we know what the max value for the monetary amount a customer has paid for a certain product should be?
In other words, we want to define a max threshold such that any amount above that will be considered anomalous and flagged as invalid.
One approach is to assume a normal distribution for the monetary amount for each product and set the threshold to the mean plus some multiple of the standard deviation, both estimated from historical data. This multiple is also known as the z score. The monetary amount is proportional to the quantity, so the same technique could be applied to the quantity instead of the monetary amount.
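In other words, for each product the threshold is mean + z × standard deviation. A minimal Python sketch of this computation follows. The function name, the input layout (pairs of product ID and amount) and the default z of 3 are assumptions for illustration, and the population standard deviation is used here although the sample standard deviation would be an equally reasonable choice.

```python
from collections import defaultdict
from statistics import mean, pstdev
from typing import Dict, Iterable, Tuple

def max_thresholds(history: Iterable[Tuple[str, float]],
                   z: float = 3.0) -> Dict[str, float]:
    """Per product max threshold: mean + z * standard deviation of past amounts."""
    amounts_by_product = defaultdict(list)
    for product_id, amount in history:
        amounts_by_product[product_id].append(amount)
    return {
        product_id: mean(amounts) + z * pstdev(amounts)
        for product_id, amounts in amounts_by_product.items()
    }

# Made-up historical amounts for two products.
history = [
    ("4E0B1WFTK4", 10.0), ("4E0B1WFTK4", 12.0), ("4E0B1WFTK4", 14.0),
    ("KQJ6X2LD08", 5.0), ("KQJ6X2LD08", 5.0),
]
thresholds = max_thresholds(history, z=2.0)
```

A larger z flags fewer records as anomalous, so its value is a trade off between catching bad data and rejecting legitimate but unusual orders.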
Wrapping Up
We have gone through the process of validating data on Spark using a pluggable, rule driven validation framework. The tutorial can be used for step by step execution of the use case.
Originally posted here.