Snowflake¶
Snowflake data sources and sinks are in the database category of I/O connectors.
Tool documentation¶
As a data warehousing solution, Snowflake can perform more than just database operations. For complete details, see Snowflake's documentation.
Connector type values¶
The "type" value to use in the source or sink JSON files.
| Connector type | Value |
|---|---|
| Data source | DKDataSource_Snowflake |
| Data sink | DKDataSink_Snowflake |
Connection properties¶
The properties to use when connecting to a Snowflake instance from Automation.
| Field | Scope | Type | Required? | Description |
|---|---|---|---|---|
| account | source/sink | string | yes | Snowflake account name. Do not include snowflakecomputing.com in the account name; the Automation system adds it automatically. |
| role | source/sink | string | no | Snowflake role. |
| warehouse | source/sink | string | no | Snowflake warehouse name. |
| database | source/sink | string | no | Snowflake database (a grouping of schemas). |
| schema | source/sink | string | no | Snowflake schema (a grouping of database objects). |
| username | source/sink | string | yes | Snowflake username. |
| password | source/sink | string | yes | Snowflake password. |
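Because the account value must not include the snowflakecomputing.com suffix, it can be worth normalizing values that were copied from a full Snowflake hostname. A minimal sketch, assuming a hypothetical helper (not part of the connector):

```python
def normalize_account(account: str) -> str:
    """Strip the snowflakecomputing.com suffix if a full hostname was supplied.

    Hypothetical helper: the Automation system appends the domain itself,
    so only the bare account identifier should appear in snowflakeConfig.
    """
    account = account.strip().rstrip(".")
    suffix = ".snowflakecomputing.com"
    if account.lower().endswith(suffix):
        account = account[: -len(suffix)]
    return account
```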
Connections¶
See Connection Properties for more details on connection configurations.
Defined in kitchen-level variables¶
snowflakeConfig
{
"snowflakeConfig": {
"account": "#{vault://snowflake/account}",
"role": "#{vault://snowflake/role}",
"warehouse": "#{vault://snowflake/warehouse}",
"database": "#{vault://snowflake/database}",
"schema": "#{vault://snowflake/schema}",
"username": "#{vault://snowflake/username}",
"password": "#{vault://snowflake/password}"
}
}
The Connection tab in a Node Editor¶

Expanded connection syntax¶
For a data source¶
source.json
{
"type": "DKDataSource_Snowflake",
"name": "source",
"config": {
"username": "{{snowflakeConfig.username}}",
"password": "{{snowflakeConfig.password}}",
"account": "{{snowflakeConfig.account}}",
"role": "{{snowflakeConfig.role}}",
"warehouse": "{{snowflakeConfig.warehouse}}",
"database": "{{snowflakeConfig.database}}",
"schema": "{{snowflakeConfig.schema}}"
},
"keys": {},
"tests": {}
}
For a data sink¶
sink.json
{
"type": "DKDataSink_Snowflake",
"name": "sink",
"config": {
"username": "{{snowflakeConfig.username}}",
"password": "{{snowflakeConfig.password}}",
"account": "{{snowflakeConfig.account}}",
"role": "{{snowflakeConfig.role}}",
"warehouse": "{{snowflakeConfig.warehouse}}",
"database": "{{snowflakeConfig.database}}",
"schema": "{{snowflakeConfig.schema}}"
},
"keys": {},
"tests": {}
}
Condensed connection syntax¶
Note
Do not use quotes around condensed connection configuration variables.
For a data source¶
source.json
{
"type": "DKDataSource_Snowflake",
"name": "source",
"config-ref": "snowflakeConfig",
"keys": {},
"tests": {}
}
For a data sink¶
sink.json
{
"type": "DKDataSink_Snowflake",
"name": "sink",
"config-ref": "snowflakeConfig",
"keys": {},
"tests": {}
}
Connect to the Snowflake platform¶
After creating your Snowflake account, access Snowflake data warehousing information through the Snowflake Portal. There, you can find details on databases, worksheets, and warehouses. Learn more in the Snowflake Getting Started guide.
You must create a warehouse to use the DataKitchen connector.
The DataKitchen connector uses two tools to connect to the Snowflake platform. Both are available in DataKitchen agents, so no additional installation is necessary.
- Snowflake Python connector: https://docs.snowflake.com/en/user-guide/python-connector.html
- SnowSQL (CLI client): https://docs.snowflake.com/en/user-guide/snowsql.html
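For orientation, the connection properties above map directly onto the Snowflake Python connector's arguments. A minimal sketch of an equivalent direct connection, assuming the snowflake-connector-python package is installed (the helper and function names are illustrative, not part of the DataKitchen connector):

```python
def build_connect_kwargs(config: dict) -> dict:
    """Map snowflakeConfig-style keys to snowflake.connector.connect() arguments.

    The required keys (account, username, password) must be present;
    the optional ones are passed through only when set.
    """
    kwargs = {
        "account": config["account"],
        "user": config["username"],
        "password": config["password"],
    }
    for optional in ("role", "warehouse", "database", "schema"):
        if config.get(optional):
            kwargs[optional] = config[optional]
    return kwargs


def run_query(config: dict, sql: str):
    """Execute a single query and return all rows (requires snowflake-connector-python)."""
    import snowflake.connector  # deferred so build_connect_kwargs works without the package

    conn = snowflake.connector.connect(**build_connect_kwargs(config))
    try:
        cur = conn.cursor()
        cur.execute(sql)
        return cur.fetchall()
    finally:
        conn.close()
```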
Other configuration properties¶
See the common properties and runtime variables topics for configuration shared across connectors.
Additional Snowflake step (key) properties¶
| Field | Type | Required? | Description |
|---|---|---|---|
| query-type | string | yes | Specifies the type of operation performed by a step (key). See the notes below for details specific to Snowflake sources and sinks. |

Supported query-type values:

- execute_query (sources only): Executes a query and exports the resulting rowset in the format specified by the format field. Supports a single Snowflake command (query), which may be preceded by any number of "use" clauses (for example, "use database name" or "use schema name"). Implemented using the Snowflake Python connector.
- execute_scalar (sources only): Performs a query for a scalar value, such as sum() or count(). Supports any number of Snowflake commands; only the last statement must return a scalar. Implemented using SnowSQL.
- execute_non_query (sources only): Intended for DML/DDL statements. Supports any number of Snowflake commands. Implemented using SnowSQL.
- execute_dml (sinks only): Uses an insert template string to build the insert statement. Implemented using the Snowflake Python connector.
- bulk_insert (sinks only): Performs a bulk insert from the input data. Implemented using the Snowflake Python connector.
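The execute_query shape (any number of "use" clauses followed by exactly one query) can be sketched with a small helper. This is a hypothetical illustration of the accepted statement layout, not the connector's actual parser:

```python
def split_use_statements(sql: str):
    """Split a semicolon-delimited SQL string into its leading "use" statements
    and the single final query, mirroring what execute_query accepts.

    Hypothetical helper for illustration only.
    """
    statements = [s.strip() for s in sql.split(";") if s.strip()]
    uses = [s for s in statements if s.lower().startswith("use ")]
    queries = [s for s in statements if not s.lower().startswith("use ")]
    if len(queries) != 1:
        raise ValueError("execute_query expects exactly one non-'use' statement")
    return uses, queries[0]
```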
Note
DATE SQL type handling: for DATE SQL types, the Snowflake source returns DATE fields using the Python datetime format '%Y-%m-%d %H:%M:%S'.
When the Snowflake sink loads data from input files, it expects DATE fields in the same format.
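For illustration, the format string above renders and parses dates as follows (standard-library sketch; the function names are not part of the connector):

```python
from datetime import datetime

# The datetime format the Snowflake source uses for DATE fields.
SNOWFLAKE_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def format_date(value: datetime) -> str:
    """Render a datetime the way the Snowflake source exports DATE fields."""
    return value.strftime(SNOWFLAKE_DATE_FORMAT)


def parse_date(text: str) -> datetime:
    """Parse a DATE field string back into a datetime (the shape the sink expects)."""
    return datetime.strptime(text, SNOWFLAKE_DATE_FORMAT)
```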
File encoding requirements¶
Files used with data sources and data sinks must be encoded in UTF-8. Non-Unicode characters can cause problems when sinking data to database tables and errors when running related tests.
For CSV and other delimited files, use your application's Save As dialog to select UTF-8 encoding, or use a text editor that can convert file encodings.
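One way to verify (and, if needed, convert) a file's encoding before handing it to a source or sink is a short standard-library check. A hedged sketch: the latin-1 fallback is an assumption and should be replaced with the file's actual original encoding:

```python
def ensure_utf8(path: str, fallback_encoding: str = "latin-1") -> None:
    """Re-encode a file to UTF-8 in place if it is not already valid UTF-8.

    fallback_encoding is an assumption about the original encoding;
    adjust it to match how the file was actually produced.
    """
    with open(path, "rb") as f:
        raw = f.read()
    try:
        raw.decode("utf-8")
        return  # already valid UTF-8, nothing to do
    except UnicodeDecodeError:
        text = raw.decode(fallback_encoding)
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)
```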
Data source example¶
source.json
{
"type": "DKDataSource_Snowflake",
"name": "source",
"config-ref" : "snowflakeConfig",
"keys": {
"create_table": {
"target-field": "sql",
"resource-file": "snowflake/create.sql",
"query-type": "execute_scalar",
"set-runtime-vars" : {
"result" : "create_table_result"
}
},
"insert_data": {
"target-field": "sql",
"resource-file": "snowflake/insert.sql",
"query-type": "execute_scalar",
"set-runtime-vars" : {
"result" : "insert_data_result"
}
},
"my_query": {
"sql": "use database {{snowflake_databasename}}; use schema {{snowflake_databasename}}.{{snowflake_schemaname}}; select * from {{snowflake_tablename}};",
"query-type": "execute_query",
"set-runtime-vars" : {
"column_count": "my_query_column_count",
"row_count": "my_query_row_count"
}
}
},
"tests": {
"test-runtime_variable_create_table_result": {
"test-variable": "create_table_result",
"action": "stop-on-error",
"test-logic": "create_table_result == 0"
},
"test-runtime_variable_insert_data_result": {
"test-variable": "insert_data_result",
"action": "stop-on-error",
"test-logic": "insert_data_result == 2"
},
"test-runtime_variable_my_query_column_count": {
"test-variable": "my_query_column_count",
"action": "stop-on-error",
"test-logic": "my_query_column_count == 2"
},
"test-runtime_variable_my_query_row_count": {
"test-variable": "my_query_row_count",
"action": "stop-on-error",
"test-logic": "my_query_row_count == 2"
}
}
}
Data sink example¶
sink.json
{
"type": "DKDataSink_Snowflake",
"name": "sink",
"config-ref": "snowflakeConfig",
"keys": {
"insert_template": {
"sql": "insert into {{snowflake_databasename}}.{{snowflake_schemaname}}.test_data1(id, data) values(%s, %s);",
"query-type": "execute_dml",
"format": "csv"
},
"insert_bulk_csv": {
"table-name": "{{snowflake_databasename}}.{{snowflake_schemaname}}.test_data2",
"query-type": "bulk_insert",
"format": "csv"
},
"insert_bulk_ddl": {
"table-name": "{{snowflake_databasename}}.{{snowflake_schemaname}}.test_data3",
"query-type": "bulk_insert",
"format": "csv",
"table-ddl": "create or replace table {{snowflake_databasename}}.{{snowflake_schemaname}}.test_data3(id INT not null primary key, data varchar(200)); select * from {{snowflake_databasename}}.{{snowflake_schemaname}}.test_data3;"
}
}
}