Skip to content

Publish Events Directly

The following walks you through how you use the Event Ingestion API endpoints directly.

How to publish events

Prerequisites

To use the Event Ingestion API, an API key is required.

Send events

  1. Navigate to the API Swagger documentation: https://api.docs.datakitchen.io/production/events.html
  2. Expand Events > in the menu.
  3. Select an event endpoint.
  4. Expand the AUTHORIZATIONS section for header information on where the API key should be applied.
  5. Enter the required and optional key-value pairs into your script.

    Descriptions, parameters, and request and response formats are provided for each endpoint.

Walkthrough

Use case

This batch pipeline migrates data from an SFTP server to an S3 bucket. It creates a table for storing the data in Redshift, then uploads the data into Redshift in an ETL step. Finally, it publishes a Tableau workbook for visualizing the data.

(Screenshot of DAG described above)

When this operation runs, a user may be interested in the following information:

  • When did the batch pipeline start running?
  • What tasks are actively running? What tasks finished running and did any fail?
  • Are there any error or warning log messages?
  • What tests have been executed and what are the results?

Once the appropriate events are published to the Event Ingestion API, Observability can provide insight and answers to these questions.

Publish a Run Status event

A Run Status event is the first event type published when the batch pipeline begins. This indicates that a run has started.

See Event Ingestion API: RunStatus for complete technical details for this event type.

  1. For the first POST to the run status endpoint:

    • Include the pipeline_key and run_key.
    • In this example, an appropriate pipeline_key would be SFTP_server_to_Tableau_workbook and you can use the Airflow Run ID as the run_key value.
    • Include a status in the request body set to RUNNING.
  2. For the second POST to the run status endpoint:

    • Add an event_timestamp, task_key, and status in the request body.
    • In this example, each step of the DAG represents a task. An appropriate task_key would be SFTP_to_S3, status is RUNNING, and event_timestamp should reflect when the task started running.
    • If no timestamp is set, the Event Ingestion API applies its current time to the field.
  3. When the task is finished running, publish another Run Status event similar to that from step 2, but with a status COMPLETED, COMPLETED_WITH_WARNINGS, or FAILED.

  4. As the subsequent tasks in the pipeline execute, they should publish their own Run Status events, similar to that from step 2.
  5. When the run is finished, publish another Run Status event similar to that from step 1, but with a status COMPLETED, COMPLETED_WITH_WARNINGS, or FAILED to close the run.

Note

Run start and end: the status key sets the status for both runs and tasks. A run starts when no task_key is present and the status is "running." A run finishes when no task_key is present and the status is not "running."

The system automatically creates a new run for any event sent with a unique pipeline_key and run_key.

Publish a Message Log event

When a task in the batch pipeline run fails, logs are typically the starting point for fixing any failures. Therefore, it's helpful to publish relevant log messages as events to give context in the UI.

See Event Ingestion API: MessageLog for complete technical details for this event type.

  1. A POST to the message log endpoint requires log_level, message, run_key, and pipeline_key values.
  2. Optionally, include a task_key to associate the message with a specific task, or node.
  3. Each node should publish its respective warning and error logs.

Tip

Although you can publish every log message for a pipeline run, DataKitchen strongly discourages this practice. Best practice is to publish only warning and error log messages that provide context for fixing failures and a limited number of useful info logs.

Publish a Test Outcomes event

An important part of DataOps methodology is running tests. Ideally, each node in the pipeline should execute tests to validate that they have run successfully before the batch pipeline run progresses to subsequent nodes. Therefore, each task should publish a TestOutcomes event with corresponding test results.

See Event Ingestion API: TestOutcomes for complete technical details for this event type.

  1. A POST to the test outcomes endpoint requires a run_key, pipeline_key, and test_outcomes array of results.
  2. Optionally, include a task_key to associate the results with a specific task (i.e. node).
  3. Each node should publish its respective test results.

Publish a Metric Log event

Staying up-to-date with data values as they move through a pipeline is a good way to ensure data remains within expectations. Each node that transforms, aggregates, or manipulates data should post a log with relevant information about the value. Publish a MetricLog event to monitor data as it moves through the batch pipeline.

See Event Ingestion API: MetricLog for complete technical details for this event type.

  1. A POST to the metric log endpoint requires a metric_value, pipeline_key, metric_key, and run_key.
  2. Optionally, include a task_key to associate the results with a specific task (i.e. node).
  3. Each node should publish its respective metric log.