Cleanse and Test Raw Data
- Use Case: A raw data file must be prepared for analysis by loading it into several database tables and by identifying and reporting invalid data.
- Example Recipe: In this example, the recipe uses a star schema modeling approach to data analysis. It takes a global superstore data set, offered by Tableau for training purposes, and splits it into different dimensions, such as product, orders, and customers. It then creates a fact table to store information for analysis, applies a machine learning model to predict seasonal ordering, and finally updates a data catalog.
- Action Node Role: With the raw sales data already transferred to an Amazon S3 data lake, the second node in this variation loads the data into tables in an Amazon Redshift database, while separating out the errors and edge-case sales orders.

An action node loads the raw data to Amazon Redshift tables and cleanses it of errors and edge cases.
Web App Configuration
The following images show how the node is configured using the web app forms.
- Connection to the data source
- Process flow overview of the actions in this example
- Step configurations and the runtime variables to record
- Test configurations using the captured runtime variables
Note
Tests Follow Steps: Tests declared for a data source in an action node are executed only after all keys/steps have been executed successfully.
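The ordering rule in this note can be pictured with a small Python sketch. It only illustrates the described behavior and is not the platform's implementation; `run_node`, the step callables, and the shape of the result dictionary are hypothetical names introduced here.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical types: a step receives the runtime variables collected so far
# and returns the new variables it sets; a test inspects the final variables.
Step = Callable[[Dict[str, Any]], Dict[str, Any]]
Test = Callable[[Dict[str, Any]], bool]


def run_node(steps: List[Tuple[str, Step]], tests: Dict[str, Test]) -> Dict[str, Any]:
    """Run every step in order; evaluate tests only if all steps succeed."""
    runtime_vars: Dict[str, Any] = {}
    for name, step in steps:
        try:
            runtime_vars.update(step(runtime_vars))
        except Exception as exc:
            # A failed step ends the run before any test is evaluated.
            return {
                "status": "step-failed",
                "failed_step": name,
                "error": str(exc),
                "test_results": {},
            }
    # All steps succeeded, so the tests can now read the runtime variables.
    test_results = {name: test(runtime_vars) for name, test in tests.items()}
    return {
        "status": "steps-succeeded",
        "runtime_vars": runtime_vars,
        "test_results": test_results,
    }
```

When any step raises, `test_results` stays empty, which mirrors the rule that tests never run against a partially executed node.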

The data source connection, configured in the web app using a predefined variable dictionary.

A process flow of the example action node's steps, runtime variables, and tests.

The anatomy of a step configured in the web app for the action node.

Step 6 uses a SQL query to compute a value for later testing.
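To make the Step 6 calculation concrete, the following sketch runs the same percentage query against an in-memory SQLite database, used here only as a stand-in for Redshift. The table names come from the node file; `build_demo_db` and `percent_invalid` are hypothetical helpers, and SQLite's `ROUND` approximates the `::DECIMAL(10,2)` cast used in the real step.

```python
import sqlite3


def build_demo_db(total_orders: int, invalid_orders: int) -> sqlite3.Connection:
    """Create the two tables the query reads, filled with placeholder rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE new_raw_demo_superstore_orders (order_id TEXT)")
    conn.execute("CREATE TABLE demo_superstore_orders_with_errors (order_id TEXT)")
    conn.executemany(
        "INSERT INTO new_raw_demo_superstore_orders VALUES (?)",
        [(f"order-{i}",) for i in range(total_orders)],
    )
    conn.executemany(
        "INSERT INTO demo_superstore_orders_with_errors VALUES (?)",
        [(f"order-{i}",) for i in range(invalid_orders)],
    )
    return conn


def percent_invalid(conn: sqlite3.Connection) -> float:
    """Return invalid orders as a percentage of new raw orders, to 2 places."""
    row = conn.execute(
        """
        SELECT ROUND(
            100.0 * (SELECT COUNT(*) FROM demo_superstore_orders_with_errors)
                  / (SELECT COUNT(*) FROM new_raw_demo_superstore_orders),
            2)
        """
    ).fetchone()
    return row[0]
```

In the node, the scalar returned by the query is what `set-runtime-vars` stores as `percent_invalid_orders` for the later test.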

Step 8 uses a SQL query to append new data to the existing data store.

Test 1 is configured to ensure that the percentage of invalid orders in the data is acceptable.

Test 3 compares the current variable value to the previous value of the same variable to ensure that the data insert was successful.

Test 5 compares the new data against a minimum threshold.
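The three kinds of comparison described in these captions (a fixed ceiling, the previous value of the same variable, and a fixed minimum) can be sketched in Python as follows. This is an illustration under stated assumptions, not the platform's test engine: `evaluate_test` and the `history` dictionary are hypothetical, every value is compared as a float regardless of the declared `type`, and passing on the first run of a `previous-value` test is a guess. Log-only tests, whose `test-logic` is a plain string, are out of scope.

```python
import operator
from typing import Any, Dict, Optional

# Only the comparison names that appear in this node's tests are mapped.
COMPARATORS = {
    "less-than": operator.lt,
    "greater-than": operator.gt,
    "greater-than-equal-to": operator.ge,
}


def evaluate_test(
    test: Dict[str, Any],
    runtime_vars: Dict[str, Any],
    history: Optional[Dict[str, Any]] = None,
) -> bool:
    """Return True when the test passes for the given runtime variables."""
    history = history or {}
    current = float(runtime_vars[test["test-variable"]])
    logic = test["test-logic"]
    metric = logic["test-metric"]
    if isinstance(metric, dict):
        # Historic metric: compare against the value stored by an earlier run.
        if metric["historic-calculation"] != "previous-value":
            raise ValueError("only 'previous-value' is handled in this sketch")
        previous = history.get(metric["historic-metric"])
        if previous is None:
            return True  # Assumption: nothing to compare against on a first run.
        metric = previous
    compare = COMPARATORS[logic["test-compare"]]
    return compare(current, float(metric))
```

Note that `greater-than-equal-to` also passes when the total is unchanged, so a run that appends no rows would still satisfy a `previous-value` test written this way.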
File Contents
The platform records the configurations shown above, as entered on the web app pages, in the action node file actions/load-and-cleanse-raw.json. The node uses two shared recipe files, which it calls with the "sql": "{{load_text( )}}" syntax. Both files are stored in the recipe's /resources directory.
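As a rough mental model of the `load_text` references, the sketch below expands a `{{load_text('...')}}` call from an in-memory dictionary of resource files and then substitutes plain `{{variable}}` placeholders. It is a simplified illustration, not the platform's templating engine: `render_sql`, the `resources` dictionary, and the flat `variables` dictionary (with dotted names such as `s3Config.bucket` used as plain keys) are all assumptions made for this example.

```python
import re
from typing import Dict

# Matches {{load_text('some_file.sql')}} with optional surrounding spaces.
LOAD_TEXT = re.compile(r"\{\{\s*load_text\(\s*'([^']+)'\s*\)\s*\}\}")
# Matches {{name}} or {{dotted.name}} placeholders.
VARIABLE = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def render_sql(sql: str, resources: Dict[str, str], variables: Dict[str, str]) -> str:
    """Inline referenced resource files, then fill in variable placeholders."""
    # First pass: replace each load_text call with the file's contents.
    expanded = LOAD_TEXT.sub(lambda m: resources[m.group(1)], sql)
    # Second pass: substitute variables, including those inside loaded files.
    return VARIABLE.sub(lambda m: str(variables[m.group(1)]), expanded)
```

A missing file or variable raises `KeyError` in this sketch, which makes a mistyped filename in the `sql` value easy to spot.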
Note
Default JSON Filename: The web app automatically names the data source used in an action node "source" and names the JSON file source.json. These names are not required and can be changed in the File Editor, but the name of the source and the filename in the /actions directory must match.
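The matching rule in this note is easy to check mechanically. The following is a minimal sketch, assuming the rule means that the `"name"` value inside the file equals the filename without its `.json` extension; `source_name_matches_file` is a hypothetical helper, not a platform function.

```python
import json
from pathlib import PurePosixPath


def source_name_matches_file(path: str, file_text: str) -> bool:
    """Return True when the JSON "name" equals the file's base name."""
    node = json.loads(file_text)
    # PurePosixPath(...).stem drops the directory and the .json extension.
    return node.get("name") == PurePosixPath(path).stem
```

For the file in this example, the path actions/load-and-cleanse-raw.json and the name "load-and-cleanse-raw" satisfy the check.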
actions/load-and-cleanse-raw.json
{
"name": "load-and-cleanse-raw",
"type": "DKDataSource_PostgreSQL",
"config-ref": "redshiftConfig",
"keys": {
"Create_Schema_if_Nonexistent": {
"description": "Create the schema if one doesn't exist. ",
"sql": "CREATE SCHEMA IF NOT EXISTS {{schema_name}};\n\nSET search_path TO {{schema_name}};\n\nSELECT 1;\n\n",
"query-type": "execute_scalar"
},
"Load_New_Raw_Superstore_from_S3": {
"description": "Load new superstore data and store the number of new records (i.e. orders) added.",
"sql": "{{load_text('load_raw_superstore_from_s3.sql')}}",
"query-type": "execute_scalar",
"set-runtime-vars": {
"result": "num_new_orders"
}
},
"Create_and_Cleanse_Stage": {
"sql": "{{load_text('create_and_cleanse_stage.sql')}}",
"query-type": "execute_scalar"
},
"Num_Orders_with_Errors": {
"description": "Store the number of orders with errors for testing.",
"sql": "SET search_path TO {{schema_name}};\n\nSELECT COUNT(*) FROM demo_superstore_orders_with_errors;",
"query-type": "execute_scalar",
"set-runtime-vars": {
"result": "num_orders_with_errors"
}
},
"Num_Orders_with_Invalid_US_Zip_Codes": {
"description": "Store the number of orders with invalid US zip codes for testing.",
"sql": "SET search_path TO {{schema_name}};\n\nSELECT COUNT(*) FROM demo_superstore_orders_bad_us_zip;",
"query-type": "execute_scalar",
"set-runtime-vars": {
"result": "num_orders_with_invalid_us_zip_codes"
}
},
"Compute_Percent_Invalid_Orders": {
"description": "Compute and store the percentage of new orders that are invalid for use in testing.",
"sql": "SET search_path TO {{schema_name}};\n\nSELECT (100.0 * (SELECT COUNT(*) FROM demo_superstore_orders_with_errors) / (SELECT COUNT(*) FROM new_raw_demo_superstore_orders))::DECIMAL(10,2);",
"query-type": "execute_scalar",
"set-runtime-vars": {
"result": "percent_invalid_orders"
}
},
"Compute_Percent_Invalid_US_Zip_Codes": {
"description": "Compute and store the percentage of new orders with invalid zip codes for use in testing.",
"sql": "SET search_path TO {{schema_name}};\n\nSELECT (100.0 * (SELECT COUNT(*) FROM demo_superstore_orders_bad_us_zip) / (SELECT COUNT(*) FROM new_raw_demo_superstore_orders))::DECIMAL(10,2);",
"query-type": "execute_scalar",
"set-runtime-vars": {
"result": "percent_invalid_us_zip_codes"
}
},
"Append_New_Raw_Superstore": {
"sql": "SET search_path TO {{schema_name}};\n\nCREATE TABLE IF NOT EXISTS raw_demo_superstore_orders\n(\n\trow_id integer not null ,\n\torder_id varchar(100),\n\torder_date date,\n\tship_date date,\n\tship_mode varchar(100),\n\tcustomer_id varchar(100),\n\tcustomer_name varchar(100) ,\n\tsegment varchar(100),\n\tcity varchar(100) ,\n\tstate varchar(100),\n\tcountry varchar(100) ,\n\tpostal_code char(5),\n\tmarket varchar(100),\n\tregion varchar(100) ,\n\tproduct_id varchar(100),\n\tcategory varchar(100),\n\tsub_category varchar(100) ,\n\tproduct_name varchar(1000),\n\tsales numeric(14,3),\n\tquantity integer ,\n\tdiscount numeric(14,3) ,\n\tprofit numeric(14,3),\n\tshipping_cost numeric(14,3) ,\n\torder_priority varchar(100)\n);\n\nINSERT INTO raw_demo_superstore_orders SELECT * FROM new_raw_demo_superstore_orders;\n\nSELECT COUNT(*) FROM raw_demo_superstore_orders;",
"query-type": "execute_scalar",
"set-runtime-vars": {
"result": "total_raw_orders"
}
},
"Append_New_Staged_Superstore": {
"sql": "SET search_path TO {{schema_name}};\n\nCREATE TABLE IF NOT EXISTS stage_demo_superstore_orders\n(\n\trow_id integer not null ,\n\torder_id varchar(100),\n\torder_date date,\n\tship_date date,\n\tship_mode varchar(100),\n\tcustomer_id varchar(100),\n\tcustomer_name varchar(100) ,\n\tsegment varchar(100),\n\tcity varchar(100) ,\n\tstate varchar(100),\n\tcountry varchar(100) ,\n\tpostal_code char(5),\n\tmarket varchar(100),\n\tregion varchar(100) ,\n\tproduct_id varchar(100),\n\tcategory varchar(100),\n\tsub_category varchar(100) ,\n\tproduct_name varchar(1000),\n\tsales numeric(14,3),\n\tquantity integer ,\n\tdiscount numeric(14,3) ,\n\tprofit numeric(14,3),\n\tshipping_cost numeric(14,3) ,\n\torder_priority varchar(100)\n)\n;\n\nINSERT INTO stage_demo_superstore_orders \n\tSELECT * FROM new_stage_demo_superstore_orders;\n\nDROP TABLE IF EXISTS demo_superstore_sales_control;\n\nCREATE TABLE demo_superstore_sales_control AS\n\tSELECT sum(sales) AS total_sales FROM stage_demo_superstore_orders ;\n\nSELECT COUNT(*) FROM stage_demo_superstore_orders;",
"query-type": "execute_scalar",
"set-runtime-vars": {
"result": "total_staged_orders"
}
},
"Cleanup": {
"description": "Drop temporary tables.",
"sql": "SET search_path TO {{schema_name}};\n\nDROP TABLE IF EXISTS new_raw_demo_superstore_orders CASCADE;\nDROP TABLE IF EXISTS new_stage_demo_superstore_orders CASCADE;\n\nSELECT 1;",
"query-type": "execute_scalar"
}
},
"tests": {
"QA_Percent_Invalid_Orders": {
"action": "warning",
"test-variable": "percent_invalid_orders",
"type": "test-contents-as-float",
"test-logic": {
"test-compare": "less-than",
"test-metric": 2
}
},
"QA_Percent_Invalid_US_Zip_Codes": {
"action": "warning",
"test-variable": "percent_invalid_us_zip_codes",
"type": "test-contents-as-float",
"test-logic": {
"test-compare": "less-than",
"test-metric": 1
}
},
"Ensure_Total_Raw_Orders_Always_Increasing": {
"action": "stop-on-error",
"test-variable": "total_raw_orders",
"test-logic": {
"test-compare": "greater-than-equal-to",
"test-metric": {
"historic-calculation": "previous-value",
"historic-metric": "total_raw_orders"
}
},
"keep-history": true
},
"Ensure_Total_Staged_Orders_Always_Increasing": {
"action": "stop-on-error",
"test-variable": "total_staged_orders",
"test-logic": {
"test-compare": "greater-than-equal-to",
"test-metric": {
"historic-calculation": "previous-value",
"historic-metric": "total_staged_orders"
}
},
"keep-history": true
},
"QA_Num_New_Orders_Min": {
"action": "stop-on-error",
"test-variable": "num_new_orders",
"type": "test-contents-as-integer",
"test-logic": {
"test-compare": "greater-than",
"test-metric": 1000
}
},
"Log_Num_Orders_with_Errors": {
"action": "log",
"test-variable": "num_orders_with_errors",
"type": "test-contents-as-integer",
"test-logic": "num_orders_with_errors"
},
"Log_Num_Orders_with_Invalid_US_Zip_Codes": {
"action": "log",
"test-variable": "num_orders_with_invalid_us_zip_codes",
"type": "test-contents-as-integer",
"test-logic": "num_orders_with_invalid_us_zip_codes"
}
}
}
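Because every test in this file reads a runtime variable that some step must set first, a quick consistency check can catch a mistyped variable name before the node runs. The sketch below is one way to do that in Python; `unset_test_variables` is a hypothetical helper, and `EXAMPLE_NODE` is a trimmed excerpt of the file above that keeps only the fields the check reads.

```python
import json
from typing import Any, Dict


def unset_test_variables(node: Dict[str, Any]) -> Dict[str, str]:
    """Map each test name to its test-variable when no step sets that variable."""
    produced = set()
    for step in node.get("keys", {}).values():
        # Each step may store its scalar result under a runtime variable name.
        produced.update(step.get("set-runtime-vars", {}).values())
    return {
        name: test["test-variable"]
        for name, test in node.get("tests", {}).items()
        if test["test-variable"] not in produced
    }


# Trimmed excerpt of actions/load-and-cleanse-raw.json for illustration.
EXAMPLE_NODE = json.loads("""
{
  "keys": {
    "Load_New_Raw_Superstore_from_S3": {
      "set-runtime-vars": {"result": "num_new_orders"}
    },
    "Compute_Percent_Invalid_Orders": {
      "set-runtime-vars": {"result": "percent_invalid_orders"}
    }
  },
  "tests": {
    "QA_Percent_Invalid_Orders": {"test-variable": "percent_invalid_orders"},
    "QA_Num_New_Orders_Min": {"test-variable": "num_new_orders"}
  }
}
""")
```

An empty result means every test reads a variable that a step records; any entry in the result names a test whose variable would be undefined at test time.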
resources/create_and_cleanse_stage.sql
SET search_path TO {{schema_name}};
DROP TABLE IF EXISTS new_stage_demo_superstore_orders;
CREATE TABLE new_stage_demo_superstore_orders
(
    row_id integer not null,
    order_id varchar(100),
    order_date date,
    ship_date date,
    ship_mode varchar(100),
    customer_id varchar(100),
    customer_name varchar(100),
    segment varchar(100),
    city varchar(100),
    state varchar(100),
    country varchar(100),
    postal_code char(5),
    market varchar(100),
    region varchar(100),
    product_id varchar(100),
    category varchar(100),
    sub_category varchar(100),
    product_name varchar(1000),
    sales numeric(14,3),
    quantity integer,
    discount numeric(14,3),
    profit numeric(14,3),
    shipping_cost numeric(14,3),
    order_priority varchar(100)
)
;
INSERT INTO new_stage_demo_superstore_orders
SELECT * FROM new_raw_demo_superstore_orders;
-- Strip stray single quotes from state values that begin with one.
UPDATE new_stage_demo_superstore_orders
SET state = replace(state, '''', '')
WHERE state LIKE '''%';
DROP TABLE IF EXISTS demo_superstore_orders_bad_US_zip;
CREATE TABLE demo_superstore_orders_bad_US_zip AS
SELECT *
FROM new_raw_demo_superstore_orders WHERE country = 'United States'
AND len(postal_code) <> 5;
DROP TABLE IF EXISTS multiple_customers;
CREATE TABLE multiple_customers AS
SELECT order_id, COUNT(DISTINCT customer_id) AS num_customers
FROM new_stage_demo_superstore_orders
GROUP BY order_id
HAVING COUNT(DISTINCT customer_id) > 1;
DROP TABLE IF EXISTS demo_superstore_orders_with_errors;
CREATE TABLE demo_superstore_orders_with_errors AS
SELECT *
FROM new_stage_demo_superstore_orders WHERE order_id IN
(SELECT order_id FROM multiple_customers)
ORDER BY order_id;
DELETE FROM new_stage_demo_superstore_orders WHERE order_id IN
(SELECT order_id FROM multiple_customers);
INSERT INTO demo_superstore_orders_with_errors
SELECT * FROM new_stage_demo_superstore_orders WHERE order_id = 'ES-2014-1903302';
DELETE FROM new_stage_demo_superstore_orders WHERE order_id = 'ES-2014-1903302';
SELECT 1;
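The core cleansing rule in this file, quarantining every order that is linked to more than one customer, can be tried out on a few rows with Python's built-in sqlite3 module. SQLite is only a stand-in for Redshift here, the shortened table names `stage` and `with_errors` and the helper `split_conflicting_orders` are hypothetical, and the sample order and customer IDs used with it are invented.

```python
import sqlite3
from typing import List, Tuple

Row = Tuple[str, str]  # (order_id, customer_id)


def split_conflicting_orders(rows: List[Row]) -> Tuple[List[Row], List[Row]]:
    """Return (clean_rows, error_rows) after quarantining multi-customer orders."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE stage (order_id TEXT, customer_id TEXT)")
    conn.executemany("INSERT INTO stage VALUES (?, ?)", rows)
    # Orders whose rows reference more than one distinct customer.
    conn.execute(
        """
        CREATE TABLE multiple_customers AS
        SELECT order_id FROM stage
        GROUP BY order_id
        HAVING COUNT(DISTINCT customer_id) > 1
        """
    )
    # Copy the offending rows to the errors table, then remove them from stage.
    conn.execute(
        """
        CREATE TABLE with_errors AS
        SELECT * FROM stage
        WHERE order_id IN (SELECT order_id FROM multiple_customers)
        """
    )
    conn.execute(
        "DELETE FROM stage WHERE order_id IN (SELECT order_id FROM multiple_customers)"
    )
    clean = conn.execute(
        "SELECT order_id, customer_id FROM stage ORDER BY order_id, customer_id"
    ).fetchall()
    errors = conn.execute(
        "SELECT order_id, customer_id FROM with_errors ORDER BY order_id, customer_id"
    ).fetchall()
    conn.close()
    return clean, errors
```

An order with several line items for the same customer stays in the staging table; only orders whose rows disagree about the customer are moved.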
resources/load_raw_superstore_from_s3.sql
SET search_path TO {{schema_name}};
DROP TABLE IF EXISTS new_raw_demo_superstore_orders CASCADE;
CREATE TABLE new_raw_demo_superstore_orders
(
    row_id integer not null,
    order_id varchar(100),
    order_date date,
    ship_date date,
    ship_mode varchar(100),
    customer_id varchar(100),
    customer_name varchar(100),
    segment varchar(100),
    city varchar(100),
    state varchar(100),
    country varchar(100),
    postal_code char(5),
    market varchar(100),
    region varchar(100),
    product_id varchar(100),
    category varchar(100),
    sub_category varchar(100),
    product_name varchar(1000),
    sales numeric(14,3),
    quantity integer,
    discount numeric(14,3),
    profit numeric(14,3),
    shipping_cost numeric(14,3),
    order_priority varchar(100)
);
COPY new_raw_demo_superstore_orders
FROM 's3://{{s3Config.bucket}}/{{global_superstore_s3_base_path}}{{global_superstore_filename}}'
CREDENTIALS 'aws_iam_role=arn:aws:iam::{{s3Config.copy_role}}'
DELIMITER AS '|'
TIMEFORMAT AS 'auto'
DATEFORMAT AS 'auto'
FILLRECORD
BLANKSASNULL;
SELECT COUNT(*) FROM new_raw_demo_superstore_orders;
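To give a feel for what the `DELIMITER`, `FILLRECORD`, and `BLANKSASNULL` options ask of the loader, the sketch below parses a single pipe-delimited record in Python. It is a deliberately rough approximation and not a description of how Redshift's COPY works internally: it ignores column types, pads missing trailing fields with empty strings, and turns whitespace-only fields into `None`. The helper name `parse_record` and the sample values used with it are hypothetical.

```python
from typing import List, Optional


def parse_record(line: str, num_columns: int, delimiter: str = "|") -> List[Optional[str]]:
    """Split one record, pad missing trailing fields, and null out blank fields."""
    fields = line.rstrip("\n").split(delimiter)
    if len(fields) > num_columns:
        raise ValueError(f"expected at most {num_columns} fields, got {len(fields)}")
    # FILLRECORD-like behavior: tolerate records that end early.
    fields += [""] * (num_columns - len(fields))
    # BLANKSASNULL-like behavior: a field of only whitespace becomes None.
    return [None if f != "" and f.strip() == "" else f for f in fields]
```

Real COPY semantics depend on each column's data type and on related options, so treat the output of this helper only as an aid for reasoning about short or blank-padded input lines.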