Streamdal Pipelines

We're updating our documentation, so the information presented here might not be the most recent.

Between the service and the destination on the Data Graph, you will see your producers and consumers (also known as readers or writers; these correspond to the operation type you assigned when configuring the SDK). These are what you will attach pipelines to.

What are Pipelines?

Pipelines in Streamdal work similarly to pipelines in other products that utilize step-based workflows. They are logical checks and rules that data must adhere to before it can continue to be processed by your applications and subsequently be sent “downstream” (i.e. written to a database, or passed to another application or service).

Pipelines are put together in the form of steps (which we refer to interchangeably as rules), and each step has a type that determines the kind of action to take on data.

Pipelines informative image

You can also create new pipelines or edit existing ones by selecting an operation type (a producer or a consumer) and clicking Create New Pipeline or Edit Pipeline (circled in the right-hand menu above).

In the high-level overview of how Streamdal's components work together, pipelines are the “transformation rules” that are sent to the server, where they are converted to Wasm modules and pushed to the SDK.

Step Types

Step Type           Usage
Detective           Used to analyze a payload
Transform           Used to transform or extract data
Key/Value           Used to analyze for specific key/values
Schema Validation   Allows you to upload and version schemas to validate data against
HTTP Request        Used for CRUD operations and multiplexing data via HTTP

Detective

Detective is used to analyze a JSON payload. This step type accepts an optional path to search; if the path is left empty, Detective will search the entire payload.

Detective Types

Personally Identifiable Information (PII):
PII_ANY, PII_CREDIT_CARD, PII_SSN, PII_E-MAIL, PII_PHONE, PII_DRIVER_LICENSE, PII_PASSPORT_ID, PII_VIN_NUMBER, PII_SERIAL_NUMBER, PII_LOGIN, PII_TAXPAYER_ID, PII_ADDRESS, PII_SIGNATURE, PII_GEOLOCATION, PII_EDUCATION, PII_FINANCIAL, PII_HEALTH

Data Validation:
IS_EMPTY, HAS_FIELD, IS_TYPE, STRING_LENGTH_MIN, STRING_LENGTH_MAX, STRING_LENGTH_RANGE, STRING_EQUAL, NUMERIC_EQUAL_TO, NUMERIC_GREATER_THAN, NUMERIC_GREATER_EQUAL, NUMERIC_LESS_THAN, NUMERIC_LESS_EQUAL, NUMERIC_RANGE, NUMERIC_MIN, NUMERIC_MAX, SEMVER

String Matching:
STRING_CONTAINS_ANY, STRING_CONTAINS_ALL, REGEX

Data Formats:
TIMESTAMP_RFC3339, TIMESTAMP_UNIX_NANO, TIMESTAMP_UNIX, BOOLEAN_TRUE, BOOLEAN_FALSE, UUID, IPV4_ADDRESS, IPV6_ADDRESS, MAC_ADDRESS, URL, HOSTNAME
tip

Want to scan the entire payload dynamically for PII vs. specifying fields?

Use PII_ANY for the detective step, leave the path empty, and your pipeline will detect PII anywhere in the payload!
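
For instance, with PII_ANY selected and the path left empty, a hypothetical payload like the one below would be flagged even though the email address is nested several levels deep:

{
   "order": {
      "id": 1234,
      "contact": {
         "email": "jane.doe@example.com"
      }
   }
}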

Transform

Transform is used to make changes to data.

Transform Types

REPLACE_VALUE

This transform type will replace the value of the object field at the specified path.

Path is required.

Value is required.
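
For example, a REPLACE_VALUE step with the path customer.client.first_name (a hypothetical field) and the value "REDACTED" would turn:

{
   "customer": {
      "client": {
         "first_name": "John"
      }
   }
}

into:

{
   "customer": {
      "client": {
         "first_name": "REDACTED"
      }
   }
}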


DELETE_FIELD

This transform type will delete the object field at the specified path.

Path is required.
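
For example, a DELETE_FIELD step with the path customer.client.last_name (a hypothetical field) would remove that field and leave the rest of the payload intact:

{
   "customer": {
      "client": {
         "first_name": "John",
         "last_name": "Doe"
      }
   }
}

becomes:

{
   "customer": {
      "client": {
         "first_name": "John"
      }
   }
}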


OBFUSCATE_VALUE

This transform type will obfuscate the value of the object field at the specified path.

Path is required.
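
As an illustrative sketch only (the exact obfuscated representation is produced by the Wasm module, so the value below is just a placeholder), an OBFUSCATE_VALUE step pointed at a hypothetical customer.client.ssn field might turn:

{
   "customer": {
      "client": {
         "ssn": "123-45-6789"
      }
   }
}

into something like:

{
   "customer": {
      "client": {
         "ssn": "f3a9c1..."
      }
   }
}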


MASK_VALUE

This transform type will mask the value of the object field at the specified path.

Path is required.
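
As an illustrative sketch only (the masking characters and how much of the value is masked are determined by the Wasm module, so the output below is just a placeholder), a MASK_VALUE step pointed at a hypothetical customer.client.phone field might produce:

{
   "customer": {
      "client": {
         "phone": "***-***-7890"
      }
   }
}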


TRUNCATE_VALUE

This transform type will truncate the value of the object field at the specified path. The truncation is determined by either a fixed length or a percentage.

Path is required.

Value is required.
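
For example, a TRUNCATE_VALUE step pointed at a hypothetical log.message field, configured to truncate by length with a value of 10, would shorten the field value to 10 characters (shown here as keeping the first 10):

{
   "log": {
      "message": "this is a very long developer log line"
   }
}

becomes:

{
   "log": {
      "message": "this is a "
   }
}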


EXTRACT_VALUE

This transform type will extract the specified object fields.

Path is required.

Extract will take only the selected paths, and drop all others. This type also allows you to flatten the resulting object.

Example:

{
   "customer": {
      "address": {
         "street": "123 main st",
         "city": "portland"
      },
      "client": {
         "first_name": "John",
         "last_name": "Doe"
      }
   }
}

If you extract customer.client.first_name and customer.address.city, and select the flatten option, the resulting JSON will look like:

{
   "city": "portland",
   "first_name": "John"
}

Key/Value

Key/Value currently operates similarly to the Detective type. You can use this type to check a payload for known keys. In the future, this type will support CRUD and DELETE_ALL operations.

Key is required.
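
For example, to confirm that a payload like the following (hypothetical) event carries a tenant_id key before it continues downstream, you could add a Key/Value step that checks for that key:

{
   "tenant_id": "acme-corp",
   "event": "user.login"
}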

Schema Validation

Schema Validation is used to validate data against a schema. Schemas are added by pasting them from the clipboard. We currently accept JSON schemas, but will accept protobuf and other binary schemas in the future.

Schema is required.
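
For example, a minimal JSON schema like the one below (a hypothetical schema, matching the customer payload used earlier) could be pasted in to require that every payload contains a customer.client object with first_name and last_name fields:

{
   "type": "object",
   "required": ["customer"],
   "properties": {
      "customer": {
         "type": "object",
         "required": ["client"],
         "properties": {
            "client": {
               "type": "object",
               "required": ["first_name", "last_name"]
            }
         }
      }
   }
}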

HTTP Request

Allowed HTTP Methods
GET
POST
PUT
DELETE
PATCH
HEAD
OPTIONS

URL is required.

Request Body is required.

You are also able to pass key/value pairs that will be sent along as HTTP request headers.

Headers are optional (but you’ll probably want to include these in production environments).
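
As a sketch, an HTTP Request step that multiplexes data to another service might use the POST method, a placeholder URL such as https://example.com/ingest, a JSON request body, and a couple of headers (all names and values here are hypothetical):

Request Body:

{
   "source": "billing-service",
   "payload": {
      "first_name": "John"
   }
}

Headers:

Content-Type: application/json
Authorization: Bearer <token>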

Pipeline Flow Control

As pipelines execute, the resulting flow is determined by each step's boolean or error result. When configuring pipelines, you can tailor each step to control the execution of pipelines or optimize the flow of data at every step:

Pipeline Flow Control UI screenshot

You can define how each pipeline should proceed on a given result: Don't Abort, Abort Current Pipeline, or Abort All Pipelines. You can also add Metadata (arbitrary keys and values that will be emitted to the calling code, e.g. to enrich, categorize, or label data for downstream usage) or send Notifications:

Pipeline Flow Control UI screenshot
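
For example, a step that detects PII could attach Metadata such as the following (hypothetical keys and values), which would be emitted to the calling code so downstream consumers can label or route the data accordingly:

{
   "classification": "pii",
   "source_step": "detect-pii",
   "severity": "high"
}
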
tip

Pro tip: Don’t forget to save pipelines after creating them!

Example Pipeline

There are many workflows you might normally configure in a traditional data pipeline. You might want to:

  • Ensure no PII or sensitive data is in logs
  • Route data
  • Clean and reshape data
  • Verify data consistency
  • Handle errors
  • Add automation
  • Scale services or data platform

Below is an example of how you would set up a similar workflow as a Streamdal pipeline in the Console UI. In this example, you’ll see how you could use pipelines to truncate developer logs: