Bulk Mutation in an Integration Data Lake with Spark
Data lakes act as repositories for data from various sources, possibly in different formats. They can be used to build a data warehouse or to support other data analysis activities. Data lakes are generally built on top of the Hadoop Distributed File System (HDFS), which is append only. HDFS is essentially a WORM file system, i.e. Write Once, Read Many times.
In an integration scenario, however, your source data streams may contain updates and deletes. This post is about performing updates and deletes in an HDFS-backed data lake. The Spark-based solution is available in my open source project chombo.
Virtual Update and Delete
In databases like HBase that are backed by HDFS, you can perform updates and deletes. How do they do it when the underlying storage only allows appends? When you update, HBase simply creates a new version of the record by appending it. When you query, it returns the latest version of the record.
Periodically, a background garbage collection process rewrites the data and, while doing so, discards all the older versions of a record.
For deletes, a record is marked for deletion. During the garbage collection process, any record marked for deletion is not written.
Mutation in a Data Lake
We will use a similar approach. The solution is based on the following assumptions about the source data (a small sketch of the assumed record layout follows the list).
- Data is flat and record oriented
- Each record has a unique key comprising one or more fields
- There is one field that is either a timestamp or a monotonically increasing sequence number
- As data trickles in from various sources, the Spark job runs to integrate it into the data lake.
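As a minimal sketch of these assumptions (hypothetical code for illustration, not taken from chombo), a flat record with a composite key and a timestamp or sequence field could be modeled like this:

```scala
// Minimal, hypothetical model of the assumed record layout. The delimiter and
// the configurable field ordinals are assumptions made purely for illustration.
case class FlatRecord(line: String, keyFieldOrdinals: Seq[Int], seqFieldOrdinal: Int) {
  private val fields = line.split(",")

  // unique key composed of one or more fields
  def compositeKey: String = keyFieldOrdinals.map(fields(_)).mkString(",")

  // timestamp or monotonically increasing sequence, used to order record versions
  def sequence: Long = fields(seqFieldOrdinal).toLong
}
```

For the retail data used later in this post, the key would come from the first two fields and the sequence from the trailing timestamp field.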
We will discuss two approaches to the solution, depending on how deletes are handled. In the first approach, the source system is burdened with some extra work. It is characterized as follows:
- For inserts and updates, the source system sends only the newly inserted and updated records since the last synchronization.
- For deletes, the source system sends the records deleted since the last synchronization. This implies that the source system has to save and track its deleted records.
- The job operates in two modes depending on the type of mutation, whether insert/update or delete. The configuration parameter mutation.op is set accordingly.
In the second approach, inserts and updates are handled the same way, but deletes are handled differently. Here is how it works:
- For inserts and updates, the source system sends only the newly inserted and updated records since the last synchronization.
- For deletes, the source system sends all the records in its database. This operation can be performed only after all the source systems have sent all their records, and it may involve a large amount of data.
- The job operates in two modes depending on the type of synchronization, whether partial for insert/update or full for delete. The configuration parameter sync.mode is set accordingly. A small sketch contrasting the per-key delete decision for the two approaches follows this list.
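To make the delete handling concrete, here is a small hypothetical sketch of the per-key decision in delete mode, assuming the existing and incremental records for a key have already been brought together. The function names and signatures are illustrative only, not chombo's code.

```scala
// Per-key survival decision in delete mode, one function per approach.
// Purely illustrative; assumes records have already been grouped by key.
object DeleteDecisionSketch {
  // Approach 1: the incremental feed carries the deleted records themselves,
  // so a key that also shows up in the incremental feed is dropped entirely.
  def surviveWithDeleteFeed(existing: Seq[String], incremental: Seq[String]): Seq[String] =
    if (incremental.nonEmpty) Seq.empty else existing

  // Approach 2: the incremental feed is a full dump from the sources, so a key
  // that is missing from the dump was deleted and its existing record is dropped.
  def surviveWithFullDump(existing: Seq[String], fullDump: Seq[String]): Seq[String] =
    if (fullDump.isEmpty) Seq.empty else existing
}
```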
Integration Spark Job
The complete data set is stored in a set of directories and files in HDFS. The incremental data arriving from the source system is saved in a separate set of directories and files in HDFS.
The complete data is processed along with the incremental data set to create a new complete data set that reflects all the mutations in the source systems. Inserts, updates and deletes are handled in the following ways:
- In insert/update mode, for updates, there will be multiple records for a given key. The latest one, as per the timestamp or sequence number, is written.
- In insert/update mode, for inserts, there will be only one record for a given key. The record is written.
- In delete mode, with the first approach, if there are two records for a given key, none of them is written. With the second approach, there will be only one record for a given key, which is the existing record. That record is not written.
The Spark job is implemented in the Scala object RecordSetBulkMutator. It reads the existing data and the incremental data into two separate RDDs. It then performs a group by key operation to process inserts, updates and deletes.
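The following is only a rough, simplified sketch of the merge described above, not the actual RecordSetBulkMutator implementation. The command line arguments, mode flag, delimiter and field positions are all assumptions made for illustration.

```scala
// Simplified sketch of the bulk mutation merge (not the actual RecordSetBulkMutator
// code). Key fields, timestamp position, mode flag and argument layout are assumed.
import org.apache.spark.sql.SparkSession

object BulkMutationSketch {
  def main(args: Array[String]): Unit = {
    // assumed args: existing data path, incremental data path, output path, mode
    val Array(existingPath, incrementalPath, outputPath, mode) = args.take(4)
    val deleteMode = mode == "delete"

    val spark = SparkSession.builder.appName("bulkMutationSketch").getOrCreate()
    val sc = spark.sparkContext

    // assumed layout: comma separated, key = first two fields, last field = timestamp
    def key(line: String): String = line.split(",").take(2).mkString(",")
    def seqNum(line: String): Long = line.split(",").last.toLong

    // the complete data set and the incremental feed live in separate HDFS directories
    val existing = sc.textFile(existingPath).map(l => (key(l), ("existing", l)))
    val incoming = sc.textFile(incrementalPath).map(l => (key(l), ("incoming", l)))

    val merged = (existing union incoming).groupByKey().flatMap { case (_, tagged) =>
      val existingRecs = tagged.filter(_._1 == "existing").map(_._2)
      val incomingRecs = tagged.filter(_._1 == "incoming").map(_._2)
      if (deleteMode) {
        // first approach: a key present in the incremental delete feed is dropped;
        // for the second approach the test would be incomingRecs.isEmpty instead
        if (incomingRecs.nonEmpty) Iterator.empty else existingRecs.iterator
      } else {
        // insert/update: keep only the latest version by timestamp or sequence number
        Iterator((existingRecs ++ incomingRecs).maxBy(seqNum))
      }
    }

    // the new complete data set reflecting all mutations
    merged.saveAsTextFile(outputPath)
    spark.stop()
  }
}
```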
Retail Supply Chain
We will use retail inventory data for a retailer as the use case. Consider a retailer that has multiple online and brick and mortar stores supported by several distribution centers. Inventory data from all the distribution centers is integrated into a data lake. Each distribution center might send all the changes made to its database once a day.
The data lake might be used for various purposes. The stores will use it to get inventory data across all distribution centers when placing orders for store inventory replenishment. It could also be used for various analytics and reporting purposes.
The data has the following fields. The first two fields comprise the unique key for a record.
- distribution center ID
- product ID
- product category ID
- product brand ID
- inventory amount
- product price
- time stamp
Here is some sample data. The data is update heavy, because inventory changes on a daily basis. When a new product is introduced in any distribution center, it will cause an insert in the data lake. When a product is retired, a delete operation will be processed in the data lake.
EZLM1D,Z5RG258E8363SW,B1SC710M53,Z4760444J819,208,6.50,1541818018
EZLM1D,ZCQ38B0725H39G,B1SC710M53,U14JHB6H9250,285,5.07,1541818018
EZLM1D,K8Y06H241STI12,AWG8KF0FLH,5DQ031C07OVO,43,12.03,1541818018
EZLM1D,403CHU5GKB0J3J,AWG8KF0FLH,5DQ031C07OVO,170,3.79,1541818018
EZLM1D,G7K8I3GNK4VO2D,P0JR8OCKR8,7U39R8H1L66L,46,11.47,1541818018
Versioning and Change Capture
As data gets updated, older versions of records can optionally be saved in a specified directory location. To enable this feature, the parameter maintain.version needs to be set to true. An output directory path for the versioned data is provided through the parameter versioned.filePath.
This feature could be useful, for example, for historical trend analysis of inventory levels, which in turn can support inventory management.
If you are building a data warehouse, you may also want to keep track of slowly changing dimensions.
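As a rough illustration of where version retention could hook into the merge, here is a hypothetical sketch. Only the parameter names maintain.version and versioned.filePath come from the post; everything else is assumed and is not chombo's implementation.

```scala
// Hypothetical sketch of version retention during the merge. The caller would
// write `latest` as the new complete data set and, when maintain.version is
// true, write `superseded` under the versioned.filePath location.
import org.apache.spark.rdd.RDD

object VersionRetentionSketch {
  def splitVersions(grouped: RDD[(String, Iterable[String])], seqNum: String => Long,
      maintainVersion: Boolean): (RDD[String], RDD[String]) = {
    // latest version per key goes into the new complete data set
    val latest = grouped.map { case (_, recs) => recs.maxBy(seqNum) }
    // superseded versions can optionally be kept, e.g. for inventory trend
    // analysis or slowly changing dimensions in a data warehouse
    val superseded =
      if (maintainVersion) grouped.flatMap { case (_, recs) => recs.toSeq.sortBy(seqNum).dropRight(1) }
      else grouped.sparkContext.emptyRDD[String]
    (latest, superseded)
  }
}
```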
Mutation in Hive and Impala
Although we have focused on mutation in raw HDFS, many databases backed by HDFS have also started supporting updates and deletes. Since our solution bypasses the SQL engine, it is likely to be more efficient.
If you are using a recent version of Hive, you can perform updates and deletes. For deletes in Hive, you need to know the WHERE clause conditions or the list of records that need to be deleted.
With Impala, you can update as long as you are using Kudu storage. Deletes are also supported with Kudu tables.
Wrapping Up
We have gone through a Spark-based solution for integration in a data lake, including all mutating operations. In the example, we have shown how to integrate changes for one table. The process can be repeated for as many tables as necessary. You can follow the tutorial to execute the supply chain use case.
Originally posted here.