ETL
ETL protocol schemas
ETL (Extract, Transform, Load) Pipeline Protocol - LEVEL 2: Data Engineering
Inspired by modern data integration platforms like Airbyte, Fivetran, and Apache NiFi.
Positioning in 3-Layer Architecture:
- L1: Simple Sync (automation/sync.zod.ts) - Business users - Sync Salesforce to Sheets
- L2: ETL Pipeline (THIS FILE) - Data engineers - Aggregate 10 sources to warehouse
- L3: Enterprise Connector (integration/connector.zod.ts) - System integrators - Full SAP integration
ETL pipelines enable automated data synchronization between systems, transforming
data as it moves from source to destination.
SCOPE: Advanced multi-source, multi-stage transformations.
Supports complex operations: joins, aggregations, filtering, custom SQL.
When to Use This Layer
Use ETL Pipeline when:
- Combining data from multiple sources
- Need aggregations, joins, transformations
- Building data warehouses or analytics platforms
- Complex data transformations required
Examples:
- Sales data from Salesforce + Marketing from HubSpot → Data Warehouse
- Multi-region databases → Consolidated reporting
- Legacy system migration with transformation
When to downgrade:
- Simple 1:1 sync → Use Simple Sync
When to upgrade:
- Need full connector lifecycle (auth, webhooks, rate limits) → Use Enterprise Connector
@see ./sync.zod.ts for Level 1 (simple sync)
@see ../integration/connector.zod.ts for Level 3 (enterprise integration)
Use Cases
- Data Warehouse Population
  - Extract from multiple operational systems
  - Transform to analytical schema
  - Load into data warehouse
- System Integration
  - Sync data between CRM and Marketing Automation
  - Keep product catalog synchronized across e-commerce platforms
  - Replicate data for backup/disaster recovery
- Data Migration
  - Move data from legacy systems to modern platforms
  - Consolidate data from multiple sources
  - Split monolithic databases into microservices
@see https://airbyte.com/
@see https://docs.fivetran.com/
@example
```ts
const salesforceToDB: ETLPipeline = {
  name: 'salesforce_to_postgres',
  label: 'Salesforce Accounts to PostgreSQL',
  source: {
    type: 'api',
    connector: 'salesforce',
    config: { object: 'Account' }
  },
  destination: {
    type: 'database',
    connector: 'postgres',
    config: { table: 'accounts' }
  },
  transformations: [
    { type: 'map', config: { 'Name': 'account_name' } }
  ],
  schedule: '0 2 * * *' // Daily at 2 AM
};
```

Source: packages/spec/src/automation/etl.zod.ts
TypeScript Usage
```ts
import { ETLDestination, ETLEndpointType, ETLPipeline, ETLPipelineRun, ETLRunStatus, ETLSource, ETLSyncMode, ETLTransformation, ETLTransformationType } from '@objectstack/spec/automation';
import type { ETLDestination, ETLEndpointType, ETLPipeline, ETLPipelineRun, ETLRunStatus, ETLSource, ETLSyncMode, ETLTransformation, ETLTransformationType } from '@objectstack/spec/automation';

// Validate data
const result = ETLDestination.parse(data);
```

ETLDestination
Properties
| Property | Type | Required | Description |
|---|---|---|---|
| type | Enum<'database' \| 'api' \| 'file' \| 'stream' \| 'object' \| 'warehouse' \| 'storage' \| 'spreadsheet'> | ✅ | Destination type |
| connector | string | optional | Connector ID |
| config | Record<string, any> | ✅ | Destination configuration |
| writeMode | Enum<'append' \| 'overwrite' \| 'upsert' \| 'merge'> | ✅ | How to write data |
| primaryKey | string[] | optional | Primary key fields |
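As a concrete illustration of the fields above, a warehouse destination using `upsert` might look like the sketch below. The connector ID and `config` keys are illustrative, not fixed by the schema; only `type`, `config`, and `writeMode` are required.

```ts
// Illustrative ETLDestination value. Connector ID and config keys are
// examples; the schema leaves `config` as an open record.
const dest = {
  type: 'warehouse' as const,
  connector: 'snowflake',                    // hypothetical connector ID
  config: { database: 'ANALYTICS', table: 'DIM_ACCOUNTS' },
  writeMode: 'upsert' as const,              // insert new rows, update existing
  primaryKey: ['account_id'],                // rows are matched on this key
};
```

In practice you would run `ETLDestination.parse(dest)` to validate it, as shown in the TypeScript Usage section. Note that `primaryKey`, while optional in the schema, is what `upsert` and `merge` use to match existing rows.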
ETLEndpointType
Allowed Values
`database`, `api`, `file`, `stream`, `object`, `warehouse`, `storage`, `spreadsheet`
ETLPipeline
Properties
| Property | Type | Required | Description |
|---|---|---|---|
| name | string | ✅ | Pipeline identifier (snake_case) |
| label | string | optional | Pipeline display name |
| description | string | optional | Pipeline description |
| source | Object | ✅ | Data source |
| destination | Object | ✅ | Data destination |
| transformations | Object[] | optional | Transformation pipeline |
| syncMode | Enum<'full' \| 'incremental' \| 'cdc'> | ✅ | Sync mode |
| schedule | string | optional | Cron schedule expression |
| enabled | boolean | ✅ | Pipeline enabled status |
| retry | Object | optional | Retry configuration |
| notifications | Object | optional | Notification settings |
| tags | string[] | optional | Pipeline tags |
| metadata | Record<string, any> | optional | Custom metadata |
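The `@example` earlier shows a minimal pipeline; the sketch below exercises some of the optional fields from the table. Source/destination configs, connector IDs, and tag names are illustrative assumptions, not prescribed by the schema.

```ts
// Illustrative ETLPipeline value using optional fields (schedule, tags).
// Connector IDs and config keys are examples only.
const pipeline = {
  name: 'orders_to_warehouse',               // snake_case identifier
  label: 'Orders to Warehouse',
  source: {
    type: 'database' as const,
    connector: 'mysql',
    config: { table: 'orders' },
  },
  destination: {
    type: 'warehouse' as const,
    connector: 'bigquery',
    config: { dataset: 'sales', table: 'orders' },
    writeMode: 'append' as const,
  },
  syncMode: 'incremental' as const,          // only extract new/changed rows
  schedule: '*/30 * * * *',                  // every 30 minutes
  enabled: true,
  tags: ['sales', 'half-hourly'],
};
```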
ETLPipelineRun
Properties
| Property | Type | Required | Description |
|---|---|---|---|
| id | string | ✅ | Run identifier |
| pipelineName | string | ✅ | Pipeline name |
| status | Enum<'pending' \| 'running' \| 'succeeded' \| 'failed' \| 'cancelled' \| 'timeout'> | ✅ | Run status |
| startedAt | string | ✅ | Start time |
| completedAt | string | optional | Completion time |
| durationMs | number | optional | Duration in ms |
| stats | Object | optional | Run statistics |
| error | Object | optional | Error information |
| logs | string[] | optional | Execution logs |
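A completed run record might look like the sketch below, with `durationMs` derived from the two timestamps. The keys inside `stats` are an assumption; the table above leaves its shape open.

```ts
// Illustrative ETLPipelineRun record for a successful run.
const startedAt = '2024-01-15T02:00:00Z';
const completedAt = '2024-01-15T02:03:30Z';

const run = {
  id: 'run_0001',                            // example run identifier
  pipelineName: 'salesforce_to_postgres',
  status: 'succeeded' as const,
  startedAt,
  completedAt,
  // Duration computed from the ISO timestamps: 3 min 30 s = 210 000 ms.
  durationMs: Date.parse(completedAt) - Date.parse(startedAt),
  stats: { recordsRead: 1200, recordsWritten: 1180 }, // assumed stat keys
};
```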
ETLRunStatus
Allowed Values
`pending`, `running`, `succeeded`, `failed`, `cancelled`, `timeout`
ETLSource
Properties
| Property | Type | Required | Description |
|---|---|---|---|
| type | Enum<'database' \| 'api' \| 'file' \| 'stream' \| 'object' \| 'warehouse' \| 'storage' \| 'spreadsheet'> | ✅ | Source type |
| connector | string | optional | Connector ID |
| config | Record<string, any> | ✅ | Source configuration |
| incremental | Object | optional | Incremental extraction config |
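A source configured for incremental extraction might be sketched as below. The table above does not fix the shape of the `incremental` object, so the cursor-column layout here is an assumption modeled on common ETL tools, which track a monotonically increasing column (e.g. an `updated_at` timestamp) to fetch only new or changed rows.

```ts
// Illustrative ETLSource for incremental extraction.
// The key inside `incremental` is an assumed name, not from the schema.
const source = {
  type: 'database' as const,
  connector: 'postgres',
  config: { table: 'events' },
  incremental: { cursorField: 'updated_at' }, // assumed: column used as cursor
};
```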
ETLSyncMode
Allowed Values
`full`, `incremental`, `cdc`
ETLTransformation
Properties
| Property | Type | Required | Description |
|---|---|---|---|
| name | string | optional | Transformation name |
| type | Enum<'map' \| 'filter' \| 'aggregate' \| 'join' \| 'script' \| 'lookup' \| 'split' \| 'merge' \| 'normalize' \| 'deduplicate'> | ✅ | Transformation type |
| config | Record<string, any> | ✅ | Transformation config |
| continueOnError | boolean | ✅ | Continue on error |
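Transformations run as an ordered pipeline, so a chain of them might be sketched as below. The per-type `config` shapes (field mappings for `map`, a predicate for `filter`) are illustrative assumptions; the schema only requires an open config record.

```ts
// Illustrative transformation chain: rename fields, then drop test rows.
// Config shapes per type are examples, not fixed by the schema.
const transformations = [
  {
    name: 'rename_fields',
    type: 'map' as const,
    config: { Name: 'account_name', Industry: 'industry' },
    continueOnError: false,                  // a bad mapping fails the run
  },
  {
    name: 'drop_test_rows',
    type: 'filter' as const,
    config: { field: 'account_name', operator: 'not_contains', value: 'TEST' },
    continueOnError: true,                   // skip bad rows, keep the run alive
  },
];
```

Setting `continueOnError` per step lets a pipeline tolerate row-level failures in lenient steps while still failing fast on critical ones.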
ETLTransformationType
Allowed Values
`map`, `filter`, `aggregate`, `join`, `script`, `lookup`, `split`, `merge`, `normalize`, `deduplicate`