Streamdal Pipelines

We're updating our documentation, so the presented info might not be the most recent.

Between the service and the destination on the Data Graph, you will see your producers and consumers (also known as readers or writers, i.e. the operation type you assigned when configuring the SDK). These are what you will attach pipelines to.

What are Pipelines?

Pipelines in Streamdal work similarly to pipelines in other products that utilize step-based workflows. They are logical checks and rules data must adhere to so it can continue to be processed by your applications, and subsequently sent “downstream” (i.e. written to a database, or passed to another application or service).

Pipelines are put together in the form of steps (which we refer to interchangeably as rules), and each step has a type that determines the kind of action to take on data.

Pipelines informative image

You can also create new pipelines or edit existing ones by selecting an operation type (a producer or a consumer) and clicking Create New Pipeline or Edit Pipeline (circled in the right menu above).

In the high-level overview of how Streamdal's components work together, pipelines are the "transformation rules" that are sent to the server, where they are converted to Wasm modules and pushed to the SDK.

Step Types

Step Type          | Usage
Detective          | Used to analyze a payload
Transform          | Used to transform or extract data
Key/Value          | Operations against built-in distributed K/V store
Schema Validation  | Allows you to upload and version schemas to validate data against
HTTP Request       | Used for CRUD operations and multiplexing data via HTTP

Detective

Detective is used for performing various types of data analysis on JSON payloads, including but not limited to:

  • PII Detection
  • Data Validation
  • Key/Value Matching
  • Existence Validation
  • Data Format Checks

This step type accepts an optional path to search. When the path is left empty, Detective will search the entire payload.
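
For illustration, consider a payload like the one below (field names are made up). A Detective step with the type PII_E-MAIL (listed in the table below) and the path customer.email would inspect only that field; the same step with an empty path would scan every field in the payload:

{
   "customer": {
      "email": "jane@example.com",
      "plan": "pro"
   }
}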

Detective Types

Personally Identifiable Information (PII): PII_ANY, PII_CREDIT_CARD, PII_SSN, PII_E-MAIL, PII_PHONE, PII_DRIVER_LICENSE, PII_PASSPORT_ID, PII_VIN_NUMBER, PII_SERIAL_NUMBER, PII_LOGIN, PII_TAXPAYER_ID, PII_ADDRESS, PII_SIGNATURE, PII_GEOLOCATION, PII_EDUCATION, PII_FINANCIAL, PII_HEALTH

Data Validation: IS_EMPTY, HAS_FIELD, IS_TYPE, STRING_LENGTH_MIN, STRING_LENGTH_MAX, STRING_LENGTH_RANGE, STRING_EQUAL, NUMERIC_EQUAL_TO, NUMERIC_GREATER_THAN, NUMERIC_GREATER_EQUAL, NUMERIC_LESS_THAN, NUMERIC_LESS_EQUAL, NUMERIC_RANGE, NUMERIC_MIN, NUMERIC_MAX, SEMVER

String Matching: STRING_CONTAINS_ANY, STRING_CONTAINS_ALL, REGEX

Data Formats: TIMESTAMP_RFC3339, TIMESTAMP_UNIX_NANO, TIMESTAMP_UNIX, BOOLEAN_TRUE, BOOLEAN_FALSE, UUID, IPV4_ADDRESS, IPV6_ADDRESS, MAC_ADDRESS, URL, HOSTNAME

PII_KEYWORD

The PII_KEYWORD detective type will attempt to find sensitive info in JSON payloads by analyzing field names and their relative location in the payload.

By default, this detective type will run in Performance mode - this mode normalizes field names and compares them against a PII keyword hashmap. The result is that “Performance” is extremely fast and incurs a very small overhead penalty (<0.08ms per PII scan on a 10KB payload).

The downside of "Performance" mode is that it relies on exact key and path-based lookups. This means that if your payloads have fields like foo_AWS_SECRET_KEY_bar, "Performance" will not pick up on them.

To address this, you can enable PII_KEYWORD to use Accuracy mode.

In this mode, Detective will perform hashmap-based lookups, path comparisons, and lastly a "keyword-in-string" check. While this incurs a higher resource cost (~1.5ms per PII scan on a 10KB payload), it results in significantly more accurate PII detection.
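
To illustrate the difference between the two modes, take a payload like the one below (field names are made up). "Performance" mode would flag the email key via its keyword hashmap, but would miss foo_AWS_SECRET_KEY_bar because it is not an exact key or path match; "Accuracy" mode's keyword-in-string check would catch it:

{
   "email": "jane@example.com",
   "foo_AWS_SECRET_KEY_bar": "s3cr3t-value"
}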


NOTE: If you are concerned about the extra overhead that “Accuracy” mode will incur, configure the Streamdal SDK to use async mode and enable sampling.

Pro tip: There are two “catch-all” PII scanners: PII_KEYWORD and PII_ANY

PII_KEYWORD is used for scanning for PII in KEYS.

PII_ANY is used for scanning for PII in VALUES.

Transform

Transform is used to make changes to data.

Transform Types

REPLACE_VALUE

This transform type will replace the values in the object fields.

Path is required.

Value is required.
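
For example, a REPLACE_VALUE step with the path customer.email and the value REDACTED might behave like this (an illustrative sketch, not output from a specific SDK):

{
   "customer": {
      "email": "jane@example.com"
   }
}

becomes:

{
   "customer": {
      "email": "REDACTED"
   }
}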


DELETE_FIELD

This transform type will delete the object fields.

Path is required.
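
For example, a DELETE_FIELD step with the path customer.ssn would remove that field and leave the rest of the payload intact (illustrative payload):

{
   "customer": {
      "name": "John Doe",
      "ssn": "123-45-6789"
   }
}

becomes:

{
   "customer": {
      "name": "John Doe"
   }
}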


OBFUSCATE_VALUE

This transform type will obfuscate the object fields.

Path is required.
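
As an illustrative sketch, an OBFUSCATE_VALUE step with the path customer.ssn replaces the value with an unreadable form. The exact obfuscated output depends on the implementation, so the value below is only a placeholder:

{
   "customer": {
      "ssn": "123-45-6789"
   }
}

becomes:

{
   "customer": {
      "ssn": "<obfuscated>"
   }
}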


MASK_VALUE

This transform type will mask the object fields.

Path is required.
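
For illustration, a MASK_VALUE step with the path customer.card_number might produce something like the following. How many characters are masked, and with what character, depends on the implementation, so treat the output as a sketch:

{
   "customer": {
      "card_number": "4111111111111111"
   }
}

becomes:

{
   "customer": {
      "card_number": "************1111"
   }
}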


TRUNCATE_VALUE

This transform type will truncate the object fields. The truncation length is determined by a length or a percentage.

Path is required.

Value is required.
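
For example, a TRUNCATE_VALUE step with the path customer.bio and a length value of 10 would keep only the first 10 characters (illustrative; a percentage-based value trims proportionally instead):

{
   "customer": {
      "bio": "Lifelong data engineer and part-time beekeeper"
   }
}

becomes:

{
   "customer": {
      "bio": "Lifelong d"
   }
}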


EXTRACT_VALUE

This transform type will extract the object fields.

Path is required.

Extract will take only the selected paths, and drop all others. This type also allows you to flatten the resulting object.

Example:

{
   "customer": {
      "address": {
         "street": "123 main st",
         "city": "portland"
      },
      "client": {
         "first_name": "John",
         "last_name": "Doe"
      }
   }
}

If you extract customer.client.first_name and customer.address.city, and select the flatten option, the resulting JSON will look like:

{
   "city": "portland",
   "first_name": "John"
}

Key/Value

All Streamdal clients come pre-built with a “local, distributed key/value store”.

“Local, distributed” is a strange term - here is what we mean:

  • Every Streamdal client that uses the same Streamdal cluster has a copy of the full key/value store that is accessible to every other Streamdal client.
  • The key/value store is stored entirely in memory.
  • Pipeline steps can modify the contents of the key/value store and these changes will be propagated to all other Streamdal clients.
  • Key/value does not incur ANY network hop penalty as it is stored entirely locally, in memory.

This feature enables users to build complex pipelines that involve looking up dynamic data, such as "known user IDs", to implement idempotency or a dedupe strategy.
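
As a hypothetical sketch of a dedupe flow (the step layout below is illustrative and not the literal Streamdal step configuration): a pipeline reads an identifier from the payload, checks whether it already exists as a key in the K/V store, and aborts if it does; otherwise it records the key so later messages with the same identifier are treated as duplicates.

Step 1 (Key/Value): look up the value at payload path order.id in the K/V store
   - key exists      -> abort current pipeline (duplicate message)
   - key not present -> continue
Step 2 (Key/Value): write the order.id value into the K/V store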

Schema Validation

Schema Validation is used to validate data against schemas. Schemas are provided by pasting them from the clipboard. We currently accept JSON schemas, but will accept protobuf and other binary schemas in the future.

Schema is required.
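
For example, you might paste a minimal JSON Schema like the one below to require that every payload contains a string email field (an illustrative schema, not a required format):

{
   "type": "object",
   "properties": {
      "email": { "type": "string" }
   },
   "required": ["email"]
}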

HTTP Request

The HTTP Request step allows you to perform an outbound HTTP request if a condition is met.

This step allows you to set a “Body Mode” to either Static or Use Previous Step Result.

When using Static, the body will be sent as specified. When using Use Previous Step Result, the SDKs will construct a body that will look something like this:

{
   "detective_result": {
      "matches": [
         {
            "type": "DETECTIVE_TYPE_PII_KEYWORD",
            "path": "email",
            "value": null,
            "pii_type": "Person"
         },
         {
            "type": "DETECTIVE_TYPE_PII_KEYWORD",
            "path": "password",
            "value": null,
            "pii_type": "Credentials"
         },
         {
            "type": "DETECTIVE_TYPE_PII_KEYWORD",
            "path": "birthdate",
            "value": null,
            "pii_type": "Person"
         },
         {
            "type": "DETECTIVE_TYPE_PII_KEYWORD",
            "path": "country",
            "value": null,
            "pii_type": "Address"
         }
      ]
   },
   "audience": {
      "service_name": "signup-service",
      "component_name": "postgresql",
      "operation_type": "OPERATION_TYPE_PRODUCER",
      "operation_name": "verifier"
   }
}

NOTE: As of 05.2024, the only step type that can be used with Use Previous Step Result is Detective.

Allowed HTTP Methods:

  • GET
  • POST
  • PUT
  • DELETE
  • PATCH
  • HEAD
  • OPTIONS

URL is required.

Request Body is required IF the "Body Mode" is Static.

You are also able to pass key/value pairs that will be sent along as HTTP request headers.

Headers are optional (but you’ll probably want to include these in production environments).
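
For instance, you could attach an auth token and a content type as headers (the values here are placeholders):

{
   "authorization": "Bearer <your-token>",
   "content-type": "application/json"
}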

Pipeline Flow Control

As pipelines execute, the resulting flow is determined by boolean or error results. When configuring pipelines, you can tailor each step to control the execution of pipelines or optimize the flow of data at every step:

Pipeline Flow Control UI screenshot

You can define how each pipeline should proceed by choosing Don't Abort, Abort Current Pipeline, or Abort All Pipelines, by adding Metadata (arbitrary keys and values emitted to the calling code, e.g. to enrich, categorize, or label data for downstream usage), or by sending Notifications:

Pipeline Flow Control UI screenshot
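
For example, metadata could be a handful of arbitrary key/value pairs like the following, which your calling code can read and act on (keys and values are purely illustrative):

{
   "classification": "pii",
   "team": "payments"
}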

Pro tip: Don’t forget to save pipelines after creating them!

Example Pipeline

There are many workflows you might normally configure in a traditional data pipeline. You might want to:

  • Ensure no PII or sensitive data is in logs
  • Route data
  • Clean and reshape data
  • Verify data consistency
  • Handle errors
  • Add automation
  • Scale services or data platform

Below is an example of how you would set up a similar workflow as a Streamdal pipeline in the Console UI. In this example, you’ll see how you could use pipelines to truncate developer logs: