GitHub: dbos_experiments/exp17
ELT Pipeline with DBOS Workflows
A resilient, distributed ELT (Extract, Load, Transform) pipeline built with DBOS workflows, featuring automatic failure recovery, memory-efficient batching, and data deduplication.
Overview
This project demonstrates a production-ready ELT pipeline that processes users from multiple external APIs across different organizations and integrations. The pipeline uses DBOS workflows to provide automatic recovery from any failure, ensuring exactly-once semantics and progress tracking.
Hybrid Database Architecture:
- DuckDB (OLAP): Stores raw untreated data (staging and CDC tables)
- SQLite (OLTP): Stores final treated and unique data (latest table and integrations)
Features
- Hierarchical Workflows: Four-level workflow orchestration for fine-grained recovery
- Memory-Efficient Batching: Two-level batching strategy prevents OOM errors
- Automatic Recovery: Built-in retry logic for transient failures
- Data Deduplication: SQL window functions handle duplicates from retries
- Scheduled Execution: Daily automated runs via cron-like scheduling
- Remote Triggering: Client can trigger workflows on demand
- Health Monitoring: HTTP health check endpoint for service monitoring
- Multi-Tenant Architecture: Complete data isolation per organization and integration
Project Structure
exp17/
├── elt.py          # Main ELT pipeline workflows
├── server.py       # DBOS server with health check
├── client.py       # Remote workflow triggering client
├── data.py         # Data models and fake data generation
├── db.py           # Database operations (DuckDB + SQLite)
├── ARCHITECTURE.md # Detailed architecture documentation
├── data.db         # SQLite database - OLTP (created at runtime)
└── data_olap.db    # DuckDB database - OLAP (created at runtime)
Quick Start
Prerequisites
- Python 3.11+
- PostgreSQL (for DBOS system database)
- Poetry or pip for dependency management
Installation
# Install dependencies
poetry install
# Or with pip
pip install dbos faker
Environment Setup
Set the PostgreSQL connection string for DBOS:
export DBOS_DATABASE_URL="postgresql://postgres:dbos@localhost:5432/test?connect_timeout=10"
Running the Server
Start the ELT pipeline server:
python server.py
This will:
- Initialize the SQLite database and seed test data
- Launch DBOS and register all workflows
- Start a health check server on port 8080
- Begin processing scheduled workflows
Health check endpoint:
curl http://localhost:8080/health
Using the Client
The client allows you to trigger workflows remotely:
# Trigger the root orchestration workflow (processes all integrations)
python client.py start root_orchestration_workflow
# Trigger ELT pipeline for a specific org/integration
python client.py start elt_pipeline_workflow \
organization_id=org-001 \
connected_integration_id=<uuid-here>
# Trigger with custom batch parameters
python client.py start extract_and_load_workflow \
organization_id=org-001 \
connected_integration_id=<uuid-here> \
num_batches=5 \
batch_size=20
# Check workflow status
python client.py status <workflow-id>
# List all workflows
python client.py list
# List workflows with filters
python client.py list status=PENDING
python client.py list status=SUCCESS limit=10
Architecture
Workflow Hierarchy
Scheduled Trigger (daily at 5:20 AM GMT)
└── Root Orchestration Workflow
    └── ELT Pipeline Workflow (per org/integration)
        ├── Extract & Load Workflow (to DuckDB)
        │   └── Extract & Load Batch Workflow (per batch)
        │       ├── fetch_users_from_api (per page)
        │       └── insert_users_to_staging (DuckDB - per batch)
        ├── CDC Detection Workflow (in DuckDB)
        └── Apply to Latest Workflow (DuckDB → SQLite)
Batching Strategy
The pipeline uses a two-level batching approach to manage memory:
- 10 batches × 10 pages per batch = 100 total pages
- 10 users per page = 1,000 users per org/integration
- Only one batch (100 users) in memory at a time
This prevents OOM errors while maintaining efficiency.
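The two-level loop can be sketched as follows. This is a minimal illustration of the batching arithmetic, not the real `elt.py` workflows: `fetch_users_page` and `load_batch_to_staging` are hypothetical stand-ins for the actual API and DuckDB steps.

```python
def fetch_users_page(page_num: int, users_per_page: int) -> list[dict]:
    # Placeholder for fetch_users_from_api: returns one page of fake users.
    return [{"id": f"user-{page_num}-{i}"} for i in range(users_per_page)]

def load_batch_to_staging(batch: list[dict]) -> None:
    # Placeholder for insert_users_to_staging: would write the batch to DuckDB.
    pass

def run_batched_extract(num_batches: int = 10, batch_size: int = 10,
                        users_per_page: int = 10) -> int:
    total = 0
    for batch_num in range(num_batches):
        batch = []  # at most batch_size * users_per_page users held in memory
        for page_in_batch in range(batch_size):
            page_num = batch_num * batch_size + page_in_batch
            batch.extend(fetch_users_page(page_num, users_per_page))
        load_batch_to_staging(batch)  # flush the batch, then drop it
        total += len(batch)
    return total
```

With the defaults this walks 10 batches of 10 pages of 10 users, i.e. 1,000 users, while only one 100-user batch is ever resident.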
Data Deduplication
Uses SQL window functions to handle duplicates created by automatic retries:
WITH ranked_users AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY id, workflow_id
            ORDER BY created_at DESC
        ) AS rn
    FROM users
)
SELECT COUNT(*) FROM ranked_users WHERE rn = 1;
See ARCHITECTURE.md for detailed architecture documentation.
Database Schema
The pipeline uses a hybrid database architecture with DuckDB (OLAP) and SQLite (OLTP):
SQLite (OLTP) - Final Treated Data
connected_integrations
Stores integration configurations for each organization:
CREATE TABLE connected_integrations (
id TEXT PRIMARY KEY,
organization_id TEXT NOT NULL,
provider TEXT NOT NULL,
provider_data TEXT NOT NULL -- JSON: {permission_source_name, api_endpoint, ...}
)
users_latest
Latest state table containing deduplicated, current records:
CREATE TABLE users_latest (
id TEXT NOT NULL,
external_id TEXT NOT NULL,
organization_id TEXT NOT NULL,
connected_integration_id TEXT NOT NULL,
name TEXT NOT NULL,
email TEXT NOT NULL,
content_hash TEXT NOT NULL,
last_updated DATETIME DEFAULT(datetime('subsec')),
PRIMARY KEY (id, organization_id, connected_integration_id)
)
Multi-Tenant Primary Key: (id, organization_id, connected_integration_id)
- Enforces uniqueness per user per organization per integration
- Allows same user ID to exist in different tenant contexts
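A quick way to see the composite key and the upsert semantics together (the Idempotency section below notes that the pipeline relies on INSERT OR REPLACE). The sketch uses a trimmed-down version of the users_latest schema with only a few columns:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("""
    CREATE TABLE users_latest (
        id TEXT NOT NULL,
        organization_id TEXT NOT NULL,
        connected_integration_id TEXT NOT NULL,
        email TEXT NOT NULL,
        PRIMARY KEY (id, organization_id, connected_integration_id)
    )
""")
con.execute("INSERT OR REPLACE INTO users_latest VALUES (?, ?, ?, ?)",
            ("u1", "org-001", "int-A", "old@example.com"))
# Same user id in a different tenant context: a separate row, not a conflict.
con.execute("INSERT OR REPLACE INTO users_latest VALUES (?, ?, ?, ?)",
            ("u1", "org-002", "int-B", "other@example.com"))
# Replaying the same tenant key updates the row in place, never duplicating it.
con.execute("INSERT OR REPLACE INTO users_latest VALUES (?, ?, ?, ?)",
            ("u1", "org-001", "int-A", "new@example.com"))
(count,) = con.execute("SELECT COUNT(*) FROM users_latest").fetchone()
print(count)  # 2
```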
DuckDB (OLAP) - Raw Untreated Data
users_staging
Staging table for raw data extracted from APIs (supports duplicates for idempotency):
CREATE TABLE users_staging (
id VARCHAR NOT NULL,
workflow_id VARCHAR NOT NULL,
external_id VARCHAR NOT NULL,
organization_id VARCHAR NOT NULL,
connected_integration_id VARCHAR NOT NULL,
name VARCHAR NOT NULL,
email VARCHAR NOT NULL,
content_hash VARCHAR NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, organization_id, connected_integration_id, workflow_id, created_at)
)
Multi-Tenant Primary Key: (id, organization_id, connected_integration_id, workflow_id, created_at)
- First 3 fields form the multi-tenant user identity
- User IDs can be duplicated across different organizations/integrations
- workflow_id and created_at enable idempotent retries
users_cdc
Change Data Capture table that tracks INSERT, UPDATE, and DELETE operations:
CREATE TABLE users_cdc (
id VARCHAR NOT NULL,
workflow_id VARCHAR NOT NULL,
external_id VARCHAR NOT NULL,
organization_id VARCHAR NOT NULL,
connected_integration_id VARCHAR NOT NULL,
name VARCHAR NOT NULL,
email VARCHAR NOT NULL,
content_hash VARCHAR NOT NULL,
change_type VARCHAR NOT NULL, -- 'INSERT', 'UPDATE', 'DELETE'
detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, organization_id, connected_integration_id, workflow_id, detected_at)
)
Multi-Tenant Primary Key: (id, organization_id, connected_integration_id, workflow_id, detected_at)
- Ensures CDC records are isolated per organization/integration
- workflow_id and detected_at support idempotent detection
ELT Pipeline Stages
The pipeline follows a proper ELT flow with three distinct stages:
Stage 1: Extract & Load to DuckDB (OLAP)
- Fetches data from external APIs in batches
- Loads raw data into the DuckDB users_staging table
- Handles duplicates from retries using a composite primary key
- Uses window functions for deduplication
Stage 2: CDC Detection in DuckDB
- Compares deduplicated staging data (DuckDB) with users_latest (SQLite)
- Identifies INSERT operations (new records)
- Identifies UPDATE operations (changed records) using content hash comparison
- Identifies DELETE operations (removed records)
- Populates the DuckDB users_cdc table with detected changes
- Uses DuckDB's ATTACH feature to query across databases
Stage 3: Apply CDC to SQLite (OLTP)
- Reads CDC records from DuckDB
- Applies INSERT/UPDATE operations to the SQLite users_latest table
- Applies DELETE operations to the SQLite users_latest table
- Bridges the OLAP (DuckDB) and OLTP (SQLite) databases
Content Hash-Based Change Detection:
The pipeline uses an MD5 content hash for efficient change detection:
- Each record has a content_hash computed from name:email
- UPDATEs are detected by comparing hashes instead of individual fields
- Benefits: Simpler SQL, easy to extend with more fields, faster with many columns
import hashlib

def compute_content_hash(name: str, email: str) -> str:
    """Compute the MD5 hash of user content fields."""
    content = f"{name}:{email}"
    return hashlib.md5(content.encode("utf-8")).hexdigest()
SQL Logic (Multi-Tenant Aware):
-- INSERTs: Records in staging but not in latest
SELECT * FROM staging_deduped staging
LEFT JOIN users_latest latest
    ON staging.id = latest.id
    AND staging.organization_id = latest.organization_id
    AND staging.connected_integration_id = latest.connected_integration_id
WHERE latest.id IS NULL;

-- UPDATEs: Records in both with a different content_hash
SELECT * FROM staging_deduped staging
INNER JOIN users_latest latest
    ON staging.id = latest.id
    AND staging.organization_id = latest.organization_id
    AND staging.connected_integration_id = latest.connected_integration_id
WHERE staging.content_hash != latest.content_hash;

-- DELETEs: Records in latest but not in current staging
SELECT * FROM sqlite_db.users_latest latest
LEFT JOIN staging_deduped staging
    ON latest.id = staging.id
    AND latest.organization_id = staging.organization_id
    AND latest.connected_integration_id = staging.connected_integration_id
WHERE staging.id IS NULL;
All JOINs include organization_id and connected_integration_id to ensure proper multi-tenant isolation.
Cross-Database Queries:
DuckDB's ATTACH feature is used to query across databases:
-- In DuckDB, attach SQLite database
ATTACH 'data.db' AS sqlite_db (TYPE SQLITE);
-- Now can join DuckDB staging with SQLite latest
SELECT * FROM users_staging s
LEFT JOIN sqlite_db.users_latest l
ON s.id = l.id ...
Failure Simulation
The pipeline includes intentional failures for testing resilience:
| Failure Type | Location | Probability | Retry Strategy |
|---|---|---|---|
| API Failure | fetch_users_from_api | 2% | 3 attempts with exponential backoff |
| DB Failure | insert_users_to_staging | 40% | 10 attempts with fast backoff (0.1s) |
| OOM Error | extract_and_load_workflow | 5% (after batch 5) | Workflow recovery from checkpoint |
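The failure-injection pattern is simple to reproduce outside DBOS. The sketch below is a hypothetical stand-in for the real steps: it emulates the DB step's 40% failure rate with `random` and drives it from a plain retry loop where DBOS would instead apply its configured backoff.

```python
import random

def flaky_insert(rng: random.Random, failure_rate: float = 0.4) -> None:
    # Mirrors the simulated DB failure: raise with the given probability.
    if rng.random() < failure_rate:
        raise RuntimeError("simulated transient DB failure")

def run_with_retries(rng: random.Random, max_attempts: int = 10) -> int:
    for attempt in range(1, max_attempts + 1):
        try:
            flaky_insert(rng)
            return attempt  # succeeded on this attempt
        except RuntimeError:
            continue  # DBOS would sleep interval_seconds * backoff_rate here
    raise RuntimeError("exhausted retries")

attempts = run_with_retries(random.Random(42))
```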
Retry Configuration Examples
# API step with retries
@DBOS.step(retries_allowed=True, max_attempts=3)
def fetch_users_from_api(...):
    ...

# Database step with aggressive retries
@DBOS.step(retries_allowed=True, max_attempts=10, backoff_rate=0.1, interval_seconds=0.1)
def insert_users_to_staging(...):
    ...

# Workflow with recovery
@DBOS.workflow(max_recovery_attempts=100)
def extract_and_load_workflow(...):
    ...
Workflow Monitoring
Using the Client
# Check specific workflow
python client.py status <workflow-id>
# List recent workflows
python client.py list limit=20
# Filter by status
python client.py list status=SUCCESS
python client.py list status=ERROR
DBOS Admin Interface
Access the DBOS admin interface (if enabled):
http://localhost:3001
Configuration
DBOS Configuration
The server uses the following DBOS configuration:
config = {
    "name": "elt-pipeline",
    "database_url": os.getenv("DBOS_DATABASE_URL"),
    "log_level": "DEBUG",
}
Schedule Configuration
To change the schedule, modify the cron expression in elt.py:
@DBOS.scheduled("20 5 * * *")  # Current: Daily at 5:20 AM GMT
@DBOS.workflow()
def scheduled_elt_trigger(scheduled_time, actual_time):
    ...
Cron format: minute hour day month dayOfWeek
Examples:
"*/5 * * * *"- Every 5 minutes"0 * * * *"- Every hour"0 0 * * *"- Daily at midnight"0 0 * * 0"- Weekly on Sunday
Batch Configuration
Adjust batch sizes in workflow calls:
# Default: 10 batches Γ 10 pages = 100 pages
extract_and_load_workflow(
    organization_id=org_id,
    connected_integration_id=integration_id,
    num_batches=10,  # Number of batches
    batch_size=10,   # Pages per batch
)
Testing
Run the Test Script
The easiest way to test the complete ELT pipeline:
python test_elt.py
This script will:
- Initialize a fresh database with test data
- Run a complete ELT pipeline for one org/integration
- Display results from all tables (staging, CDC, latest)
- Verify the pipeline worked correctly
Expected output:
Testing ELT Pipeline
============================================================
Initializing database...
Database initialized
Seeding connected integrations...
Seeded 2 integrations
Launching DBOS...
Running ELT pipeline...
ELT Pipeline Completed Successfully!

Results:
  Users loaded (staging): 1000
  CDC changes detected: 1000 (1000 inserts, 0 updates)
  Applied to latest: 1000 changes
  Latest table records: 1000
  Synced to Postgres: 1000 changes
Test Idempotency
To verify the pipeline handles workflow replays without creating duplicates:
python test_idempotency.py
This script will:
- Run the ELT pipeline with a fixed workflow_id
- Run it AGAIN with the same workflow_id (simulating crash/replay)
- Verify no duplicate records were created
- Confirm idempotency guarantees
Expected output:
Testing ELT Pipeline Idempotency
============================================================
FIRST RUN - Running ELT pipeline
First run completed!
SECOND RUN - Simulating workflow replay with SAME workflow_id
Second run completed!

IDEMPOTENCY VERIFICATION
1. Checking if results are identical...
   Users loaded count matches
2. Checking CDC table for duplicates...
   CDC table unchanged: 1000 records (no duplicates!)
3. Checking Latest table for duplicates...
   Latest table unchanged: 1000 records (no duplicates!)

ALL CHECKS PASSED! Pipeline is IDEMPOTENT!
Manual Testing
Run workflows manually for testing:
from elt import root_orchestration_workflow, elt_pipeline_workflow
from dbos import DBOS
# Run a single ELT pipeline
result = elt_pipeline_workflow(
    organization_id="org-001",
    connected_integration_id="<uuid-here>"
)
print(result)
# Run the full pipeline for all integrations
result = root_orchestration_workflow()
print(result)
Using the Client
Or use the client for remote testing:
# Test a single org/integration
python client.py start elt_pipeline_workflow \
organization_id=org-001 \
connected_integration_id=<uuid-here>
# Test all integrations
python client.py start root_orchestration_workflow
Inspecting the Database
After running the pipeline, inspect the tables:
from db import get_user_count
# Check staging table
staging_count = get_user_count(table_name="users_staging")
print(f"Staging: {staging_count} records")
# Check CDC table
cdc_count = get_user_count(table_name="users_cdc")
print(f"CDC: {cdc_count} changes")
# Check latest table
latest_count = get_user_count(table_name="users_latest")
print(f"Latest: {latest_count} records")
Key Files Explained
elt.py
Main workflow definitions including:
- DBOS steps for API fetching and database insertion
- Sub-workflows for each ELT stage
- Main ELT pipeline orchestration
- Root workflow for processing all integrations
- Scheduled workflow trigger
server.py
DBOS server implementation:
- DBOS initialization and configuration
- Database setup and seeding
- Health check HTTP server (port 8080)
- Graceful shutdown handling
- Multiprocess coordination
client.py
Command-line client for remote workflow triggering:
- Generic parameter parsing (auto-type conversion)
- Workflow enqueueing via DBOSClient
- Status checking and result retrieval
- Workflow listing with filters
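The "auto-type conversion" for key=value arguments could look like the following. This is a hypothetical sketch of the idea, not the actual code in client.py:

```python
def parse_params(args: list[str]) -> dict:
    """Parse key=value CLI arguments, guessing int/float/bool types."""
    params = {}
    for arg in args:
        key, _, raw = arg.partition("=")
        if raw.isdigit():
            value = int(raw)                  # e.g. num_batches=5
        elif raw.lower() in ("true", "false"):
            value = raw.lower() == "true"     # e.g. dry_run=true
        else:
            try:
                value = float(raw)            # e.g. rate=0.1
            except ValueError:
                value = raw                   # org ids etc. stay strings
        params[key] = value
    return params

parse_params(["organization_id=org-001", "num_batches=5", "batch_size=20"])
```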
data.py
Data models and generation:
- User and ConnectedIntegration dataclasses
- Faker-based fake data generation
- Stable UUID generation from external IDs
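Stable UUIDs from external IDs are commonly derived with uuid5; this is an assumption about the approach (data.py may use a different scheme), but it shows the key property: the same namespace and external_id always yield the same UUID across runs and retries. The namespace string here is made up for the example.

```python
import uuid

# Fixed namespace (hypothetical value): changing it changes every derived ID.
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "elt-pipeline.example")

def stable_user_id(external_id: str) -> str:
    """Derive a deterministic UUID from an external provider ID."""
    return str(uuid.uuid5(NAMESPACE, external_id))

assert stable_user_id("ext-123") == stable_user_id("ext-123")  # deterministic
```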
db.py
Database operations:
- SQLite database and table creation
- Connected integrations seeding
- User insertion with batch support
- Deduplication via SQL window functions
Idempotency and Duplication Prevention
CRITICAL: The pipeline is designed to be duplication-proof when workflows crash and replay.
Key mechanisms:
- Staging Table: Window functions deduplicate retry-induced duplicates
- CDC Table: Delete-before-insert ensures no duplicate change records
- Latest Table: INSERT OR REPLACE provides upsert semantics
- Workflow Recovery: DBOS guarantees steps won't re-execute on replay
- Multi-Tenancy: Composite keys ensure isolation per organization/integration
For detailed explanation, see IDEMPOTENCY.md and MULTI_TENANCY.md.
Quick Example
from dbos import SetWorkflowID

# Same workflow_id = idempotent operation
with SetWorkflowID("test-workflow-001"):
    result1 = elt_pipeline_workflow(org_id, integration_id)

# Simulate crash and replay - produces the same result
with SetWorkflowID("test-workflow-001"):
    result2 = elt_pipeline_workflow(org_id, integration_id)

assert result1 == result2  # No duplicates!
Production Considerations
Scaling
- Queue Concurrency: Adjust Queue("elt_queue", concurrency=5) for parallelism
- Batch Size: Tune num_batches and batch_size based on available memory
- Database: Use connection pooling for high throughput
- Monitoring: Integrate with OpenTelemetry for distributed tracing
Error Handling
- All transient failures are automatically retried
- Permanent failures are logged with workflow status ERROR
- Use DBOS.list_workflows(status="ERROR") to find failed workflows
- Resume failed workflows with DBOS.resume_workflow(workflow_id)
Data Consistency
- Idempotent operations prevent duplicates on replay (see IDEMPOTENCY.md)
- Deduplication handles retry-induced duplicates automatically
- Composite primary key prevents constraint violations
- Window functions ensure latest data is used
- Workflow IDs track data provenance
Troubleshooting
Server won't start
Check PostgreSQL connection:
psql $DBOS_DATABASE_URL
Workflows failing immediately
Check logs for configuration errors:
python server.py # Look for DBOS initialization errors
High memory usage
Reduce batch sizes:
num_batches=20, batch_size=5 # More batches, smaller size
Duplicate data
The window function should handle this automatically. Verify with:
from db import get_unique_user_count, get_user_count
total = get_user_count()
unique = get_unique_user_count()
print(f"Total: {total}, Unique: {unique}, Duplicates: {total - unique}")
License
See repository root for license information.
Recent changes
- 2026-01-17 8e83110 updated models
- 2026-01-03 6c2bc43 using OLAP and OLTP DBs
- 2025-10-24 b2ae917 remove ex1
- 2025-10-23 e52c602 update architecture
- 2025-10-23 00f69f7 update readme
- 2025-10-23 8c110cb added hash to compute changes
- 2025-10-23 ae7cb64 updated doc
- 2025-10-23 fa2ef61 ELT 3rd cut
Categories: experiments, Python
Tags: dbos-experiments