Quick Start Tutorial: ETL Automatisierung (Operations)

What You'll Build
Interessiert an diesem Thema?
Kontaktieren Sie uns für eine kostenlose Beratung →Die häufigsten Fehler entstehen bei der Webhook-Retry-Logik
Beispiel: Enterprise E-Commerce Platform
Fehlerrate von 2.5% auf 0.3% gesenkt
Letzte Woche haben wir bei einem Team eine „automatisierte“ ETL-Strecke übernommen, die eigentlich nur ein Cronjob mit einem 800-Zeilen-Skript war. Klingt erstmal okay. Bis der Job zweimal lief, Daten doppelt schrieb und danach sechs Stunden Backfill nötig waren. Genau da fängt ETL Automatisierung in der Operations-Praxis an: nicht beim ersten erfolgreichen Run, sondern beim zehnten, wenn Netzwerk, Quellsystem und Schema gleichzeitig wackeln.
Du baust hier eine kleine, aber belastbare ETL-Automatisierung in TypeScript. Plattform-agnostisch. Ohne Magie. Mit Fokus auf Betrieb: Wiederholbarkeit, Observability, saubere Failure-Modes.
- Extract: Daten aus einer Quelle holen (hier simuliert per HTTP).
- Transform: Minimale Normalisierung + Validierung.
- Load: Schreiben in ein Ziel (hier beispielhaft über eine Repository-Schnittstelle).
- Ops-Schicht: Idempotenz, Checkpoints, Retries, Timeouts, Dead-Letter.
Benchmarks, die ich im Betrieb als „gesund“ ansehe (keine Universalwerte, nur Orientierung): p95 Job-Laufzeit stabil, Fehlerrate < 1% pro Run, Retry-Rate nicht dauerhaft steigend, und ein klarer „freshness“-SLO wie „Daten sind < 30 Minuten alt“ (je nach Business).
Prerequisites
Du brauchst nicht viel. Aber die Details zählen. Wenn du nur „läuft auf meinem Laptop“ erreichst, hast du keine Automatisierung, sondern ein Experiment.
- Node.js (aktuell) und TypeScript-Setup.
- Eine persistente Ablage für Checkpoints (Datenbank, KV-Store, oder notfalls ein File in einem Volume). Lokal geht auch, aber denk wie Ops.
- Ein Zielsystem (Data Warehouse, Datenbank, Search Index). Im Tutorial abstrahiert.
Kurze Checkliste für Betrieb (die drei, die ich am häufigsten nachrüste):
- Timeouts pro Request und pro Job.
- Idempotente Loads (Dedup-Key oder Upsert).
- Messpunkte: Laufzeit, Rows gelesen/geschrieben, Errors, Retry-Zähler.
Step-by-Step Guide
Ich starte ungewohnt: zuerst mit der „Ops-Hülle“. Das ist der Teil, der im Alltag Zeit spart. ETL-Logik kann man später verbessern. Fehlende Idempotenz rächt sich sofort.
- Definiere deinen Run-State: checkpoint + runId + Metriken.
- Baue Extract mit Pagination und Timeouts.
- Transform: so wenig wie möglich, aber valide.
- Load: Upsert/Dedup. Kein „blind insert“.
- Failure-Handling: Retries, DLQ, und ein checkpoint nur bei Erfolg.
Ein typisches Beispiel für eine kleine ETL-Engine ist ein Runner, der Retries, Timeouts und Checkpoints standardisiert. Der Code wirkt länger als nötig. Ist er auch. Aber er ist billiger als nächtliche Debug-Sessions.
// etlRunner.ts
import { setTimeout as sleep } from "node:timers/promises";
type Checkpoint = {
cursor: string | null; // z.B. "updated_at" Cursor oder Page-Token
updatedAt: number;
};
type Metrics = {
extracted: number;
loaded: number;
retries: number;
durationMs: number;
};
type RunResult = {
ok: boolean;
metrics: Metrics;
checkpoint?: Checkpoint;
error?: string;
};
export interface CheckpointStore {
get(key: string): Promise<Checkpoint | null>;
set(key: string, cp: Checkpoint): Promise<void>;
}
export interface DeadLetterQueue {
publish(event: { runId: string; reason: string; payload: unknown }): Promise<void>;
}
export async function runEtlJob(opts: {
jobKey: string;
runId: string;
store: CheckpointStore;
dlq: DeadLetterQueue;
maxRuntimeMs: number;
maxRetries: number;
extractTransformLoad: (cp: Checkpoint | null) => Promise<{ nextCheckpoint: Checkpoint; extracted: number; loaded: number }>;
}): Promise<RunResult> {
const started = Date.now();
const metrics: Metrics = { extracted: 0, loaded: 0, retries: 0, durationMs: 0 };
const previousCp = await opts.store.get(opts.jobKey);
// Quick fix for now: global runtime guard
const deadline = started + opts.maxRuntimeMs;
let attempt = 0;
while (attempt <= opts.maxRetries) {
attempt++;
try {
if (Date.now() > deadline) throw new Error("job timeout exceeded");
const { nextCheckpoint, extracted, loaded } = await opts.extractTransformLoad(previousCp);
metrics.extracted = extracted;
metrics.loaded = loaded;
// Wichtig: Checkpoint nur nach erfolgreichem Load.
await opts.store.set(opts.jobKey, nextCheckpoint);
metrics.durationMs = Date.now() - started;
return { ok: true, metrics, checkpoint: nextCheckpoint };
} catch (e: any) {
metrics.retries += attempt <= opts.maxRetries ? 1 : 0;
const msg = e?.message ? String(e.message) : "unknown error";
const isLast = attempt > opts.maxRetries;
if (isLast) {
await opts.dlq.publish({
runId: opts.runId,
reason: msg,
payload: { jobKey: opts.jobKey, previousCp }
});
metrics.durationMs = Date.now() - started;
return { ok: false, metrics, error: msg };
}
// Backoff. Nicht zu fancy. TODO: jitter nachrüsten.
await sleep(400 * attempt);
}
}
metrics.durationMs = Date.now() - started;
return { ok: false, metrics, error: "unreachable" };
}
Was ist daran operational wichtig? Der Checkpoint wird nur geschrieben, wenn der Load durch ist. Das verhindert „Fortschritt ohne Daten“. Ein Fehler, den ich oft sehe: Checkpoint nach Extract setzen, dann bricht Load ab, und du verlierst Datensätze. Klingt theoretisch. Passiert ständig.
Jetzt füllen wir extractTransformLoad mit einer pragmatischen Implementierung: inkrementell ziehen, leicht transformieren, idempotent schreiben. Der nächste Code zeigt genau das. Nicht hübsch. Aber robust genug für den Anfang.
// jobImpl.ts
type SourceRow = { id: string; updated_at: string; amount: number | string; currency?: string };
type TargetRow = { id: string; updatedAt: number; amountCents: number; currency: string };
export interface TargetRepo {
upsertMany(rows: TargetRow[]): Promise<{ insertedOrUpdated: number }>;
}
async function fetchPage(opts: {
baseUrl: string;
cursor: string | null;
timeoutMs: number;
}): Promise<{ items: SourceRow[]; nextCursor: string | null }> {
const ctrl = new AbortController();
const t = setTimeout(() => ctrl.abort(), opts.timeoutMs);
try {
const url = new URL(opts.baseUrl);
if (opts.cursor) url.searchParams.set("cursor", opts.cursor);
const res = await fetch(url.toString(), { signal: ctrl.signal });
if (!res.ok) throw new Error(`source http ${res.status}`);
const data = (await res.json()) as any;
return { items: data.items ?? [], nextCursor: data.nextCursor ?? null };
} finally {
clearTimeout(t);
}
}
function toTarget(row: SourceRow): TargetRow | null {
// Achtung: häufiger Stolperstein sind leere updated_at Werte
const ts = Date.parse(row.updated_at);
if (!Number.isFinite(ts)) return null;
const amount = typeof row.amount === "string" ? Number(row.amount) : row.amount;
if (!Number.isFinite(amount)) return null;
return {
id: row.id,
updatedAt: ts,
amountCents: Math.round(amount * 100),
currency: row.currency ?? "EUR"
};
}
export function makeExtractTransformLoad(opts: {
sourceBaseUrl: string;
repo: TargetRepo;
pageTimeoutMs: number;
maxPagesPerRun: number; // schützt dich vor endlosen Läufen
}) {
return async (cp: { cursor: string | null } | null) => {
let cursor = cp?.cursor ?? null;
let pages = 0;
let extracted = 0;
let loaded = 0;
while (pages < opts.maxPagesPerRun) {
pages++;
const { items, nextCursor } = await fetchPage({
baseUrl: opts.sourceBaseUrl,
cursor,
timeoutMs: opts.pageTimeoutMs
});
if (items.length === 0) {
// Nichts mehr zu holen.
cursor = nextCursor;
break;
}
extracted += items.length;
const transformed: TargetRow[] = [];
for (const it of items) {
const t = toTarget(it);
if (t) transformed.push(t);
// else: Drop. Später evtl. eigene DLQ pro Datensatz.
}
if (transformed.length) {
const r = await opts.repo.upsertMany(transformed); // Idempotenz über Upsert
loaded += r.insertedOrUpdated;
}
cursor = nextCursor;
if (!cursor) break;
}
return {
nextCheckpoint: { cursor, updatedAt: Date.now() },
extracted,
loaded
};
};
}
Warum maxPagesPerRun? Weil „unendlich Daten“ als Failure-Mode real ist: Bug im Cursor, Quellsystem liefert denselben Cursor zurück, oder du hast ein neues Backfill und der Job läuft Stunden. In Ops willst du kontrollierte Läufe. Lieber mehrere Runs als ein Monster-Run.
Übrigens: In echten Pipelines trenne ich oft DLQ für Run-Level (Job kaputt) und Record-Level (ein Datensatz kaputt). Das sprengt hier den Rahmen, aber es ist eine der effektivsten Maßnahmen, um Fehlerraten zu senken, ohne den Datenfluss zu blockieren.
Testing & Verification
ETL Automatisierung ohne Tests ist wie Alerting ohne Pager. Es sieht gut aus, bis es ernst wird. Ich halte es simpel: ein paar gezielte Checks, die die typischen Ops-Fehler abfangen.
- Idempotenz-Test: denselben Run zweimal ausführen, Ergebnis darf nicht wachsen (oder nur erwartbar via Updates).
- Checkpoint-Test: Simuliere Fehler im Load. Checkpoint darf nicht vorlaufen.
- Timeout-Test: Eine Page-Response verzögert sich. Job soll sauber fehlschlagen oder retryen.
- Durchsatz & Laufzeit: Miss p50/p95 Laufzeit über mehrere Runs. Nicht einmalig.
Kurze Checkliste für „Production Readiness“ (mehr braucht’s am Anfang oft nicht):
- Logs enthalten runId, jobKey, extracted, loaded, durationMs.
- Mindestens ein Counter für Errors und Retries.
- Ein Freshness-Indikator: „Letzter erfolgreicher Checkpoint“.
Verifikation im Betrieb mache ich gern über drei Metriken, weil sie schnell auffallen: durationMs (Trend), retries (Spikes), loaded/extracted Ratio (Drops durch Transform/Validation). Wenn die Ratio plötzlich von ~0,98 auf 0,60 fällt, ist selten „Zufall“. Meist Schemaänderung oder neuer Datentyp.
Next Steps
Wenn das Grundgerüst läuft, kommt die echte Arbeit. Und ja, das klingt widersprüchlich. Aber Automatisierung ist ein Produkt, kein Skript.
- Concurrency kontrollieren: Parallele Pages können Durchsatz erhöhen, aber erhöhen auch Rate-Limits und Fehlerbilder. Ich gehe meist konservativ ran.
- Schema-Drift absichern: Contract-Tests gegen die Quelle, plus „quarantine“ für neue Felder.
- Backfills als eigener Modus: gleiche Logik, andere Checkpoints, andere Limits. Sonst ruinierst du deine normalen SLAs.
- Observability sauber machen: Metriken in dein Monitoring, Alerts mit sinnvollen Schwellen. Nicht zu viele. Zu viele Alerts sind auch ein Ausfall.
Ab hier lohnt sich oft ein kleiner Architektur-Refactor: Trenne Extract/Transform/Load in Module, aber lass die Ops-Hülle zentral. Nebenbei bemerkt: Wenn du heute „nur“ einen Cron hast, ist das okay. Wichtig ist, dass dein Job deterministisch ist und du ihn wieder starten kannst, ohne Angst.
Wenn du das intern ausrollst, plane am Ende noch einen „Game Day“ ein: Quellsystem langsam, 500er-Fehler, kaputte Daten. Eine Stunde Chaos im Test spart dir später mehrere Nächte. Wenn du willst, kann ich dir dafür ein kurzes Runbook-Template geben. Das gehört für mich in die letzten 20% jeder ETL-Automatisierung.


