Ingestion (EL)
Most of the pipelines page assumes data is already in the lake. This page covers the extract-load side: an entry script owns the extract in whatever language fits the source, and the load goes through a managed materialization so it gets idempotent write strategies, snapshots and the partition grid.
When the source is something DuckDB can query directly (an attached database, files in object storage), a single DuckDB step is both extract and load - see the first worked example below. For everything else (paginated or authenticated APIs, SaaS SDKs, streams), split the step in two:
[entry script, any language] -> s3:///etl/landing/orders.jsonl -> [DuckDB loader, -- materialize] -> ducklake://main/orders
- The entry script talks to the outside world and writes the raw batch to a landing object in workspace storage with wmill.write_s3_file/wmill.writeS3File.
- The loader is a DuckDB script with -- on s3:///... and -- materialize: it re-runs whenever a new batch lands (the cascade) and owns idempotency, typing and history.
Why two scripts instead of one script writing the lake directly: materialize is DuckDB-only, and lake writes made through SDK helpers are not visible to the asset parser, so a single Python or TypeScript node would deploy with no output edge in the graph. The landing file is not overhead: it is a replayable raw zone that decouples source flakiness from load semantics.
Choosing the extract engine
- DuckDB-native pull, when the source is reachable from DuckDB (ATTACH 'datatable://...' for a Postgres-backed data table, read_parquet/read_json_auto for files). One node, no landing file, no per-row scripting: the postgres scanner reads vectorized with predicate pushdown.
- A plain fetch loop (TypeScript or Python). For REST APIs the bottleneck is the API itself (rate limits, page sizes, network latency), not the language. The script only shuttles JSON to the landing object; all row work happens in the DuckDB loader.
- dlt (Python), when you want its source ecosystem: RESTClient with pluggable paginators and auth for custom sources, or a verified source for a SaaS API. Keep dlt for extraction only; its own normalize/load pipeline duplicates what the managed loader already does, outside the graph.
For streaming and CDC sources, do not poll at all: use a native trigger marker (kafka, nats, sqs, postgres for logical-replication CDC) on the entry script. The broker's offset tracking replaces the cursor.
Incremental cursors
wmill.get_state() / wmill.set_state(v) persist a value per script (states). That is the incremental cursor:
last_id = wmill.get_state() or 0 # first run: full ingest
rows = [r for r in fetch_pages() if r["id"] > last_id]
...
wmill.set_state(max(r["id"] for r in rows))
- Set the cursor only after the landing write succeeds, so a failed run re-extracts the same window.
- Prefer a monotonic source column (id, updated_at). For updated_at cursors, overlap the window slightly and let a key= merge loader dedup, otherwise clock skew and late commits drop rows.
- The lookback-window variant needs no state at all: re-read everything touched in the last N days and merge by key. This is the only option for pure-DuckDB entries (no get_state there) and is very robust; the first worked example uses it.
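The ordering and overlap rules above can be sketched in plain Python. This is a minimal illustration, not Windmill's implementation: run_extract, window_start and the four injected callables are hypothetical names. In a real entry script get_cursor/set_cursor would presumably wrap wmill.get_state/wmill.set_state (likely storing the timestamp as an ISO string), and write_landing would wrap wmill.write_s3_file.

```python
from datetime import datetime, timedelta
from typing import Callable, Iterable, Optional


def window_start(cursor: Optional[datetime], overlap: timedelta) -> Optional[datetime]:
    """Lower bound for the next pull: the stored cursor minus a small overlap.

    None means there is no cursor yet, i.e. a full ingest.
    """
    return None if cursor is None else cursor - overlap


def run_extract(
    fetch_since: Callable[[Optional[datetime]], Iterable[dict]],  # hypothetical source reader
    write_landing: Callable[[list], None],                        # hypothetical landing writer
    get_cursor: Callable[[], Optional[datetime]],
    set_cursor: Callable[[datetime], None],
    overlap: timedelta = timedelta(minutes=5),                    # assumed overlap size
) -> int:
    cursor = get_cursor()
    rows = list(fetch_since(window_start(cursor, overlap)))
    if not rows:
        return 0  # nothing new: no landing write, cursor untouched
    write_landing(rows)  # may raise; the cursor then stays where it was
    set_cursor(max(r["updated_at"] for r in rows))  # advance only after the write
    return len(rows)
```

Because the window overlaps, consecutive batches can repeat rows; this sketch relies on a key= merge loader downstream to deduplicate them, as described above.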
To re-ingest from scratch, clear the state (run once with wmill.set_state(None), or edit it in the script settings) and run the entry script. What happens on load depends on the strategy: key= merge is safe, replace rebuilds, but append duplicates the table on a full re-ingest, so drop the table first or switch the loader to key= for the backfill. Re-loading without re-extracting is cheaper: re-run just the loader, or use backfill for partitioned targets.
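To see why the strategy matters on a full re-ingest, here is a toy in-memory model of the two behaviors. It is only an illustration of the semantics described above, not how the managed loader is implemented; load_append and load_merge are hypothetical names.

```python
def load_append(table: list, batch: list) -> list:
    """Toy model of an append load: every incoming row is added as-is."""
    return table + batch


def load_merge(table: list, batch: list, key: str = "id") -> list:
    """Toy model of a key= merge load: the incoming row replaces the row with the same key."""
    merged = {row[key]: row for row in table}
    merged.update({row[key]: row for row in batch})
    return list(merged.values())


table = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
full_reingest = [{"id": 1, "v": "a"}, {"id": 2, "v": "b2"}]  # same keys pulled again

print(len(load_append(table, full_reingest)))  # 4: the table is duplicated
print(len(load_merge(table, full_reingest)))   # 2: one row per key, latest values
```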
Schema drift on load
The landing file carries whatever the source sent, so read_json_auto picks a new column up on the next load. What happens then depends on the strategy:
| Strategy | On a new or removed column |
|---|---|
| replace, unpartitioned (default) | Re-derives the schema every run; drift flows through. |
| replace, partitioned | Schema frozen at first run; a drifted batch fails the run. |
| key= merge | Frozen at first run; drifted batch fails the run. |
| append | Frozen at first run; drifted batch fails the run. |
| key=... history (SCD2) | Frozen by design; manual rebuild required. |
A failing run is the schema contract firing. Recovery options, from most to least common: project explicitly in the loader (SELECT id, name, ... FROM read_json_auto(...)) so new source columns are ignored until you opt in; evolve the table (ALTER TABLE lake.t ADD COLUMN ...) then widen the SELECT; or rebuild (replace for one run, or drop the table and let the next run bootstrap the new shape).
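The page's recommendation is to project in the loader SQL. As a complementary illustration only, the sketch below shows the same idea on the Python side: compare a batch against the expected column list to spot drift early, and project rows onto that list. diff_columns and project are hypothetical helpers, not part of any SDK.

```python
def diff_columns(expected: list, batch: list) -> tuple:
    """Return (added, missing) column names of a batch relative to the expected list."""
    seen = set()
    for row in batch:
        seen.update(row)  # collect every key that appears in any row
    added = sorted(seen - set(expected))
    missing = sorted(set(expected) - seen)
    return added, missing


def project(batch: list, expected: list) -> list:
    """Keep only the expected columns; absent values become None."""
    return [{col: row.get(col) for col in expected} for row in batch]


expected = ["id", "name"]
batch = [{"id": 1, "name": "a", "email": "x"}, {"id": 2, "name": "b"}]
print(diff_columns(expected, batch))  # (['email'], [])
print(project(batch, expected))       # the new email column is dropped
```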
Worked examples
Three complete pipelines, one per extract engine. The entry bodies point at a public demo API so they run as-is; swap the URL, the source table and the cursor column for your source. All scripts of a pipeline go in the same folder (f/etl here).
Postgres to lake, pure DuckDB incremental pull. One node, no cursor state: the lookback window plus merge-by-key makes re-pulls idempotent, so the window just has to be wider than the schedule gap. f/etl/customers_lake:
-- pipeline
-- on schedule
-- materialize ducklake://main/customers_lake key=id
ATTACH 'datatable://main' AS pg;
SELECT *
FROM pg.customers
WHERE updated_at::TIMESTAMP > now()::TIMESTAMP - INTERVAL 7 DAY;
REST API to lake with a fetch loop. Entry f/etl/comments_api (TypeScript):
// pipeline
// on schedule
import * as wmill from 'windmill-client';
export async function main() {
  // Cursor persists per-script; reset it to backfill.
  const lastId: number = (await wmill.getState()) ?? 0;
  const rows: any[] = [];
  for (let page = 1; ; page++) {
    const res = await fetch(`https://jsonplaceholder.typicode.com/comments?_page=${page}&_limit=100`);
    if (!res.ok) throw new Error(`source API returned ${res.status}`);
    const batch: any[] = await res.json();
    if (batch.length === 0) break;
    rows.push(...batch.filter((r) => r.id > lastId));
  }
  // Zero-row extraction: return early so no empty landing file is written.
  if (rows.length === 0) return { ingested: 0, cursor: lastId };
  await wmill.writeS3File(
    's3:///pipelines/etl/landing/comments_api.jsonl',
    rows.map((r) => JSON.stringify(r)).join('\n')
  );
  // Advance the cursor only after the landing write succeeded.
  const cursor = Math.max(...rows.map((r) => r.id));
  await wmill.setState(cursor);
  return { ingested: rows.length, cursor };
}
Loader f/etl/comments_api_load (DuckDB), auto-runs when a batch lands:
-- pipeline
-- on s3:///pipelines/etl/landing/comments_api.jsonl
-- materialize ducklake://main/comments_api append
SELECT * FROM read_json_auto('s3:///pipelines/etl/landing/comments_api.jsonl');
REST API to lake with dlt extraction. Same shape, with dlt's RESTClient doing the pagination (auto-detects header-link, cursor and offset styles) and a key= merge loader so re-loads are idempotent. Entry f/etl/orders_dlt (Python):
# pipeline
# on schedule
import json
import wmill
from dlt.sources.helpers.rest_client import RESTClient
def main():
    last_id = wmill.get_state() or 0
    client = RESTClient(base_url="https://jsonplaceholder.typicode.com")
    rows = []
    for page in client.paginate("/posts", params={"_page": 1, "_limit": 25}):
        rows.extend(r for r in page if r["id"] > last_id)
    # Zero-row extraction: return early so no empty landing file is written.
    if not rows:
        return {"ingested": 0, "cursor": last_id}
    wmill.write_s3_file(
        "s3:///pipelines/etl/landing/orders_dlt.jsonl",
        "\n".join(json.dumps(r) for r in rows).encode(),
    )
    # Advance the cursor only after the landing write succeeded.
    cursor = max(r["id"] for r in rows)
    wmill.set_state(cursor)
    return {"ingested": len(rows), "cursor": cursor}
Loader f/etl/orders_dlt_load (DuckDB):
-- pipeline
-- on s3:///pipelines/etl/landing/orders_dlt.jsonl
-- materialize ducklake://main/orders_dlt key=id
SELECT * FROM read_json_auto('s3:///pipelines/etl/landing/orders_dlt.jsonl');
Ingestion gotchas
- Use one spelling everywhere: s3:///<key> (triple slash targets the workspace default storage; s3://<storage>/<key> for a named storage). The same URI works in SDK calls, // on annotations and DuckDB SQL, so the write edge, the trigger and the read edge all connect. Watch the slashes: in annotations and DuckDB SQL, s3://k (double slash) resolves to asset path k while s3:///k resolves to /k (what SDK writes produce), so one missing slash silently splits the lineage; in SDK calls s3://k is a malformed URI and raises.
- Bare key strings are rejected by the SDK (clients newer than 1.746.0 raise with a message pointing at the s3:/// spelling; older clients silently uploaded to an auto-generated name under windmill_uploads/). An auto-generated key is requested by omitting the object, never by a bare or empty string. The object forms S3Object(s3="<key>") / { s3: '<key>' } stay equivalent to the URI.
- Do not write an empty landing batch: a zero-row extraction should return early (the examples do), otherwise the loader re-runs for nothing and read_json_auto errors on an empty file.
- Append loaders assume disjoint batches. The cursor guarantees that, but a manual re-run of the loader on an already-loaded landing file duplicates rows; if operators may re-run freely, prefer key= merge.
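Two of these gotchas can be caught before any write happens. The sketch below is a hypothetical pre-flight check for an entry script, not the SDK's own parser: parse_landing_uri and to_jsonl are made-up names, and the accepted shapes are only the two spellings listed above.

```python
import json
from typing import Optional, Tuple


def parse_landing_uri(uri: str) -> Tuple[Optional[str], str]:
    """Split s3:///<key> or s3://<storage>/<key> into (storage, key).

    storage is None for the workspace default storage (triple slash).
    Bare keys and s3://<key> with no storage segment are rejected.
    """
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"bare key {uri!r}: use the s3:///<key> spelling")
    storage, sep, key = uri[len(prefix):].partition("/")
    if not sep or not key:
        raise ValueError(f"malformed URI {uri!r}: expected s3:///<key> or s3://<storage>/<key>")
    return (storage or None, key)


def to_jsonl(rows: list) -> bytes:
    """Encode rows as newline-delimited JSON, refusing an empty batch."""
    if not rows:
        raise ValueError("empty batch: return early instead of writing a landing file")
    return "\n".join(json.dumps(r) for r in rows).encode()


print(parse_landing_uri("s3:///pipelines/etl/landing/orders_dlt.jsonl"))
# (None, 'pipelines/etl/landing/orders_dlt.jsonl')
```

Calling both helpers right before the landing write keeps the failure in the entry script, where the cursor has not moved yet.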