Data ingestion patterns for loading data from cloud storage, APIs, files, and streaming sources into databases...
This skill provides patterns for getting data INTO systems from external sources.
```
What is your data source?
├── Cloud Storage (S3, GCS, Azure) → See cloud-storage.md
├── Files (CSV, JSON, Parquet) → See file-formats.md
├── REST/GraphQL APIs → See api-feeds.md
├── Streaming (Kafka, Kinesis) → See streaming-sources.md
├── Legacy Database → See database-migration.md
└── Need full ETL framework → See etl-tools.md
```
dlt (data load tool) - Modern Python ETL:
```python
import dlt
import requests

# Define a source
@dlt.source
def github_source(repo: str):
    # merge + primary_key: re-running the pipeline upserts issues by "id"
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        # Note: fetches only the first page of results from the GitHub API
        response = requests.get(f"https://api.github.com/repos/{repo}/issues")
        response.raise_for_status()
        yield response.json()

    return issues

# Load to destination
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="postgres",  # or duckdb, bigquery, snowflake
    dataset_name="github_data",
)

load_info = pipeline.run(github_source("owner/repo"))
print(load_info)
```
Polars for file processing (often faster than pandas, especially on large files):

```python
import polars as pl

# Read CSV; column types are inferred from a sample of rows
df = pl.read_csv("data.csv")

# Read Parquet (columnar, efficient); s3:// paths need cloud credentials configured
df = pl.read_parquet("s3://bucket/data.parquet")

# Read JSON lines (one JSON object per line)
df = pl.read_ndjson("events.jsonl")

# Write to database (the default engine goes through SQLAlchemy)
df.write_database(
    table_name="events",
    connection="postgresql://user:pass@localhost/db",
    if_table_exists="append",
)
```
S3 ingestion:
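```typescript
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { parse } from "csv-parse/sync";

// `db` and `eventsTable` are assumed to be defined elsewhere (e.g. a Drizzle ORM client and table)
const s3 = new S3Client({ region: "us-east-1" });

async function ingestFromS3(bucket: string, key: string) {
  const response = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
  // Buffers the whole object in memory; stream it instead for very large files
  const body = await response.Body?.transformToString();
  if (!body) {
    throw new Error(`Empty or missing object: s3://${bucket}/${key}`);
  }

  // Parse CSV: `columns: true` turns each row into an object keyed by header name
  const records = parse(body, { columns: true, skip_empty_lines: true });

  // Insert to database
  await db.insert(eventsTable).values(records);
}
```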
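Webhook ingestion (push-based API feeds):

```typescript
import { Hono } from "hono";

// Webhook receiver for real-time ingestion.
// `db` and `stripeEventsTable` are assumed to be defined elsewhere.
const app = new Hono();

app.post("/webhooks/stripe", async (c) => {
  // Read the raw body: Stripe computes its signature over the exact bytes sent,
  // so verify before parsing JSON
  const rawBody = await c.req.text();
  const signature = c.req.header("stripe-signature");
  if (!signature) {
    return c.json({ error: "missing stripe-signature header" }, 400);
  }
  // ... validation logic (e.g. stripe.webhooks.constructEvent(rawBody, signature, secret))
  const event = JSON.parse(rawBody);

  // Ingest event
  await db.insert(stripeEventsTable).values({
    eventId: event.id,
    type: event.type,
    data: event.data,
    receivedAt: new Date(),
  });

  return c.json({ received: true });
});
```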
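The elided validation step can be sketched in Python, assuming Stripe's documented scheme: the `stripe-signature` header carries `t=<unix timestamp>` and one or more `v1=<hex HMAC>` entries, and each `v1` value is an HMAC-SHA256 of `"<t>.<raw body>"` keyed with the endpoint secret. The helper name `verify_stripe_signature` and the 300-second tolerance are illustrative choices; in production the official Stripe SDK helper is the safer option.

```python
import hashlib
import hmac
import time
from typing import Optional


def verify_stripe_signature(
    raw_body: bytes,
    header: str,
    secret: str,
    tolerance_s: int = 300,
    now: Optional[float] = None,
) -> bool:
    """Check a Stripe-style `t=<ts>,v1=<hex>` signature header (hypothetical helper)."""
    timestamp = None
    signatures = []
    for part in header.split(","):
        key, _, value = part.strip().partition("=")
        if key == "t":
            timestamp = value
        elif key == "v1":
            signatures.append(value)
    if timestamp is None or not timestamp.isdigit() or not signatures:
        return False

    # Reject stale timestamps to limit replay attacks
    current = time.time() if now is None else now
    if abs(current - int(timestamp)) > tolerance_s:
        return False

    # Expected signature: HMAC-SHA256 over "<timestamp>.<raw body>" keyed by the endpoint secret
    signed_payload = timestamp.encode() + b"." + raw_body
    expected = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()

    # Constant-time comparison against every v1 signature in the header
    return any(hmac.compare_digest(expected, sig) for sig in signatures)
```

Verification has to run on the raw request bytes before any JSON parsing, because re-serialized JSON rarely matches the signed payload byte for byte.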
High-performance file ingestion:
```rust
use std::io::Cursor;

use aws_sdk_s3::Client;
use polars::prelude::*;

// Requires the polars "parquet" feature
async fn ingest_parquet(
    client: &Client,
    bucket: &str,
    key: &str,
) -> Result<DataFrame, Box<dyn std::error::Error>> {
    // Download from S3 (the whole object is buffered in memory)
    let resp = client.get_object().bucket(bucket).key(key).send().await?;
    let bytes = resp.body.collect().await?.into_bytes();

    // Parse with Polars
    let df = ParquetReader::new(Cursor::new(bytes)).finish()?;
    Ok(df)
}
```
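S3 CSV ingestion (one object per call; run calls in separate goroutines to process files concurrently):

```go
package ingest

import (
	"context"
	"encoding/csv"

	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// ingestCSV downloads one CSV object from S3 and bulk-inserts its rows.
// batchInsert is assumed to be a helper defined elsewhere.
func ingestCSV(ctx context.Context, client *s3.Client, bucket, key string) error {
	resp, err := client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &key,
	})
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// ReadAll buffers every row in memory; call reader.Read in a loop for large files
	reader := csv.NewReader(resp.Body)
	records, err := reader.ReadAll()
	if err != nil {
		return err
	}

	// Batch insert to database
	return batchInsert(ctx, records)
}
```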
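The batch-insert helper is not shown. A minimal Python sketch of the same idea, assuming a DB-API connection (the standard library's `sqlite3` stands in for a production database) and a hypothetical `events(name, value)` table: rows are written in fixed-size chunks with `executemany` inside one transaction, so a failure rolls back the whole file instead of leaving a partial load.

```python
import sqlite3
from itertools import islice
from typing import Iterable, Iterator, Sequence


def chunked(rows: Iterable[Sequence], size: int) -> Iterator[list]:
    """Yield successive lists of at most `size` rows."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


def batch_insert(conn: sqlite3.Connection, rows: Iterable[Sequence], batch_size: int = 1000) -> int:
    """Insert rows into the hypothetical `events` table in batches; return the row count."""
    total = 0
    # `with conn` commits on success and rolls back if any batch fails
    with conn:
        for batch in chunked(rows, batch_size):
            conn.executemany("INSERT INTO events (name, value) VALUES (?, ?)", batch)
            total += len(batch)
    return total
```

Batch size trades memory and round trips against transaction size; a few thousand rows per `executemany` call is a common starting point to tune from.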
For periodic bulk loads:
```
Source → Extract  → Transform → Load   → Validate
↓        ↓          ↓           ↓        ↓
S3       Download   Clean/Map   Insert   Count check
```
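The batch flow can be sketched end to end in Python. Assumptions: the extracted file is CSV text that has already been downloaded (the S3 step is left out), `sqlite3` stands in for the target database, the `users(email, name)` table and cleaning rules are hypothetical, and the table starts empty (e.g. a staging table truncated per run) so the count check can compare totals directly.

```python
import csv
import io
import sqlite3


def extract(csv_text: str) -> list[dict]:
    # Extract: parse downloaded CSV text into dicts keyed by header
    return list(csv.DictReader(io.StringIO(csv_text)))


def transform(rows: list[dict]) -> list[tuple]:
    # Transform: trim/normalize fields and drop rows without an email
    cleaned = []
    for row in rows:
        email = (row.get("email") or "").strip().lower()
        if email:
            cleaned.append((email, (row.get("name") or "").strip()))
    return cleaned


def load(conn: sqlite3.Connection, rows: list[tuple]) -> None:
    with conn:  # one transaction for the whole batch
        conn.executemany("INSERT INTO users (email, name) VALUES (?, ?)", rows)


def validate(conn: sqlite3.Connection, expected: int) -> None:
    # Validate: count check, the target must hold exactly the rows we loaded
    (actual,) = conn.execute("SELECT COUNT(*) FROM users").fetchone()
    if actual != expected:
        raise ValueError(f"count check failed: expected {expected}, found {actual}")


def run_batch(conn: sqlite3.Connection, csv_text: str) -> int:
    rows = transform(extract(csv_text))
    load(conn, rows)
    validate(conn, expected=len(rows))
    return len(rows)
```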
Key considerations:
For continuous data flow:
```
Source → Buffer    → Process   → Load → Ack
↓        ↓           ↓           ↓      ↓
Kafka    In-memory   Transform   DB     Commit offset
```
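The important property of the streaming flow is ordering: offsets are committed only after the batch is loaded, which gives at-least-once delivery. A crash between load and commit replays the batch, so the load should be idempotent. The sketch below simulates this in Python with hypothetical in-memory `FakeConsumer` and `Sink` classes standing in for a Kafka consumer and a database; they do not mirror the API of any Kafka client library.

```python
from dataclasses import dataclass, field


@dataclass
class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer: a message list plus a committed offset."""
    messages: list
    committed: int = 0  # offset of the next message to read after a restart

    def poll(self, start: int, max_records: int) -> list:
        return self.messages[start:start + max_records]

    def commit(self, offset: int) -> None:
        self.committed = offset


@dataclass
class Sink:
    """Idempotent target: rows keyed by event id, so replays overwrite instead of duplicating."""
    rows: dict = field(default_factory=dict)

    def upsert_batch(self, batch: list) -> None:
        for msg in batch:
            self.rows[msg["id"]] = msg["value"] * 2  # example transform


def consume(consumer: FakeConsumer, sink: Sink, batch_size: int = 3) -> None:
    position = consumer.committed  # resume from the last committed offset
    while True:
        buffer = consumer.poll(position, batch_size)  # Buffer
        if not buffer:
            break
        sink.upsert_batch(buffer)   # Process + Load
        position += len(buffer)
        consumer.commit(position)   # Ack only after the load succeeded
```

If the load raises, the commit for that batch never happens, so a restarted consumer resumes from the last fully loaded batch.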
Key considerations:
For external API data:
```
Schedule → Fetch    → Dedupe → Load   → Update cursor
↓          ↓          ↓        ↓        ↓
Cron       API call   By ID    Insert   Last timestamp
```
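One polling cycle of this pattern can be sketched in Python. The `fetch_since` callable stands in for the API call (hypothetical; a real API would take the cursor as a query parameter, e.g. an `updated_since` filter). Records are deduplicated by `id`, keeping the latest version, and the cursor advances to the newest `updated_at` only after the load. ISO-8601 UTC timestamps are assumed so that string comparison matches time order.

```python
from typing import Callable, Iterable


def poll_once(
    fetch_since: Callable[[str], Iterable[dict]],
    store: dict,
    cursor: str,
) -> str:
    """Run one fetch -> dedupe -> load -> update-cursor cycle; return the new cursor."""
    # Fetch: records changed at or after the cursor (inclusive, so records sharing
    # the boundary timestamp are not missed; dedupe absorbs the overlap)
    records = list(fetch_since(cursor))

    # Dedupe by ID: keep the most recent version of each record
    latest: dict = {}
    for rec in records:
        prev = latest.get(rec["id"])
        if prev is None or rec["updated_at"] >= prev["updated_at"]:
            latest[rec["id"]] = rec

    # Load: upsert into the target (a dict keyed by ID stands in for a table)
    store.update(latest)

    # Update cursor: advance to the newest timestamp actually loaded
    if latest:
        cursor = max(cursor, max(r["updated_at"] for r in latest.values()))
    return cursor
```

Persisting the returned cursor only after the load succeeds means a failed run simply re-fetches the same window on the next schedule tick.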
Key considerations:
For database replication:
```
Source DB → Capture changes → Transform  → Target DB
↓           ↓                 ↓            ↓
Postgres    Debezium/WAL      Map schema   Insert/Update
```
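Applying captured changes to the target can be sketched in Python against Debezium's change-event envelope, where `op` is `c` (create), `u` (update), `d` (delete) or `r` (snapshot read), and `before`/`after` hold the row state. Assumptions: events arrive already deserialized as dicts and in commit order, the table has a hypothetical `id` primary key, `sqlite3` stands in for the target database, and the "Map schema" step is shown as a rename of a hypothetical `full_name` column to `name`.

```python
import sqlite3


def map_row(row: dict) -> dict:
    # Map schema: source column `full_name` becomes target column `name` (hypothetical)
    return {"id": row["id"], "name": row["full_name"]}


def apply_change(conn: sqlite3.Connection, event: dict) -> None:
    op = event["op"]
    if op in ("c", "r", "u"):
        row = map_row(event["after"])
        # Upsert so snapshot reads and replayed events are idempotent
        conn.execute(
            "INSERT INTO customers (id, name) VALUES (:id, :name) "
            "ON CONFLICT(id) DO UPDATE SET name = excluded.name",
            row,
        )
    elif op == "d":
        conn.execute("DELETE FROM customers WHERE id = ?", (event["before"]["id"],))
    else:
        raise ValueError(f"unknown op: {op!r}")


def apply_changes(conn: sqlite3.Connection, events: list) -> None:
    with conn:  # apply the batch atomically
        for event in events:
            apply_change(conn, event)
```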
Key considerations:
| Use Case | Python | TypeScript | Rust | Go |
|---|---|---|---|---|
| ETL Framework | dlt, Meltano, Dagster | - | - | - |
| Cloud Storage | boto3, gcsfs, adlfs | @aws-sdk/, @google-cloud/ | aws-sdk-s3, object_store | aws-sdk-go-v2 |
| File Processing | polars, pandas, pyarrow | papaparse, xlsx, parquetjs | polars-rs, arrow-rs | encoding/csv, parquet-go |
| Streaming | confluent-kafka, aiokafka | kafkajs | rdkafka-rs | franz-go, sarama |
| CDC | Debezium, pglogical | - | - | - |
- references/cloud-storage.md - S3, GCS, Azure Blob patterns
- references/file-formats.md - CSV, JSON, Parquet, Excel handling
- references/api-feeds.md - REST polling, webhooks, GraphQL subscriptions
- references/streaming-sources.md - Kafka, Kinesis, Pub/Sub
- references/database-migration.md - Schema migration, CDC patterns
- references/etl-tools.md - dlt, Meltano, Airbyte, Fivetran

- scripts/validate_csv_schema.py - Validate CSV against expected schema
- scripts/test_s3_connection.py - Test S3 bucket connectivity
- scripts/generate_dlt_pipeline.py - Generate dlt pipeline scaffold

After ingestion, chain to the appropriate database skill:
| Destination | Chain to Skill |
|---|---|
| PostgreSQL, MySQL | databases-relational |
| MongoDB, DynamoDB | databases-document |
| Qdrant, Pinecone | databases-vector (after embedding) |
| ClickHouse, TimescaleDB | databases-timeseries |
| Neo4j | databases-graph |
For vector databases, chain through ai-data-engineering for embedding:
ingesting-data → ai-data-engineering → databases-vector