Source-Sink Example

This code-based example shows an active source and sink connected to the container. You can read files from any available data source, load them into a container, transform them using a script, then export them to any data sink.

In this example, the process imports a CSV file from an S3 bucket into the container, modifies the contents of the file, tests the row count to ensure it has not changed, and exports it back to S3.
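The transform-and-verify steps above can be sketched locally as a plain shell session. The file name and contents here are stand-ins for the actual S3 data, not values from this example:

```shell
# 1) Stand-in for the S3 import: write a small CSV locally.
printf 'a,1\nb,2\nc,3\n' > orders.csv
before=$(wc -l < orders.csv)

# 2) The transform step: append the filename as a trailing column.
sed -i 's/$/,orders.csv/' orders.csv

# 3) Verify the transform did not add or drop rows.
after=$(wc -l < orders.csv)
[ "$before" -eq "$after" ] && echo "row count unchanged: $after"
```

In the real node, the import and export legs are handled by the S3 data source and data sink configurations shown below; only the transform runs as a script.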

Note

Container images are built with the DataKitchen Interface Layer and require specific files and file structures to be present in a container node. For more information, see GPC File Structure and Configuration.

File contents

config.json

The config.json file defines the shell script that the container runs to modify the loaded file.

{
    "apt-dependencies": [ ],
    "dependencies": [ ],
    "keys": {
        "run_shell_script": {
            "script": "transform_data.sh",
            "parameters": {},
            "environment": {}
        }
    }
}
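In effect, the `run_shell_script` key tells the container to execute the named script from its working directory. A minimal local illustration of that behavior, using a hypothetical placeholder script rather than the real transform:

```shell
# Create a placeholder script standing in for transform_data.sh,
# then execute it the way the container would run the configured script.
cat > transform_data.sh <<'EOF'
#!/bin/bash
echo "transform step running"
EOF
chmod +x transform_data.sh
./transform_data.sh
```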

s3_datasource.json

The s3_datasource.json file identifies the input connection for the container as an S3 bucket, using secrets defined in the Kitchen Overrides and vault. It uses Jinja expressions with variables defined in the recipe's variables.json for filename and path values in the mappings. The data source mapping sets a row_count runtime variable for use in testing.

Using the Node Editor in the UI, you would define the same values in the Source Connections section of the Connections tab, and in the Container > Target File Path fields of the Inputs tab.

{
    "name": "s3_datasource",
    "type": "DKDataSource_S3",
    "config": {
        "access-key": "{{s3Config.access_key}}",
        "secret-key": "{{s3Config.secret_key}}",
        "bucket": "{{s3Config.bucket}}"
    },
    "keys": {
        "mapping1": {
            "file-key": "{{source_sink_example_node.source_filepath}}",
            "use-only-file-key": true,
            "set-runtime-vars": {
                "row_count": "source_row_count"
            }
        }
    },
    "tests": {
        "test_source_row_count": {
            "action": "stop-on-error",
            "test-variable": "source_row_count",
            "type": "test-contents-as-integer",
            "test-logic": {
                "test-compare": "greater-than",
                "test-metric": 99
            }
        }
    }
}
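The `test_source_row_count` test treats the `source_row_count` runtime variable as an integer and requires it to be greater than 99, stopping the node on failure. A hedged shell equivalent of that check, with a made-up row count for illustration:

```shell
# source_row_count would be captured by set-runtime-vars at runtime;
# 150 is an illustrative value, not from this example.
source_row_count=150
if [ "$source_row_count" -gt 99 ]; then
    echo "test_source_row_count passed"
else
    echo "test_source_row_count failed" >&2
    exit 1
fi
```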

s3_datasink.json

The s3_datasink.json file identifies the output connection for the container as an S3 bucket, using secrets defined in the Kitchen Overrides and vault. It uses Jinja expressions with variables defined in the recipe's variables.json for filename and path values in the mappings.

Using the Node Editor in the UI, you would define the same values in the Container > Container File Path and Sink > Target File Path fields of the Outputs tab.

{
    "name": "s3_datasink",
    "type": "DKDataSink_S3",
    "config": {
        "access-key": "{{s3Config.access_key}}",
        "secret-key": "{{s3Config.secret_key}}",
        "bucket": "{{s3Config.bucket}}"
    },
    "keys": {
        "mapping1": {
            "file-key": "{{source_sink_example_node.sink_filepath}}",
            "use-only-file-key": true,
            "set-runtime-vars": {
                "row_count": "sink_row_count"
            }
        }
    },
    "tests": {
        "test_source_sink_rowcounts_match": {
            "action": "stop-on-error",
            "test-variable": "sink_row_count",
            "type": "test-contents-as-integer",
            "test-logic": "sink_row_count == {{source_row_count}}"
        }
    }
}
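The `test_source_sink_rowcounts_match` test compares the sink's row count against the `source_row_count` variable captured on import, so an accidental row gain or loss during the transform stops the export. A shell sketch of the same check, with illustrative values:

```shell
# Both counts would come from set-runtime-vars at runtime;
# the values here are made up for illustration.
source_row_count=150
sink_row_count=150
if [ "$sink_row_count" -eq "$source_row_count" ]; then
    echo "row counts match: $sink_row_count"
else
    echo "row count mismatch" >&2
    exit 1
fi
```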

transform_data.sh

The transform_data.sh script runs a simple search-and-replace on every line of the loaded file, appending the filename as a final column.

#!/bin/bash

# Navigate to docker-share directory where source files were added
cd ./docker-share
echo "The files injected into $(pwd):"
ls

# Append column to CSV file containing filename
sed -i "s/$/,{{global_superstore_orders_filename}}/" {{global_superstore_orders_filename}}
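With the Jinja expression resolved to a concrete filename, the sed command behaves as follows on a small sample CSV (hypothetical data, not the Global Superstore file):

```shell
# Two-line sample standing in for the real CSV; the sed pattern
# anchors on end-of-line ($) and appends the filename to every line.
printf 'id,amount\n1,20\n' > orders.csv
sed -i 's/$/,orders.csv/' orders.csv
cat orders.csv
```

After the substitution, every row, including the header, carries the filename as its last column.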

notebook.json

The notebook.json file supplies the container image details and maps the container's input and output files to the data source and data sink mapping keys.

{
    "image-repo": "{{dockerhubConfig.image_repo.general_purpose}}",
    "image-tag": "{{dockerhubConfig.image_tag.general_purpose}}",
    "dockerhub-namespace": "{{dockerhubConfig.namespace.general_purpose}}",
    "dockerhub-username": "{{dockerhubConfig.username}}",
    "dockerhub-password": "{{dockerhubConfig.password}}",
    "container-input-file-keys": [
        {
            "filename": "{{global_superstore_orders_filename}}",
            "key": "s3_datasource.mapping1"
        }
    ],
    "container-output-file-keys": [
        {
            "filename": "{{global_superstore_orders_filename}}",
            "key": "s3_datasink.mapping1"
        }
    ]
}