
White Paper

High transactional performance and Unit of Work Management in parallel distributed ETL processing.

Background:

Unit of Work management during high-performance transaction processing in a parallel, distributed ETL environment presents the following main challenges:

1. Preservation of Referential Integrity at the target ODS as transactions from various heterogeneous systems are processed.

2. Serialization of transactions to prevent deadlocks and to preserve update order, as appropriate, in the target ODS. The required degree of serialization is known in the RDBMS world as the isolation level.

3. Load balancing and scalability, optimized and managed under the constraints of 1 and 2 above.

4. Recoverability and fault tolerance in the event of failure, specifically distributed commit failure at the target ODS.

Problem formulation:

Transactions with a commit boundary occur in a source application system somewhere in a corporate topology. The application might not be based on a relational database[1] or have a well-defined, relational transaction handling protocol. A transaction can incorporate inserts, updates or deletes to one or more source system entities or files. Each entire committed transaction, or unit of work (UofW), from an application is sent to one of a set of queues[2] in round-robin fashion for load balancing. All transaction components are “tagged” with a unique transaction ID, and an end-of-transaction (commit) message for a given transaction ID is always generated by the application[3] and sent to a queue. The queues are subscribed to by the adapters or listeners of the ETL engine[4], which collect and absorb the UofW components. Source UofW components can be (for example) multi-table, hierarchical structures exhibiting parent-child data relationships or paths. It is therefore important to reflect this order of precedence when populating the target data architecture whenever updates to more than one[5] source table are involved in the UofW. Preserving order, or referential integrity, at the target translates into foreign key path chasing, and there are a number of algorithms[6] to accomplish this. Components of a transaction or UofW can be sent by the application, transmitted by the queue manager and received by the ETL job instances in any order. However, the order in which they are applied to the target is important.
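The collection and round-robin dispatch described above can be sketched as follows. This is a minimal sketch, not the paper's implementation. It assumes messages arrive as (UofW ID, kind, payload), where kind "commit" is the application-generated end-of-transaction message. It also assumes that all components of a UofW reach the collector before its commit. The `CollectUpdates` class and the plain lists standing in for the N queues are hypothetical.

```python
from collections import defaultdict
from itertools import cycle


class CollectUpdates:
    """Buffers UofW components until the commit message arrives, then releases
    the whole UofW to one of N queues in round-robin order (hypothetical)."""

    def __init__(self, n_queues):
        self.queues = [[] for _ in range(n_queues)]   # stand-ins for message queues
        self._next_queue = cycle(range(n_queues))     # round-robin queue selector
        self._pending = defaultdict(list)             # UofW ID -> buffered components

    def receive(self, uofw_id, kind, payload=None):
        """Accept one message; return the queue index when a UofW is released."""
        if kind == "commit":
            components = self._pending.pop(uofw_id, [])
            q = next(self._next_queue)
            self.queues[q].append((uofw_id, components))
            return q
        self._pending[uofw_id].append(payload)        # an i/u/d component
        return None


collector = CollectUpdates(n_queues=2)
collector.receive(101, "iud", ("INSERT", "customer", 1))
collector.receive(102, "iud", ("UPDATE", "account", 7))
collector.receive(101, "iud", ("INSERT", "account", 9))
print(collector.receive(101, "commit"))   # → 0
print(collector.receive(102, "commit"))   # → 1
```

Releasing only complete UofWs, each to a single queue, keeps every component of a transaction on one path. Round robin then spreads whole transactions across the queues.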

The target data architecture can be homogeneous (one Oracle server, for example) or heterogeneous and distributed, so transaction coordination across[7] heterogeneous target resources needs to be managed. The target architecture needs to make data visible in the right order of precedence (taking referential integrity constraints into account) for all transactions published on the queues. Thus, queries on the target platform should always reflect consistent[8], committed data across all entities on production platforms.

Since transaction volume can become significant and the latency of the target architecture is an issue, transaction acquisition from the queues needs to be parallelized[9] and managed. Production support staff should be able to easily monitor, restart and control parallel transaction processing.

For the purposes of this work, it is assumed that there exists a standardized, well-defined set of ETL transformations and jobs. Each job populates one or more target tables at the granularity of insert, update, insert-and-update, and delete operations. These jobs need to be structured, parallelized, coordinated, optimized and managed in the ETL environment to handle the transaction flow described above. Among other things, this means that an order-of-precedence, or sequencing, mechanism must be formulated for job distribution, execution and coordination. This sequencing mechanism needs to be decoupled from the rest of the parallel ETL transformation logic. Furthermore, since this problem is solved using ETL itself, the solution will be metadata driven. This adds significant benefit to the entire development process and to the enhancement and maintenance of the data architecture.

A failure and restart mechanism needs to be incorporated, as the transactional flow can “fail” at any point[10]. The process needs effective alerting and monitoring capabilities. Granular, informative error and warning messages based on enterprise standards need to be enforced.

Proposed ETL based solution:

The high level diagram above depicts the proposed Unit of Work (UofW) management utilizing parallel ETL for transaction processing[11]. The flow is as follows:

1. The application source system performs a unit of work (UofW) that generates some database update activity. The application source releases committed delta changes to a Collect updates process, either via database triggers or from an application server via an API. All delta records for a given unit of work are tagged by the application source with a unique UofW identifier.

2. The Collect updates process collects all components of the unit of work. Components can be i/u/d events for one or more source tables. The Collect updates process releases all components of an application-committed transaction to one of the queues (from a pool of N available queues) in round-robin fashion. This increases parallelism and facilitates load balancing.

3. Each individual message queue receives delta records of the UofW. Message queues, each with unique message tags and IDs, publish delta records to ETL adapters and thus to the ETL job instances that subscribe to delta changes. There is an established correspondence (usually 1:1) between message queues and ETL job instances.

4. The ETL adapters[12] of a job instance forward delta records to the ETL transformation engine. The engine applies ETL logic to append the delta records, with their UofW IDs[13], to a memory-resident serialization log[14] and set the proper status. The serialization log is shared between ETL job instances. It serializes updates to the target ODS by managing conflict resolution at the record level for any target ODS table. Continuous flow transformation components are utilized to minimize latency.

5. Delta records are transformed utilizing the continuous flow capabilities within a given ETL process[15]. There can be M transformation job instances to achieve the desired scalability, and each instance can in turn have its own internal parallel processing. Note that key transformations can also take place from sources to intermediate targets before the delta changes are used to update the target ODS. Therefore, for referential integrity and UofW serialization, only the effects of the immediate ancestors[16] of the target ODS tables need to be observed. All intermediate transformations can and should be done in parallel and continuously, wherever the hardware architecture allows. Some components of a given UofW are parallelized in the transform phase. To account for all of them, Trans ID In is initialized in the serialization log when a new UofW enters a transformation job instance. This is equivalent to a push onto a stack.

6. Transformed delta records are collected for a given UofW until the end of the UofW is derived or sent from the source system. Specifically, as components of the UofW are received from transformation job instances, Trans ID Out is updated for that UofW in the serialization log. This is equivalent to a pop from the stack. The UofW is not released further until the Trans ID In list matches the Trans ID Out list. The collection step is a choke point. It is required before the final update to the target ODS in order to parallelize independent updates, preserve referential integrity and serialize transactions to achieve the desired isolation level. There is a 1:1 correspondence between Append UofW ID ETL job instances and Collect UofW instances. Once all components of a given UofW are gathered, they are registered in the serialization log along with (Target Table (T), Target Primary Key (K)) pairs. These pairs serve as signatures that identify record-level conflicts with other in-progress updates to the same target table, so that conflicting updates can be serialized.

7. When the end of the UofW (for a given UofW ID) is available in the Collect UofW step, the ETL process looks up and appends sequence numbers[17] to all delta records within that UofW ID. These sequence numbers represent the precedence order in which updates are applied to the target databases. Delta records within a UofW can share the same sequence number, meaning they can be applied simultaneously. This increases the degree of parallelism in the subsequent processing steps. Where one source table updates multiple target tables, an ascending list, or vector, of sequence numbers is assembled for the source (delta) record.

8. To achieve the desired serializability, each UofW must be assigned to the proper ETL job instance from the pool of ETL job instances used to update target ODS tables. This is done by checking the signatures, or (T,K) pairs, of a given UofW against those of all “in-progress” UofWs. The check determines whether there is a conflict at the record level of any target table. If there is a conflict, the offending UofW and all its components are sent to the same ETL job instance as the conflicting “in-progress” UofW. This is guaranteed to work no matter how complex a UofW is. If there is no conflict, a round-robin approach is used to achieve scalability and load balancing. Note that steps 6, 7 and 8 utilize continuous flow components and are 1:1 with one another, so no further parallelization opportunities exist in these steps. Blocking I/O is used only to enforce serialization when a record-level conflict on a target table occurs. This isolation level can be relaxed as specified by the application. For the purposes of this document, it is also assumed that UofW numbers from the source application increase monotonically, reflecting the order in which transactions were issued. Further blocking could therefore be incorporated to process transactions in strictly increasing order, waiting whenever out-of-order UofW transaction IDs are received for update. However, this is a very severe limitation, and most applications do not require this level of serialization.

9. Finally, the delta records of a UofW, with their sequence IDs, are applied to the target databases by the assigned ETL job instance as follows:

a. Sort delta records by UofW ID, then by Seq ID. If the Seq ID is a vector, sort by its first element and loop through the remaining elements sequentially within the list.

b. Start transaction

i. Perform updates to all databases and tables for a given UofW, applying Seq IDs in ascending order.

ii. Record queue acknowledgements to a table in the target database[18]

c. Commit or roll back all the updates for a given UofW. If a delta record fails within a UofW, it is not obvious how to handle future units of work containing a delta record with the same key value as the failed one. On the one hand, the future delta record can be a “correction” or “reversal” record, which should automatically be allowed through. On the other hand, if it is not a correction, it should be blocked. All failed records should be recorded in an error log and/or have their status set properly in the serialization log. The ETL transform logic can then determine the proper action for the next delta record that has the same key value as a delta record that failed in a previous UofW. ETL business rules should also provide startup or initialization logic to resolve failed transactions and to process all pending transactions.
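Step 9 can be sketched with Python's built-in sqlite3 module standing in for the target ODS. The following are assumptions for illustration: the schema, the `apply_uofw` function, the `queue_ack` acknowledgement table and the delta format (seq_id, sql, params). SQLite is used only because it ships with Python and enforces foreign keys once `PRAGMA foreign_keys = ON` is set. The connection's context manager commits on success and rolls back if an exception escapes. That gives the all-or-nothing behavior of steps b and c.

```python
import sqlite3


def apply_uofw(conn, uofw_id, queue_msg_id, deltas):
    """Apply one UofW atomically (hypothetical sketch of step 9).

    deltas is a list of (seq_id, sql, params) tuples. They are applied in
    ascending seq_id order, followed by a queue acknowledgement row, all in
    one transaction.
    """
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            for _seq, sql, params in sorted(deltas, key=lambda d: d[0]):
                conn.execute(sql, params)
            conn.execute("INSERT INTO queue_ack (uofw_id, msg_id) VALUES (?, ?)",
                         (uofw_id, queue_msg_id))
        return "Committed"
    except sqlite3.Error:
        return "Failed"  # status to record in the serialization log / error log


conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript("""
    CREATE TABLE customer (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE orders (id INTEGER PRIMARY KEY,
                         customer_id INTEGER NOT NULL REFERENCES customer(id));
    CREATE TABLE queue_ack (uofw_id INTEGER, msg_id TEXT);
""")

# Components arrive out of order; seq_id puts the parent row first.
ok = apply_uofw(conn, 1, "msg-1", [
    (2, "INSERT INTO orders (id, customer_id) VALUES (?, ?)", (10, 1)),
    (1, "INSERT INTO customer (id, name) VALUES (?, ?)", (1, "Acme")),
])
# The order row references a missing customer, so the whole UofW rolls back.
bad = apply_uofw(conn, 2, "msg-2", [
    (1, "INSERT INTO customer (id, name) VALUES (?, ?)", (2, "Beta")),
    (2, "INSERT INTO orders (id, customer_id) VALUES (?, ?)", (11, 99)),
])
```

In the second call the customer row for "Beta" is rolled back together with the failing order row. No acknowledgement is recorded for it, so the queue would redeliver the UofW.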

Sequence generation algorithm and ETL pre-process.

The sequence generation algorithm pre-computes sequence numbers for the target data architecture. This is accomplished by extracting metadata from the target data architecture[19]: table identifiers with their corresponding primary key and foreign key definitions. From this metadata, a directed graph of table dependencies is created and numbered. The numbers and corresponding table identifiers are stored in a lookup table. ETL transaction processing refers to this table to properly sequence the transactions of a given Unit of Work. For the specifics of the algorithm, please refer to the detailed design document to follow.
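Extracting the FK metadata can be sketched against a real data dictionary. The example below is an assumption for illustration. It uses SQLite (bundled with Python) as the target, reads table names from `sqlite_master` and reads FK definitions from `PRAGMA foreign_key_list`. It emits (parent, child) pairs in the “parent child” form that the Perl example later in this paper reads. Other DBMSs expose the same information through their own catalog views. The `extract_fk_pairs` function and the sample schema are hypothetical.

```python
import sqlite3


def extract_fk_pairs(conn):
    """Return (tables, fk_pairs) from an SQLite data dictionary.

    fk_pairs holds (parent table, child table) tuples, where the child
    table declares a foreign key referencing the parent table.
    """
    tables = [row[0] for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' "
        "AND name NOT LIKE 'sqlite_%' ORDER BY name")]
    fk_pairs = set()
    for child in tables:
        # Each row: (id, seq, table, from, to, on_update, on_delete, match);
        # index 2 is the referenced (parent) table. A set removes duplicates
        # produced by multi-column foreign keys.
        for fk in conn.execute(f'PRAGMA foreign_key_list("{child}")'):
            fk_pairs.add((fk[2], child))
    return tables, sorted(fk_pairs)


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customer (id INTEGER PRIMARY KEY);
    CREATE TABLE orders (id INTEGER PRIMARY KEY,
                         customer_id INTEGER REFERENCES customer(id));
    CREATE TABLE order_line (order_id INTEGER REFERENCES orders(id),
                             line_no INTEGER,
                             PRIMARY KEY (order_id, line_no));
""")
tables, pairs = extract_fk_pairs(conn)
for parent, child in pairs:
    print(parent, child)   # one "parent child" line per FK definition
```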

High Level Component Design

Sequence numbering in Append Sequence component

Purpose:

Pre-process target table dependencies in order to perform data operations on sets of target tables, ordered such that no referential integrity constraints are violated.

Approach:

Order the target tables based on the FK definitions and assign to each a number, such that, given two tables (X,Y), the one with the lower number will be processed before the one with the higher number. If the tables have equal numbers, they may be processed independently.

This is accomplished by constructing a graph of the dependencies (DG) where the tables are nodes and the FK definitions are directed edges between the nodes.

After the DG is constructed, a topological traversal of the DG is performed, assigning a number to each node. The fully qualified table names (and/or corresponding unique Table IDs) and the calculated sequence numbers are captured in a fast lookup store called the Sequence Hash. Given a set of fully qualified table names as input, the sequence numbers can be fetched from the Sequence Hash to create an ordered list for execution in the Append Sequence process. The Sequence Hash is relatively static. It is produced and updated in a pre-process before transaction processing occurs, and the Append Sequence process simply uses the Sequence Hash file as pre-computed input.

Traversal Algorithm:

Terms defined:

  • Node – any table on the DG
  • Top Node – table with no parent node
  • Root – terminal node, i.e. a node with no child node(s)
  • Level – the level of a node in the DG, from 1 for top nodes up to N for the deepest root nodes

1. Two hash tables (Parent, Child) representing the DG are initialized with Table IDs and FK-derived parent-child relationships[20]. Each record in Parent contains (Table ID, list of child Table IDs). Each record in Child contains (Table ID, list of parent Table IDs). Every target table has a record in Parent, but only tables that have a parent have a record in Child. Thus the Top Node(s) can be computed as Parent minus Child, where minus is a set operation.

2. After the Parent and Child hash tables are initialized, the following iterative algorithm is applied to derive the Sequence Hash:

a. Set the Current Sequence Number (CSN) or current graph level to 1

b. Until Parent and Child are empty, do the following:

i. Compute the Top Node(s) and assign CSN

ii. Insert the CSN-assigned nodes into the Sequence Hash as (Table ID, CSN)

iii. Remove Top Node(s) from both Parent and Child

iv. Increment CSN by 1

Pseudo Code:

```
graph_level = 1
while graph is not empty
    fetch the set of top nodes (nodes with no parent) from graph
    if the set is empty, stop: the FK graph contains a cycle
    for each top node X
        insert entry (X, graph_level) into output
        for each node A that is a child of X
            remove edge (X, A)
        delete node X from graph
    increment graph_level
```

Sequence Hash usage in Append Sequence:

For a given transaction or UofW, the set of target tables is obtained (from the metadata repository). For each target table in the set, the fully qualified table name or corresponding Table ID is used as a key into the Sequence Hash to obtain the current dependency sequence number. The target tables affected by the transaction are then sorted and processed in ascending numerical order.
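The lookup can be sketched as follows. The sketch assumes that the Sequence Hash has been loaded into a dict and that each delta record lists the target tables it updates. The table names, numbers and record layout are hypothetical. Following step 9a, records are ordered by the first element of their sequence vector. Records sharing that number are grouped because they may be applied independently.

```python
from itertools import groupby

# Hypothetical pre-computed Sequence Hash: table name -> dependency sequence number.
SEQUENCE_HASH = {"customer": 1, "product": 1, "orders": 2, "order_line": 3}


def order_tables(tables):
    """Sort target tables by ascending sequence number (name breaks ties)."""
    return sorted(tables, key=lambda t: (SEQUENCE_HASH[t], t))


def append_sequence(delta_records):
    """Attach an ascending vector of sequence numbers to each delta record.

    A record that updates several target tables gets one number per table.
    """
    for rec in delta_records:
        rec["seq"] = sorted(SEQUENCE_HASH[t] for t in rec["targets"])
    return delta_records


def parallel_groups(delta_records):
    """Group records by the first element of their sequence vector.

    Groups come out in ascending order; records within a group may be applied
    independently of one another.
    """
    ordered = sorted(delta_records, key=lambda r: r["seq"])
    return [(first, [r["id"] for r in grp])
            for first, grp in groupby(ordered, key=lambda r: r["seq"][0])]


uofw = append_sequence([
    {"id": "d1", "targets": ["order_line"]},
    {"id": "d2", "targets": ["customer"]},
    {"id": "d3", "targets": ["orders", "order_line"]},
    {"id": "d4", "targets": ["product"]},
])
print(order_tables({"order_line", "customer", "orders"}))
# → ['customer', 'orders', 'order_line']
print(parallel_groups(uofw))
# → [(1, ['d2', 'd4']), (2, ['d3']), (3, ['d1'])]
```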

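A detailed Perl code example, reading one FK definition per input line in the form “parent child”:

```perl
use strict;
use warnings;

my (%PHASH, %CHASH, %THASH);

# read the metadata, create the graph structures PHASH and CHASH
# (INPUT is an already-open filehandle; one "parent child" FK definition per line)
while (my $record = <INPUT>) {
    chomp $record;
    my ($parent, $child) = split ' ', $record;  # parse out parent and child tokens
    next unless defined $child;                 # skip blank or malformed lines
    $PHASH{$parent}{$child} = undef;  # parent record keyed by child, no value, use undef
    $PHASH{$child} ||= {};            # every table needs a Parent record, leaves included
    $CHASH{$child}{$parent} = undef;  # child record keyed by parent
}

# topsort the graph, creating output hash THASH (table => dependency sequence number)
for (my $glevel = 1; %PHASH; $glevel++) {
    # collect this level's top nodes (tables with no remaining parent) first,
    # so that children freed in this pass do not get their parent's number
    my @top = grep { !exists $CHASH{$_} } keys %PHASH;
    die "cycle detected in FK graph\n" unless @top;

    foreach my $parent (@top) {
        $THASH{$parent} = $glevel;                       # assign dependency sequence number
        foreach my $child (keys %{ $PHASH{$parent} }) {  # remove parent-child edges
            delete $CHASH{$child}{$parent};
            delete $CHASH{$child} unless %{ $CHASH{$child} };  # no parents left
        }
        delete $PHASH{$parent};                          # remove top node from the graph
    }
}
```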

Serialization log & serialization process component

Purpose:

To achieve coordinated serialization of transactional updates to the target ODS across a parallel ETL implementation.

Approach:

Create a serialization process that can be queried via an API. This process is responsible for maintaining a serialization log in memory and for interacting with multiple ETL job instances. The following API operations allow the serialization process to maintain the serialization log:

1. Handle = HandShake(userid, pw, machine id, port)
2. Initialize()
3. Insert() (either a single record or a vector of records)
4. Update()
5. Mdelete() (Mark for deletion. Deletes will be managed by the process)
6. Truncate()
7. ETL JI ID = Conflict()  (Determine if there is a conflict with other “in progress” UofW)
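Operation 7 can be sketched as a small router, together with the assignment rule of step 8 in the flow above. Everything here is hypothetical: the `ConflictRouter` class, signatures modeled as (table, primary-key tuple) pairs, and job instances numbered 0 to n-1. A UofW whose signatures overlap those of an in-progress UofW goes to that UofW's instance. Otherwise, instances are chosen round robin.

```python
from itertools import cycle


class ConflictRouter:
    """Hypothetical stand-in for Conflict() plus the step 8 assignment rule.

    Signatures are (target table, primary-key tuple) pairs.
    """

    def __init__(self, n_instances):
        self._round_robin = cycle(range(n_instances))
        self.in_progress = {}  # UofW ID -> (ETL JI ID, set of signatures)

    def conflict(self, signatures):
        """Return the ETL JI ID of the first conflicting in-progress UofW, or None."""
        sigs = set(signatures)
        for instance, other_sigs in self.in_progress.values():
            if sigs & other_sigs:  # same record of the same target table
                return instance
        return None

    def assign(self, uofw_id, signatures):
        instance = self.conflict(signatures)
        if instance is None:
            instance = next(self._round_robin)  # no conflict: balance the load
        self.in_progress[uofw_id] = (instance, set(signatures))
        return instance

    def complete(self, uofw_id):
        """Drop a committed UofW from conflict checking."""
        self.in_progress.pop(uofw_id, None)


router = ConflictRouter(n_instances=3)
print(router.assign("u1", [("account", (7,))]))                     # → 0
print(router.assign("u2", [("customer", (1,))]))                    # → 1
print(router.assign("u3", [("account", (7,)), ("orders", (55,))]))  # → 0
```

A UofW might overlap in-progress UofWs on two different instances. In that case, this sketch simply returns the first match. How to handle that case is a design decision left open here.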

The serialization log is represented in memory as a table with the following columns:

Conflict | ID | UofW | Queue ID | ETL JI ID | Signature | Trans ID In | Trans ID Out | Status

Where:

  • Conflict represents the ID that identifies the row in the serialization log where the conflict occurred. This is useful for debugging and for identifying which ETL JI ID this UofW should be sent to upon conflict.
  • ID is a unique row identifier generated by the serialization process.
  • UofW is a Unit of Work identifier. The same UofW can appear in multiple rows, as there can be multiple i/u/d statements within the UofW.
  • Queue ID is the ID of the original queue that sent this UofW to the ETL Append UofW ID step described above.
  • ETL JI ID is the ID of the ETL job instance used to process a particular UofW in the serialization log.
  • Signature is represented by the Target Table, Primary Keys list (T,K). K can be a list of primary key values if the primary key consists of more than one field. This is not strictly relational, as a cell in a row can hold a list. An ETL job instance can compare signatures, or (T,K) pairs, through the Conflict() API of the serialization process to establish whether a conflict exists. If a conflict exists, the Conflict() function sets the Conflict flag and returns the ETL JI ID to which the conflicting record should be assigned.
  • Trans ID In – In push/pop fashion, as UofW components enter a transformation job instance, that instance must register the UofW in Trans ID In (a push). The field is a list of IDs.
  • Trans ID Out – As the components of the UofW enter the Collect UofW step, they are registered in the Trans ID Out list, or popped off the stack. Collect UofW waits until all IDs in Trans ID In are in the Trans ID Out list.
  • Status can simply be “in progress”, “Committed”, “Failed” or “Deleted”.
    • Deleted and Committed records are ignored by the Conflict() API.
    • Deleted and Committed records are periodically purged by the serialization process from the in-memory serialization log to an audit trail on disk.
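The row layout and status rules above can be modeled in memory as follows. This sketch is an assumption, not the design document's structure. It uses one row per i/u/d component, with Python method names loosely mirroring Insert(), Mdelete() and Conflict(). HandShake(), Initialize(), Update() and Truncate() are omitted, and purge() stands in for the periodic move to the audit trail. Conflict() here skips only Committed and Deleted rows, so a Failed row keeps blocking its key, in line with step 9c.

```python
from dataclasses import dataclass, field
from itertools import count
from typing import Optional

IGNORED_BY_CONFLICT = {"Committed", "Deleted"}


@dataclass
class LogRow:
    """One serialization log row (one i/u/d component of a UofW)."""
    id: int
    uofw: int
    queue_id: str
    etl_ji_id: Optional[int]
    signature: tuple                  # (T, K); K is a tuple of key values
    trans_id_in: list = field(default_factory=list)
    trans_id_out: list = field(default_factory=list)
    status: str = "in progress"       # "in progress", "Committed", "Failed", "Deleted"
    conflict: Optional[int] = None    # ID of the row this one conflicted with

    def collected(self):
        """Collect UofW may proceed once every Trans ID In is also in Trans ID Out."""
        return set(self.trans_id_in) <= set(self.trans_id_out)


class SerializationLog:
    """Hypothetical in-memory serialization log kept by the serialization process."""

    def __init__(self):
        self.rows = {}
        self._ids = count(1)          # unique row IDs generated by the process

    def insert(self, uofw, queue_id, etl_ji_id, signature):
        row = LogRow(next(self._ids), uofw, queue_id, etl_ji_id, signature)
        self.rows[row.id] = row
        return row

    def conflict(self, row):
        """Return the ETL JI ID of a conflicting row from another UofW, else None."""
        row.conflict = None
        for other in self.rows.values():
            if (other.uofw != row.uofw
                    and other.status not in IGNORED_BY_CONFLICT
                    and other.signature == row.signature):
                row.conflict = other.id   # set the Conflict flag
                return other.etl_ji_id
        return None

    def mdelete(self, uofw):
        """Mark all rows of a UofW for deletion; removal happens in purge()."""
        for r in self.rows.values():
            if r.uofw == uofw:
                r.status = "Deleted"

    def purge(self):
        """Remove Committed/Deleted rows from memory, returning them for the audit trail."""
        done = [r for r in self.rows.values() if r.status in IGNORED_BY_CONFLICT]
        for r in done:
            del self.rows[r.id]
        return done


log = SerializationLog()
a = log.insert(uofw=1, queue_id="Q1", etl_ji_id=0, signature=("account", (7,)))
b = log.insert(uofw=2, queue_id="Q2", etl_ji_id=None, signature=("account", (7,)))
print(log.conflict(b), b.conflict)   # → 0 1
a.status = "Committed"
print(log.conflict(b))               # → None
print([r.id for r in log.purge()])   # → [1]
```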

 


[1] Data in source systems can also be hierarchical, network, flat, etc.

[2] This document assumes publish/subscribe messaging engines such as Tibco, MQ-Series, etc.

[3] Sometimes end of transaction event needs to be defined utilizing business rules.

[4] There can be one ETL job instance per queue.

[5] Sometimes, in advanced transformations, one delta source record can generate multiple updates to the target. In finance, for example, this can be seen when one transaction record generates N financial events.

[6] Regardless of the algorithm used, the foreign key relationships of the target data architecture can be converted into a directed graph of dependencies and pre-computed with sequence numbers. Sequence numbers on the same level of the graph are equal in value.

[7] An important part of coordination is deadlock resolution and the degree of serializability during ODS update. The degree of serializability is an application decision and should be easily accommodated.

[8] The degree of data consistency and enforcement is known in the database world as isolation level. SQL read consistency is the isolation level that needs to be achieved as part of data acquisition to ODS.

[9] There are 3 major ways to parallelize utilizing modern ETL engines:

1. Piped - continuous flow of transactional data from one ETL component to the next without waiting for previous transactions to complete all of the ETL transformation logic.

2. Process - independent targets in the ETL transforms are forked and processed independently.

3. Data - ETL processes are replicated for segments of data to achieve constant performance as transactional volumes rise.

[10] Specific issues such as what to do with a pending transaction with the same key value as the previous transaction that failed – need to be addressed in the ETL business rules.

[11] It is assumed in this document that error and warning messages and logs are granular and their standardized use is part of ETL transformation logic.

[12] ETL adapter architecture is typically either multi-threaded or multi-process capable and parallelizable.

[13] The Unit of Work ID, if not passed through from the source system, can sometimes be derived and produced by business rules.

[14] The serialization log is controlled by a process via an API. It is read/write stable, memory-based for performance, and contains some of the following fields: TrgtID, Conflict, ID, UofW, Signature, Qcurr, Qdisp, Status. For details, please refer to the design document.

[15] It is not efficient to start and stop ETL processes during transaction processing, so a set of ETL processes should be started once for the duration of transaction processing. Two ETL processes per server CPU usually yield optimal performance. There can also be a pool of ETL processes that is started upon initialization and managed. More on this can be found in the design documentation.

[16] Given a DG of tables in the ETL transformation logic, the ancestors of a target table are one level below it and are its direct children. Thus a target table “inherits” data from its ancestors.

[17] Sequence numbering is derived dynamically from metadata, using the foreign key chasing algorithms described briefly in the next section and in more detail in the design document.

[18] There can be a shadow process that, during commit, reads acknowledgements and sends them to the message queue manager. This is important because if the commit fails, the queue information will be resent. If the commit succeeds, receipt of the acknowledgement ensures that the records will not be resent by the queue manager. The same shadow process can clear all successfully committed UofWs from the serialization log by marking them as deleted. The process in charge of the serialization log then physically removes the rows marked as deleted during “idle” periods.

[19] The target data architecture can be imagined here in the form of E-R diagram reflecting foreign key relationships between target tables.

[20] FK relationships can be derived directly from target architecture DBMS data dictionary or from ETL metadata.