
ETL protocol schemas

ETL (Extract, Transform, Load) Pipeline Protocol - LEVEL 2: Data Engineering

Inspired by modern data integration platforms like Airbyte, Fivetran, and Apache NiFi.

Positioning in 3-Layer Architecture:

  • L1: Simple Sync (automation/sync.zod.ts) - Business users - Sync Salesforce to Sheets

  • L2: ETL Pipeline (THIS FILE) - Data engineers - Aggregate 10 sources to warehouse

  • L3: Enterprise Connector (integration/connector.zod.ts) - System integrators - Full SAP integration

ETL pipelines enable automated data synchronization between systems, transforming data as it moves from source to destination.

SCOPE: Advanced multi-source, multi-stage transformations.

Supports complex operations: joins, aggregations, filtering, custom SQL.

When to Use This Layer

Use ETL Pipeline when:

  • Combining data from multiple sources

  • Need aggregations, joins, transformations

  • Building data warehouses or analytics platforms

  • Complex data transformations required

Examples:

  • Sales data from Salesforce + Marketing from HubSpot → Data Warehouse

  • Multi-region databases → Consolidated reporting

  • Legacy system migration with transformation

When to downgrade:

  • A single source maps directly to a single destination with little or no transformation → use Level 1 (Simple Sync)

When to upgrade:

  • You need full enterprise system integration (complex connectors, e.g. a full SAP integration) → use Level 3 (Enterprise Connector)

@see ./sync.zod.ts for Level 1 (simple sync)

@see ../integration/connector.zod.ts for Level 3 (enterprise integration)

Use Cases

  1. Data Warehouse Population
  • Extract from multiple operational systems

  • Transform to analytical schema

  • Load into data warehouse

  2. System Integration
  • Sync data between CRM and Marketing Automation

  • Keep product catalog synchronized across e-commerce platforms

  • Replicate data for backup/disaster recovery

  3. Data Migration
  • Move data from legacy systems to modern platforms

  • Consolidate data from multiple sources

  • Split monolithic databases into microservices

@see https://airbyte.com/

@see https://docs.fivetran.com/

@see https://nifi.apache.org/

@example

```typescript
const salesforceToDB: ETLPipeline = {
  name: 'salesforce_to_postgres',
  label: 'Salesforce Accounts to PostgreSQL',
  source: {
    type: 'api',
    connector: 'salesforce',
    config: { object: 'Account' }
  },
  destination: {
    type: 'database',
    connector: 'postgres',
    config: { table: 'accounts' }
  },
  transformations: [
    { type: 'map', config: { 'Name': 'account_name' } }
  ],
  schedule: '0 2 * * *' // Daily at 2 AM
}
```

Source: packages/spec/src/automation/etl.zod.ts

TypeScript Usage

```typescript
import { ETLDestination, ETLEndpointType, ETLPipeline, ETLPipelineRun, ETLRunStatus, ETLSource, ETLSyncMode, ETLTransformation, ETLTransformationType } from '@objectstack/spec/automation';
import type { ETLDestination, ETLEndpointType, ETLPipeline, ETLPipelineRun, ETLRunStatus, ETLSource, ETLSyncMode, ETLTransformation, ETLTransformationType } from '@objectstack/spec/automation';

// Validate data
const result = ETLDestination.parse(data);
```

ETLDestination

Properties

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| type | Enum<'database' \| 'api' \| 'file' \| 'stream' \| 'object' \| 'warehouse' \| 'storage' \| 'spreadsheet'> | required | Destination type |
| connector | string | optional | Connector ID |
| config | Record<string, any> | required | Destination configuration |
| writeMode | Enum<'append' \| 'overwrite' \| 'upsert' \| 'merge'> | required | How to write data |
| primaryKey | string[] | optional | Primary key fields |
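
The four `writeMode` values can be made concrete with a small sketch. `writeRows` and its in-memory row store are illustrative assumptions, not part of the spec; a real destination connector issues the equivalent SQL or API calls.

```typescript
// Hedged sketch of the four writeMode behaviours over an in-memory row store.
type Row = Record<string, any>;

function writeRows(
  existing: Row[],
  incoming: Row[],
  writeMode: 'append' | 'overwrite' | 'upsert' | 'merge',
  primaryKey: string[] = []
): Row[] {
  const keyOf = (r: Row) => primaryKey.map((k) => String(r[k])).join('|');
  switch (writeMode) {
    case 'append': // keep everything; duplicates allowed
      return [...existing, ...incoming];
    case 'overwrite': // destination is replaced wholesale
      return [...incoming];
    case 'upsert': { // replace rows with a matching primaryKey, insert the rest
      const byKey = new Map(existing.map((r): [string, Row] => [keyOf(r), r]));
      for (const r of incoming) byKey.set(keyOf(r), r);
      return [...byKey.values()];
    }
    case 'merge': { // like upsert, but field-wise merge into the existing row
      const byKey = new Map(existing.map((r): [string, Row] => [keyOf(r), r]));
      for (const r of incoming) byKey.set(keyOf(r), { ...byKey.get(keyOf(r)), ...r });
      return [...byKey.values()];
    }
  }
}
```

Note how `merge` preserves fields the incoming row omits, while `upsert` replaces the row entirely; that difference is why `primaryKey` is required for both.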

ETLEndpointType

Allowed Values

  • database
  • api
  • file
  • stream
  • object
  • warehouse
  • storage
  • spreadsheet

ETLPipeline

Properties

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | required | Pipeline identifier (snake_case) |
| label | string | optional | Pipeline display name |
| description | string | optional | Pipeline description |
| source | Object | required | Data source |
| destination | Object | required | Data destination |
| transformations | Object[] | optional | Transformation pipeline |
| syncMode | Enum<'full' \| 'incremental' \| 'cdc'> | required | Sync mode |
| schedule | string | optional | Cron schedule expression |
| enabled | boolean | required | Pipeline enabled status |
| retry | Object | optional | Retry configuration |
| notifications | Object | optional | Notification settings |
| tags | string[] | optional | Pipeline tags |
| metadata | Record<string, any> | optional | Custom metadata |
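
The `name` field's snake_case constraint can be checked with a simple pattern. The exact regex the schema uses is an assumption; this is a plausible sketch:

```typescript
// Assumed snake_case rule: lowercase start, then lowercase letters, digits, underscores.
const SNAKE_CASE = /^[a-z][a-z0-9_]*$/;

function isValidPipelineName(name: string): boolean {
  return SNAKE_CASE.test(name);
}
```

For example, `salesforce_to_postgres` (from the example above) passes, while `Salesforce-To-PG` does not.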

ETLPipelineRun

Properties

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| id | string | required | Run identifier |
| pipelineName | string | required | Pipeline name |
| status | Enum<'pending' \| 'running' \| 'succeeded' \| 'failed' \| 'cancelled' \| 'timeout'> | required | Run status |
| startedAt | string | required | Start time |
| completedAt | string | optional | Completion time |
| durationMs | number | optional | Duration in ms |
| stats | Object | optional | Run statistics |
| error | Object | optional | Error information |
| logs | string[] | optional | Execution logs |
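
`durationMs` follows directly from the two timestamps. A minimal sketch, assuming `startedAt`/`completedAt` are ISO-8601 strings (the timestamp format is an assumption):

```typescript
// Derive durationMs for a completed run; undefined while the run is still in flight.
function runDurationMs(run: { startedAt: string; completedAt?: string }): number | undefined {
  if (!run.completedAt) return undefined;
  return Date.parse(run.completedAt) - Date.parse(run.startedAt);
}
```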

ETLRunStatus

Allowed Values

  • pending
  • running
  • succeeded
  • failed
  • cancelled
  • timeout

ETLSource

Properties

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| type | Enum<'database' \| 'api' \| 'file' \| 'stream' \| 'object' \| 'warehouse' \| 'storage' \| 'spreadsheet'> | required | Source type |
| connector | string | optional | Connector ID |
| config | Record<string, any> | required | Source configuration |
| incremental | Object | optional | Incremental extraction config |

ETLSyncMode

Allowed Values

  • full
  • incremental
  • cdc
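
One way to picture the three modes: `full` re-extracts everything on each run, while `incremental` and `cdc` carry only rows changed since a stored cursor. This sketch models both change-based modes with an `updatedAt` watermark; the field name and cursor shape are assumptions (the spec stores them in `source.incremental`), and real CDC reads a database change log rather than comparing timestamps.

```typescript
// Illustrative extraction step for the three sync modes.
type SourceRecord = { id: number; updatedAt: string };

function extract(
  rows: SourceRecord[],
  syncMode: 'full' | 'incremental' | 'cdc',
  cursor?: string
): { rows: SourceRecord[]; nextCursor?: string } {
  if (syncMode === 'full') return { rows }; // everything, every run
  // incremental / cdc: only rows changed since the stored cursor.
  const changed = rows.filter((r) => !cursor || r.updatedAt > cursor);
  const nextCursor = changed.reduce(
    (max, r) => (r.updatedAt > max ? r.updatedAt : max),
    cursor ?? ''
  );
  return { rows: changed, nextCursor };
}
```

The returned `nextCursor` is persisted by the runner and passed back in on the next scheduled run.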

ETLTransformation

Properties

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | optional | Transformation name |
| type | Enum<'map' \| 'filter' \| 'aggregate' \| 'join' \| 'script' \| 'lookup' \| 'split' \| 'merge' \| 'normalize' \| 'deduplicate'> | required | Transformation type |
| config | Record<string, any> | required | Transformation config |
| continueOnError | boolean | required | Continue on error |

ETLTransformationType

Allowed Values

  • map
  • filter
  • aggregate
  • join
  • script
  • lookup
  • split
  • merge
  • normalize
  • deduplicate
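
A transformation pipeline is applied as an ordered chain. The following is a hedged sketch covering three of the listed types (`map`, `filter`, `deduplicate`) plus `continueOnError`; the `config` shapes shown here are illustrative assumptions, not the spec's.

```typescript
// Hypothetical executor for a transformation chain.
type DataRow = Record<string, any>;

interface TransformStep {
  type: 'map' | 'filter' | 'deduplicate';
  config: Record<string, any>;
  continueOnError?: boolean;
}

function applyTransformations(rows: DataRow[], steps: TransformStep[]): DataRow[] {
  for (const step of steps) {
    try {
      switch (step.type) {
        case 'map': // rename fields, e.g. { 'Name': 'account_name' }
          rows = rows.map((r) => {
            const out: DataRow = {};
            for (const [k, v] of Object.entries(r)) out[step.config[k] ?? k] = v;
            return out;
          });
          break;
        case 'filter': // keep rows where config.field equals config.equals
          rows = rows.filter((r) => r[step.config.field] === step.config.equals);
          break;
        case 'deduplicate': { // keep the first row per config.key value
          const seen = new Set<any>();
          rows = rows.filter((r) => {
            const k = r[step.config.key];
            if (seen.has(k)) return false;
            seen.add(k);
            return true;
          });
          break;
        }
      }
    } catch (e) {
      if (!step.continueOnError) throw e; // otherwise skip the failed step
    }
  }
  return rows;
}
```

Steps run in array order, so a `map` that renames a field must precede any `filter` or `deduplicate` that references the new name.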
