Create data pipeline

The Create data pipeline plugin, also referred to as the ETL plugin, allows users to compose event logs from one or more tables or data sources. It integrates data from multiple sources by extracting, merging, transforming, and loading it, so that essential business insights can be gained for competitive advantage.

To define a data pipeline, click on the Create data pipeline icon ETL038.

../_images/ETL002.png

The Data Pipeline Creation window appears.

../_images/ETL003.png

Four operations can be performed in an ETL pipeline: Extract, Merge, Transform, and Load, each with its own view.

Note

We can switch between the four main views. However, to obtain a meaningful result, it is essential to complete the steps in the default order: Extract – Merge – Transform – Load.

Extract data

The Extract view is used for extracting data from a data source. We can extract data from five sources: relational databases (PostgreSQL and MySQL), the local file system, the Apromore Workspace, or an AWS S3 bucket.

Local file system

To upload a file from the local file system, click the File upload button. This opens the local file system, where the file can be selected and uploaded.

Note

Files uploaded from the local file system should be in CSV or parquet file format.

Apromore workspace

To upload a file from the Apromore Workspace, click the Workspace button. This opens a view of the files in the Apromore Workspace. Select one or more files and click Extract.

../_images/ETL004.png

The Apromore dataset importer automatically discovers the column headers in the dataset, along with their datatypes.

../_images/ETL005.png

We can click on the edit icon ETL039 to change the column header or click the dropdown to change the datatype.

../_images/ETL006.png

Click Import to import the data.

Note

In ETL data pipelines, start timestamps cannot be automatically discovered.

MySQL and PostgreSQL databases

To extract data from a MySQL or PostgreSQL database, click the PostgreSQL or MySQL button. Enter the connection details of the database. Choose the suitable Connection Type option from the drop-down. Click Connect.

../_images/ETL007.png

AWS S3 bucket

Every Apromore tenancy is connected to an Amazon S3 bucket that stores the files in the tenancy. We can extract the log either from a file or from a folder in the S3 bucket.

  • Extract a file from the S3 bucket

To upload a file from an S3 bucket, click Amazon S3. The folders and files in the S3 bucket will appear in the window. Select the log file we wish to upload and click Extract.

../_images/ETL008.png

Note

Extracting a log file from an Amazon S3 bucket is only available to users with the superuser or data engineer role.

Once the file is successfully extracted using any of these methods, it appears in the sidebar.

../_images/ETL009.png

We can preview the extracted file by clicking on the eye icon.

../_images/ETL044.png

To delete a file, click on the Trash bin icon ETL040 next to the file name.

Note

The names of the files uploaded to a data pipeline in the Extract Phase should contain only alphanumeric characters and the underscore character. If a file name contains spaces, each space is automatically replaced by an underscore character when the file is uploaded. For example, the file name “Purchase Orders” is renamed to “Purchase_Orders”. Characters other than alphanumeric characters, spaces, and underscores are ignored.
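
For reference, the renaming rule described in this note can be sketched in Python as follows. This is an illustration only; the function name is hypothetical and not part of Apromore.

    import re

    def sanitize_file_name(name: str) -> str:
        # Keep only alphanumeric characters, spaces, and underscores,
        # then replace each space with an underscore.
        kept = re.sub(r"[^A-Za-z0-9 _]", "", name)
        return kept.replace(" ", "_")

    print(sanitize_file_name("Purchase Orders"))   # Purchase_Orders
    print(sanitize_file_name("Q1-2024 Orders!"))   # Q12024_Orders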

  • Extract micro-batches from an S3 folder

When scheduling an ETL pipeline, we may be required to load datasets from a folder at set intervals. Micro-batch ingestion is a method where data is ingested in small, manageable batches instead of being ingested in a single, large batch.

Apromore S3 Connector supports micro-batch ingestion. In the S3 Connector, a micro-batch is a file containing a relatively small number of rows. When adding a new table to a pipeline using the S3 connector, we can specify that this table must be ingested in micro-batches, instead of it being ingested in a single go from one single file.

To do so, instead of specifying that the rows of a table originate from a single file, we specify that the rows of the table originate from a folder. In this case, the S3 Connector treats each file in the designated folder as a micro-batch. The micro-batches in the folder are processed in First-In, First-Out (FIFO) order.

If a data pipeline includes a table associated to an S3 folder (a micro-batched table), every time the pipeline is executed, Apromore identifies all the micro-batches in the S3 folder that have not been previously processed. If this number is below the maximum number of micro-batches per pipeline run, all unprocessed micro-batches will be processed in the current pipeline run. If the number of unprocessed micro-batches exceeds this limit, the unprocessed micro-batches above the limit will remain unprocessed and will be handled by subsequent pipeline runs. By default, the maximum number of micro-batches per pipeline run is 3. This limit may be adjusted upon request.
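
The selection rule described above can be sketched in Python as follows. This is a hedged illustration of the behaviour, not Apromore's implementation; all names and signatures are hypothetical.

    def select_micro_batches(folder_files, processed_names, max_per_run=3):
        # folder_files: list of (file_name, arrival_time) pairs in the S3 folder.
        # processed_names: file names of micro-batches already processed.
        # Unprocessed micro-batches are taken in FIFO (arrival) order,
        # up to the maximum number allowed per pipeline run.
        fifo = sorted(folder_files, key=lambda pair: pair[1])
        unprocessed = [name for name, _ in fifo if name not in processed_names]
        return unprocessed[:max_per_run]

    # Example: four micro-batches in the folder, one already processed.
    picked = select_micro_batches(
        [("orders_001.csv", 1), ("orders_002.csv", 2),
         ("orders_003.csv", 3), ("orders_004.csv", 4)],
        processed_names={"orders_001.csv"},
    )
    print(picked)  # ['orders_002.csv', 'orders_003.csv', 'orders_004.csv']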

If there are no unprocessed micro-batches at the start of a pipeline run, the pipeline run is not started. In this scenario, the pipeline run will show up as “failed” in the Pipeline Management Console with a message indicating that there were no micro-batches to be processed.

Note

By design, we consider a micro-batch unprocessed if its filename is different from the filenames of all previously (tentatively or successfully) processed micro-batches. As such, re-uploading a micro-batch with the same filename as an already processed micro-batch (including the case of file replacement) will not add any new data during a pipeline run.

If the pipeline run fails for other reasons, the micro-batches included in the pipeline run are left unprocessed, and an error message is reported in the Pipeline Management Console. Apromore will re-attempt to ingest these micro-batches in subsequent pipeline runs. The maximum number of ingestion attempts is three. If the ingestion of a micro-batch is not successful after three attempts, this micro-batch will no longer be processed. A data engineer or administrator may address the reported issues and re-upload these micro-batches for subsequent processing.

Limitations

  • In any given pipeline, at most one of the tables can be associated to an S3 folder. In other words, if we associate one table in a pipeline to an S3 folder, no other table in that same pipeline can be associated to an S3 folder.

  • If a pipeline includes a table associated with an S3 folder, then the pipeline cannot be chained with another pipeline.

  • The micro-batches must be uncompressed CSV files.

  • If a pipeline includes a micro-batched table, the only available loading strategy will be “Always append to the same log”.

As an example, let’s say we have a folder “demofiles” in our S3 bucket, and we want to create an ETL pipeline that loads the micro-batches from this folder. In the Extract step, select Amazon S3, also known as the S3 Connector. This allows us to browse the S3 bucket connected to Apromore. This operation can only be performed by users with permission to access the S3 bucket, such as users with the data engineer or superuser role.

S3003

Click the Folder icon to browse the S3 bucket.

S3004

Note

The selected folder must contain at least one file to be configured for the pipeline, and the file names in the selected folder should be unique. It is also advisable that the file names include a timestamp and a sequence number so that the uploads can be tracked.

Apromore displays the files and subfolders in the connected bucket. In our case, we select the demofiles folder and click Extract. By extracting a folder, we indicate that this table will be ingested in micro-batches.

S3005

Note

Ensure the files are in CSV format and consistent in schema and datatypes.

After selecting the folder, Apromore displays the folder path we have selected. Click Extract to complete the extracting step.

S3006

The dataset importer opens with a preview of the dataset. Review the data and click Import.

S3007

The folder name appears on the left side panel, indicating it has been successfully extracted.

S3008

Note

We can only import one S3 folder in a data pipeline.

Merge data

We can join two datasets based on an operation and specific keys in the two datasets. This can be useful when a file is incomplete and the remaining data is in a separate file. To join two files, go to the Merge tab.

../_images/ETL010.png

Note

There have to be at least two datasets to perform a merge.

Assume the initial dataset we extracted does not have the Invoice column. However, there is another dataset that contains the Invoice date column. We can merge both datasets to form a complete dataset. To begin, go to the Extract view and import the second dataset. After importing the two datasets, go to the Merge tab. Both datasets will appear in the side panel.

../_images/ETL011.png

Select Table A and Table B from the drop-down. In the Operator section, select the kind of merge operation to be performed.

There are six kinds of merge operations in the Merge tab:

  • INNER JOIN - Returns records that have matching values in both tables.

  • FULL JOIN - Returns all records when there is a match in either left or right table.

  • LEFT JOIN - Returns all records from the left table, and the matched records from the right table.

  • RIGHT JOIN - Returns all records from the right table, and the matched records from the left table.

  • APPEND - This is used to join two event logs of a similar process. The join is done by adding the second log (child process) to the end of the first log (parent process). This is useful when merging event logs of the same process across different periods, for example, merging the event log of the order-to-cash process for the period January-March with the event log of the same process for the period April-June (see the sketch after this list).

  • PARENT CHILD APPEND - This is used where an instance of one process (the parent process) spawns one or more instances of another process (the child process). The Parent-Child union operator fills in any missing case identifiers (IDs) of the parent process, so that this ID can be used as a case identifier when loading the resulting log into the Apromore workspace.
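
To make the APPEND operation concrete, here is a minimal pandas sketch with made-up rows showing the effect of appending two period logs of the same process. In Apromore this is configured in the Merge view, not through code.

    import pandas as pd

    # Hypothetical event logs of the same order-to-cash process for two periods.
    log_jan_mar = pd.DataFrame({
        "Case ID": ["O1", "O1"],
        "Activity": ["Create order", "Ship order"],
        "Timestamp": ["2024-02-01 09:00", "2024-02-03 16:00"],
    })
    log_apr_jun = pd.DataFrame({
        "Case ID": ["O2", "O2"],
        "Activity": ["Create order", "Ship order"],
        "Timestamp": ["2024-05-10 09:30", "2024-05-12 15:00"],
    })

    # APPEND: the second log (child) is added to the end of the first log (parent).
    combined = pd.concat([log_jan_mar, log_apr_jun], ignore_index=True)
    print(combined)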

For illustration, below are usage scenarios of the Inner Join and the Parent-Child Append.

Inner join

We consider an example with the following two tables:

  • A “Loan Application Header” table with information about each loan application, such as the Country and the Property value. This table has one row per loan application. Each loan application has a loan identifier.

  • A “Loan Application Log” table with information about each activity instance in a loan application process. For each activity instance, the table gives us the name of the activity, the start and end time, and information about the employee who performed the activity.

We can perform an inner join on the ID columns of both tables. This will combine the information from both tables by matching the ID columns.

To construct an event log from these two tables, select “Loan Application Header” as Table A, “Loan Application Log” as Table B, select the “Inner Join” operator, and select the “ID” column as the join key (both for Table A and Table B).

../_images/ETL013.png

Click Submit.

../_images/ETL014.png

The output of this operation can be loaded into Apromore using the ID from either table as the case identifier.

../_images/ETL016.png

Note

In the Transform step, we should select only one ID column to avoid duplicate column names.

The files corresponding to the above example are here.
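
For readers who prefer a concrete illustration, the following pandas sketch (with made-up rows and column names) reproduces the effect of the inner join described above. It is an illustration only, not the mechanism Apromore uses internally.

    import pandas as pd

    # Hypothetical miniature versions of the two tables.
    header = pd.DataFrame({                     # Loan Application Header
        "ID": [1, 2],
        "Country": ["AU", "NZ"],
        "Property value": [350000, 420000],
    })
    log = pd.DataFrame({                        # Loan Application Log
        "ID": [1, 1, 2],
        "Activity": ["Submit application", "Assess application", "Submit application"],
        "Start time": ["2024-01-01 09:00", "2024-01-02 10:00", "2024-01-03 11:00"],
    })

    # Inner join on the ID columns: each event row is enriched with the
    # application-level attributes from the header table.
    event_log = header.merge(log, on="ID", how="inner")
    print(event_log)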

Parent-child append

We consider an example with the following two tables:

  • A table corresponding to an event log of Business Travel Permit Applications (the parent table). The case identifier of this log is the business travel identifier.

  • A table corresponding to an event log of Business Travel Expense Declarations (the Child table). The case identifier of this log is the expense declaration identifier.

  • For every declaration, there is an activity called “Declaration Submitted”. The rows corresponding to this activity contain both the business travel ID and the expense declaration ID.

  • None of the other events in the Business Travel Expense Declarations table contains a reference to the business travel ID. Only the activity “Declaration Submitted” contains a reference both to the business travel ID and to the expense declaration ID.

In this scenario:

  • Business Travel Permit Applications is the Parent Table. This should be marked as “Table A” in the Merge step.

  • Expense Declaration is the Child table (every business travel is the “parent” of an expense declaration). The Expense Declaration table should be marked as Table B in the Merge step.

  • The Parent Key (Key A) is Business Travel ID.

  • The Child Key (Key B) is Expense Declaration ID.

../_images/ETL015.png

Click Submit. The Parent-Child union will construct a table that contains:

  • Every row in the parent table (every activity instance of the business travel process).

  • Every row in the child table (every activity instance of the expense declaration process).

Every row in the output table contains the attributes of the parent table and the attributes of the child table.

../_images/ETL041.png

The output, with all columns included, can be loaded to Apromore using the BusinessTravelID as the case identifier.

The files corresponding to this example are below:
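
The effect of the Parent-Child Append can also be sketched in pandas with made-up rows. The helper steps below are purely illustrative of the behaviour described above, not Apromore's implementation.

    import pandas as pd

    parent = pd.DataFrame({                      # Business Travel Permit Applications
        "BusinessTravelID": ["BT1", "BT1"],
        "Activity": ["Submit permit request", "Approve permit request"],
    })
    child = pd.DataFrame({                       # Business Travel Expense Declarations
        "DeclarationID": ["D1", "D1"],
        "BusinessTravelID": ["BT1", None],       # only "Declaration Submitted" carries both IDs
        "Activity": ["Declaration Submitted", "Approve declaration"],
    })

    # Fill in the missing parent key in the child table using the rows
    # that carry both identifiers...
    link = (child.dropna(subset=["BusinessTravelID"])
                 [["DeclarationID", "BusinessTravelID"]].drop_duplicates())
    child = child.drop(columns=["BusinessTravelID"]).merge(link, on="DeclarationID", how="left")

    # ...then append every parent row and every child row into a single table,
    # so that BusinessTravelID can serve as the case identifier.
    union = pd.concat([parent, child], ignore_index=True)
    print(union)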

Transform data

The Transform view is where data transformation is done. The list of tables and columns appears on the left.

../_images/ETL017.png

We use this list to select the columns we wish to transform. Click the “+” sign to add a column that needs transformation. As columns are added, the table is displayed.

../_images/ETL018.png

To delete a column from the Transformation table, click on the red “X” button next to the column name.

../_images/ETL043.png

Note

If the column was deleted by mistake, we can easily add it to the table again from the columns list.

../_images/ETL019.png

At any point in the transformation step, we can undo and redo a previous transformation action by using the undo and redo buttons respectively.

../_images/ETL042.png

Note

To complete the transformation, we must select at least the three compulsory columns: case ID, activity, and timestamp.

Warning

In the case of large logs, adding columns might take some time. Do not click on the same column several times, as this creates duplicate columns in the transformed table.

To add the whole extracted table to the Transformed table, click the bold “+” button next to the table name.

../_images/ETL020.png

We can add all the columns in Table A and the Invoice_amount column in Table B.

../_images/ETL021.png

We can also create a new column based on a set condition. Suppose we wish to create a new column called Invoice_class. In this column, any case with an invoice amount below 100000 is filled with ‘low invoice’, while those above 100000 are filled with ‘high invoice’. To do this, click the New Column button in the upper-right corner. Enter the name of the new column.

../_images/ETL022.png

Click Create. A new column is created.

Define the rules for the column by clicking on the Add Rules button. A window to define the transformation rule will appear. Use the drop-down menu to create the rule.

../_images/ETL023.png

Note

To add more rules, click on the Add new rule button. To delete a rule, click the Trash bin icon ETL040 on the far right.

Click OK to apply the transformation rule.

../_images/ETL024.png
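
The behaviour of this rule can be expressed as a minimal pandas sketch with made-up values. Apromore evaluates the rule through the Add Rules dialog, not through code; the snippet only illustrates the outcome.

    import pandas as pd

    df = pd.DataFrame({"Invoice_amount": [45000, 250000, 99999]})

    # Equivalent of the rule: IF Invoice_amount < 100000 THEN 'low invoice'
    # ELSE 'high invoice'.
    df["Invoice_class"] = df["Invoice_amount"].apply(
        lambda amount: "low invoice" if amount < 100000 else "high invoice"
    )
    print(df)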

To check or change the values of a column, click Edit rules beneath the name of the column. The Edit rules window appears. The existing rule is displayed and can be changed.

A transformation rule consists of an optional IF <Conditional Operation> THEN <Transformation Operation> ELSE … statement.

The following operations can be used in the <Conditional Operation>:

  • Is equal, not equal, less than, greater than, less than or equal to, greater than or equal to: These check whether the value of the attribute is equal to, not equal to, less than, greater than, less than or equal to, or greater than or equal to a given value, respectively.

  • Is null or is not null: This checks whether or not the value of the attribute is explicitly “null”.

  • Is empty or is not empty: This checks whether or not the value of the attribute is empty, i.e., contains no characters.

  • Is of type: This operator returns True if the contents of a cell match the pattern expected from a value of the selected type (date time, numerical, or string). For example, when applied to a cell containing 123.4, the operation “Is of type numerical” returns True.

  • Contains pattern: This conditional operator returns True if the contents of a cell contain a given string, for example, if a cell contains the string “Invoice”. It is also possible to specify a regular expression by clicking Regex. When the Regex option is selected, the pattern must be defined using the syntax of regular expressions of the Java programming language.

Below is an example where the Regex option has been selected in the conditional expressions. The IF condition of this rule matches a cell in the Activity column if the cell starts with the word Approve (Java regex: “^Approve”). The ELSE IF condition matches a cell in the Activity column if the cell starts with either the word Verify or with the word Check (Java regex “^(Verify|Check).”).

Transf003
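
As an aside, the two patterns in this example can be checked against sample activity names with the small Python sketch below. Apromore expects Java regular expression syntax, but these particular patterns behave the same way in Python’s re module; the activity names are made up.

    import re

    activities = ["Approve invoice", "Verify payment", "Check order", "Create order"]

    for activity in activities:
        if re.search(r"^Approve", activity):
            print(activity, "-> matches the IF condition")
        elif re.search(r"^(Verify|Check).", activity):
            print(activity, "-> matches the ELSE IF condition")
        else:
            print(activity, "-> matches neither condition")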

When we add a column in the Transform step of a data pipeline, whether by using the “+” button next to a column or the “Create new column” button, the new column becomes a so-called user column. The user columns are the columns that will be included in the final transformed table. These columns will be available in the Load tab.

The original columns are the columns of the tables that were uploaded or extracted in the Extract step of the data pipeline.

In a rule, we can refer to the original columns or the user columns. The User columns are listed at the top of the dropdown menu.

../_images/Transf004.png

The original columns are listed at the bottom.

../_images/Transf005.png

Note

When defining a rule to populate a column, we can refer to any other user column or original column. However, the rule to populate a column XYZ cannot refer to the column XYZ itself.

The following operations can be used in the <Transformation Operation> (see the sketch after this list):

  • Value: Returns either a fixed string or the value of the selected column

  • Concatenation: Returns the concatenation of two columns. It is also possible to add a separator between the values of the concatenated columns.

  • Find and replace: Replaces the occurrences of a string with another string. In the example below, all the occurrences of the word “Center” in the System column are replaced with an empty string, so that the word “Center” is removed from the resulting column.

  • Trim: Removes the first N characters or the last N characters of a string

  • Arithmetic operators: Addition, Subtraction, Multiplication, or Division

  • Max of two columns, Min of two columns, or Average of two columns

../_images/Transf002.png
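
To illustrate a few of these operations outside the tool, here is a minimal pandas sketch with made-up values. The column names are hypothetical, and the code only mirrors the described operations; it is not how Apromore applies them internally.

    import pandas as pd

    df = pd.DataFrame({
        "System": ["Call Center East", "Call Center West"],
        "First name": ["Ada", "Grace"],
        "Last name": ["Lovelace", "Hopper"],
        "Amount A": [120.0, 80.0],
        "Amount B": [100.0, 90.0],
    })

    df["System clean"] = df["System"].str.replace("Center", "", regex=False)  # Find and replace
    df["Resource"] = df["First name"] + " " + df["Last name"]                 # Concatenation with a separator
    df["System short"] = df["System"].str[5:]                                 # Trim: remove the first 5 characters
    df["Total"] = df["Amount A"] + df["Amount B"]                             # Arithmetic: addition
    df["Max amount"] = df[["Amount A", "Amount B"]].max(axis=1)               # Max of two columns
    print(df)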

Load data

The Load view is used to load the transformed data into the Apromore workspace; in other words, it saves the transformed data. To load the transformed data, click the Load view. The transformed log is displayed.

../_images/ETL025.png

Apromore automatically detects and applies the encoding of the data. To change the encoding, select the Encoding dropdown and click Detect.

../_images/ETL026.png

We can decide not to import a column by attaching the Ignore tag to it. To make it easier to ignore multiple selected event attribute columns, click on the Event attribute -> Ignore button at the upper-left corner. From that point on, the selected event attribute columns are tagged as Ignore.

../_images/ETL027.png

Similarly, if we need to tag multiple columns as Event attribute, we can click on Ignore -> Event Attribute in the upper-left corner, next to Event attribute -> Ignore. From that point on, the selected columns are tagged as Event Attribute.

../_images/ETL028.png

There are three approaches to loading a data pipeline in Apromore:

  • Uploading the log.

  • Scheduling the pipeline.

  • Chaining the pipeline to another pipeline.

Upload log

We use Upload log when we wish to run the pipeline once and upload the resulting log to our Apromore workspace. To upload the pipeline’s resulting log, click Upload Log.

../_images/ETL045.png

We will be prompted to enter the log name and specify a path where the log will be saved.

../_images/ETL029.png

Apromore runs the pipeline, and the log file is saved to the specified path in our Apromore Workspace.

Schedule pipelines

We use Schedule Pipeline when we wish to run the pipeline at regular intervals and upload the resulting log to our Apromore workspace.

To do this, click Schedule Pipeline.

../_images/ETL030.png

A dialog box appears where we fill in the scheduling details.

../_images/ETL031.png

To learn more about scheduling a pipeline, see Schedule data pipeline.

Tip

When scheduling a pipeline with a micro-batched table, the schedule frequency of the pipeline should be consistent with the frequency of arrival of new micro-batches to the S3 folder. For instance, if an external extraction process automatically uploads a micro-batch every hour, we can set the pipeline schedule to run hourly. To learn more about micro-batch ingestion, see Extract micro-batches from an S3 folder.

Chaining pipelines

We use Chain Pipeline when we wish to chain the current pipeline to another pipeline. This means that the output of the first pipeline will be given as one of the inputs to the second pipeline, allowing us to define our pipelines modularly. For example, a data pipeline that extracts data from a CRM system can be chained with a second pipeline that enhances the CRM data with data from a legacy system.

To chain an ETL pipeline to another ETL pipeline, click Chain New Pipeline.

../_images/ETL035.png

The Save data pipeline window appears. Enter a name under which to save the current pipeline.

../_images/ETL036.png

Click Create. This saves the data pipeline and returns to the Extract view, where another ETL pipeline can be added.

../_images/ETL037.png