Partitioned pipelines
Many pipelines materialize one dataset per time bucket (a day, an hour) or per key (a tenant, a shard). Declare this with // partitioned <kind> [options] and use the literal {partition} token in asset paths.
The canonical partitioned step is a DuckDB script that materializes one slice of a DuckLake table per run - the ingest_orders step of the example:
-- pipeline
-- on schedule
-- partitioned daily
-- materialize ducklake://main/raw_orders
-- Pull one day of orders from the shop API. `'{partition}'` is replaced
-- with the run's partition value, e.g. '2026-05-16'.
SELECT *
FROM shop_api_orders()
WHERE created_at::DATE = '{partition}';
For a step that writes one S3 file per partition from Python or TypeScript, declare the path once as a URI constant containing the token. The constant is detected as an asset (mark it as a write in the editor's asset panel), and the code derives the concrete key from it at run time:
# pipeline
# partitioned daily
import wmill
# Detected as an asset; set its access to "write" in the asset panel.
ORDERS_URI = "s3:///lake/raw/orders/{partition}/orders.json"
def main(partition: str):
    # `partition` is injected by Windmill, e.g. "2026-05-16"
    data = fetch_orders_for(partition)
    wmill.write_s3_file(ORDERS_URI.replace("{partition}", partition), data)
The {partition} token stays literal in the lineage graph, so the graph is partition-agnostic. At run time Windmill resolves the concrete partition value once and injects it into the script as a partition argument. The same value is then carried down the whole chain, so every step of one pipeline run materializes the same partition.
Grammar: // partitioned <kind> [option="value" ...]. At most one // partitioned per script; if several are present the first wins.
Partition kinds
| Kind | Default format | Example value |
|---|---|---|
| daily | %Y-%m-%d | 2026-05-16 |
| hourly | %Y-%m-%dT%H | 2026-05-16T09 |
| weekly | %G-W%V (ISO week-year/week) | 2026-W20 |
| monthly | %Y-%m | 2026-05 |
| dynamic key="<path>" | n/a (value comes from the payload) | acme |
daily, hourly, weekly, monthly are time kinds. dynamic requires a key and reads the value from the triggering payload.
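To make the default formats concrete, here is a minimal Python sketch that renders one timestamp with each pattern from the table. The `DEFAULT_FORMATS` mapping and the `render_default` helper are illustrative names, not part of Windmill. The `%G` and `%V` codes are passed through to the platform's `strftime`, so this assumes a platform that supports them.

```python
from datetime import datetime, timezone

# Default strftime patterns per time kind, copied from the table above.
DEFAULT_FORMATS = {
    "daily": "%Y-%m-%d",
    "hourly": "%Y-%m-%dT%H",
    "weekly": "%G-W%V",  # ISO week-year and ISO week number
    "monthly": "%Y-%m",
}


def render_default(kind: str, fire_time: datetime) -> str:
    """Render fire_time with the default pattern of the given time kind."""
    return fire_time.strftime(DEFAULT_FORMATS[kind])


# 2026-05-16 09:30 UTC falls in ISO week 20 of 2026.
fire_time = datetime(2026, 5, 16, 9, 30, tzinfo=timezone.utc)
rendered = {kind: render_default(kind, fire_time) for kind in DEFAULT_FORMATS}
print(rendered)
```

The rendered values match the "Example value" column for the four time kinds.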
Options by kind
| Option | Applies to | Default | Meaning |
|---|---|---|---|
| tz="<IANA tz>" | time kinds | UTC | Resolve the bucket in this timezone. Matters at boundaries: 00:30 UTC can still be the previous local day. An invalid timezone fails the run with a clear error. |
| format="<strftime>" | time kinds | per-kind (table above) | Override the rendered string with a standard strftime pattern, e.g. %Y/%m/%d. |
| start="YYYY-MM-DD" | time kinds | none | Backfill anchor. A run whose computed partition is strictly before this date resolves to no partition. Such a run is neither failed nor auto-skipped: it proceeds without a partition argument and a warning is logged, so guard on the argument in your code if you rely on the anchor. An invalid start fails the run with a clear error. |
| key="<json path>" | dynamic (required) | none | Where to find the value in the triggering payload. Minimal $.a.b.c dotted path only: no array indexing, wildcards or filters. The leaf must be a scalar (string, number or bool). A missing key, a non-scalar leaf, or no payload fails the run with a clear error. |
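The effect of tz, format and start on a daily partition can be pictured with the following Python sketch. It is not Windmill's implementation: `resolve_daily` is a hypothetical helper, and comparing the localized date against start is an assumption that only mirrors the description in the table. The `main` function shows the kind of guard the start row recommends.

```python
from datetime import date, datetime, timezone
from typing import Optional
from zoneinfo import ZoneInfo


def resolve_daily(
    fire_time: datetime,
    tz: str = "UTC",
    fmt: str = "%Y-%m-%d",
    start: Optional[str] = None,
) -> Optional[str]:
    """Illustrative only: localize the fire time, apply the anchor, render."""
    local = fire_time.astimezone(ZoneInfo(tz))
    if start is not None and local.date() < date.fromisoformat(start):
        return None  # strictly before the anchor: no partition
    return local.strftime(fmt)


# 00:30 UTC is still the previous evening in New York.
fire_time = datetime(2026, 5, 16, 0, 30, tzinfo=timezone.utc)
utc_value = resolve_daily(fire_time)
ny_value = resolve_daily(fire_time, tz="America/New_York", fmt="%Y/%m/%d")
before_anchor = resolve_daily(fire_time, start="2026-06-01")


def main(partition: Optional[str] = None) -> str:
    # Guard for runs that fall before the start anchor and therefore
    # arrive without a partition argument.
    if partition is None:
        return "skipped: no partition resolved"
    return f"processing {partition}"
```

With these inputs the UTC bucket is 2026-05-16 while the New York bucket is the previous day, which is the boundary case the tz row warns about.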
How the value is resolved
The concrete partition is resolved exactly once, at the top of a chain, by this precedence:
- An explicit partition argument on the run (a manual run, a backfill, or a value already propagated from upstream). It is used as-is and never re-resolved. This is what makes the whole chain consistent.
- Otherwise, for time kinds: the run's fire time, localized to tz and rendered with format. The anchor is the schedule fire time for a scheduled run and the upstream trigger time for a cascaded run (so a chain that crosses midnight stays on one partition).
- Otherwise, for dynamic: the value extracted from the triggering payload (trigger object if present, otherwise the run arguments) at key.
The resolved value is injected into the script as a partition argument and persisted, so every downstream step of the same run receives the identical value.
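For the dynamic kind, the precedence above can be sketched in Python as follows. This is a simplified model under stated assumptions, not the platform's code: `extract_key` and `resolve_dynamic` are hypothetical names, the trigger object is assumed to appear under a `trigger` entry of the run arguments, and the string rendering of numbers and booleans is a guess.

```python
from typing import Any


def extract_key(payload: Any, path: str) -> str:
    """Walk a minimal `$.a.b.c` path and return the scalar leaf as a string."""
    if not path.startswith("$.") or len(path) <= 2:
        raise ValueError(f"unsupported key path: {path!r}")
    node = payload
    for part in path[2:].split("."):
        if not isinstance(node, dict) or part not in node:
            raise KeyError(f"key {path!r} not found in payload")
        node = node[part]
    # Assumption: booleans render JSON-style, numbers via str().
    if isinstance(node, bool):
        return "true" if node else "false"
    if isinstance(node, (str, int, float)):
        return str(node)
    raise TypeError(f"leaf at {path!r} is not a scalar")


def resolve_dynamic(args: dict, key: str) -> str:
    # 1. An explicit partition argument wins and is used as-is.
    if args.get("partition") is not None:
        return str(args["partition"])
    # 2. Otherwise read the trigger object if present, else the run arguments.
    payload = args["trigger"] if "trigger" in args else args
    return extract_key(payload, key)


from_args = resolve_dynamic({"tenant_id": "acme"}, "$.tenant_id")
explicit = resolve_dynamic({"partition": "globex", "tenant_id": "acme"}, "$.tenant_id")
from_trigger = resolve_dynamic({"trigger": {"tenant_id": "initech"}}, "$.tenant_id")
print(from_args, explicit, from_trigger)
```

The explicit value short-circuits extraction entirely, which is the property that keeps a propagated partition stable across the chain.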
In a // partitioned DuckDB script, the partition argument is declared automatically, so the SQL can reference $partition directly without a manual -- $partition (text) declaration (an explicit declaration still wins). The auto-declared argument is optional in run forms since the platform resolves it at run start.
The {partition} token
Use the literal token {partition} inside asset paths, both in // on declarations and in the URI constants your code derives its write paths from. It stays literal in the lineage graph (the graph is partition-agnostic) and is substituted with the resolved value when the cascade fires. An // on whose path contains {partition} is a partition-bearing input (relevant to joins).
In DuckDB SQL, the token is substituted in the text of managed materialize scripts, always as a complete quoted literal:
- '{partition}' (the whole literal is the token) becomes '2026-05-16' - use it in comparisons: WHERE created_at::DATE = '{partition}'.
- To build an S3 path around it, concatenate: read_json('s3:///lake/raw/orders/' || '{partition}' || '/orders.json'). A token in the middle of a longer literal ('s3:///lake/{partition}/x') is not substituted cleanly - keep the token a standalone literal.
- The partition argument is also auto-declared for // partitioned DuckDB scripts, so the SQL can bind $partition like any other argument where a bound parameter is accepted.
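If the same path is already kept as a URI constant, the concatenated form can be derived rather than typed by hand. The Python sketch below is a user-side convenience only; `sql_literal` and `duckdb_path_expr` are hypothetical helpers, not part of Windmill or DuckDB. It splits the URI on the token and emits each piece as its own SQL literal, so the token always stands alone.

```python
TOKEN = "{partition}"


def sql_literal(text: str) -> str:
    """Quote text as a SQL string literal, doubling embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


def duckdb_path_expr(uri: str) -> str:
    """Join the pieces around each token with ||, keeping the token standalone."""
    parts = []
    for i, piece in enumerate(uri.split(TOKEN)):
        if i > 0:
            parts.append(sql_literal(TOKEN))  # the token as its own literal
        if piece:
            parts.append(sql_literal(piece))
    return " || ".join(parts)


expr = duckdb_path_expr("s3:///lake/raw/orders/{partition}/orders.json")
print(f"SELECT * FROM read_json({expr});")
```

For the orders path this reproduces the concatenation shown in the list above.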
Partition kinds and options, with the polyglot URI-constant pattern:
Daily (defaults):
// pipeline
// partitioned daily
import * as wmill from 'windmill-client';
// Detected as an asset; mark it as a write in the editor's asset panel.
const EVENTS_URI = 's3:///datasets/raw/{partition}/events.parquet';
export async function main(partition: string) {
  // partition e.g. "2026-05-16"
  const key = EVENTS_URI.replace('s3:///', '').replace('{partition}', partition);
  await wmill.writeS3File({ s3: key }, await fetchEventsFor(partition));
}
Hourly + start anchor:
// pipeline
// partitioned hourly start="2026-01-01"
import * as wmill from 'windmill-client';
const DATA_URI = 's3:///lake/{partition}/data.parquet';
export async function main(partition?: string) {
  // A run before the start anchor arrives without a partition; skip it so
  // nothing is materialized before 2026-01-01.
  if (!partition) return;
  // partition e.g. "2026-05-16T09"
  const key = DATA_URI.replace('s3:///', '').replace('{partition}', partition);
  await wmill.writeS3File({ s3: key }, await build(partition));
}
Daily, timezone + format:
// pipeline
// partitioned daily tz="America/New_York" format="%Y/%m/%d"
import * as wmill from 'windmill-client';
const DATA_URI = 's3:///lake/{partition}/data.parquet';
export async function main(partition: string) {
  // partition rendered as e.g. "2026/05/16" in America/New_York
  const key = DATA_URI.replace('s3:///', '').replace('{partition}', partition);
  await wmill.writeS3File({ s3: key }, await build(partition));
}
Dynamic (per tenant):
// pipeline
// partitioned dynamic key="$.tenant_id"
import * as wmill from 'windmill-client';
const SUMMARY_URI = 's3:///lake/{partition}/summary.parquet';
// Triggered with a payload like { "tenant_id": "acme", ... }
// `partition` resolves to "acme"
export async function main(partition: string, tenant_id: string) {
  const key = SUMMARY_URI.replace('s3:///', '').replace('{partition}', partition);
  await wmill.writeS3File({ s3: key }, await summarize(tenant_id));
}
Backfill and re-runs
Because the partition is resolved once and travels with the run, you can backfill a historical partition by running the top of the pipeline manually with an explicit partition argument (for example partition = "2026-01-10"). That value is never re-resolved or overridden downstream, so the entire chain reprocesses exactly that partition. A retry of a failed step likewise reprocesses the run's partition rather than drifting to "now".
For materialized assets, this is surfaced as a one-click backfill over a range of partitions (Enterprise Edition).
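For a manual backfill over several days, the explicit partition values can be enumerated first. The sketch below assumes a daily pipeline with the default format; `daily_partitions` is a hypothetical helper, and how each run is launched is deliberately left out.

```python
from datetime import date, timedelta
from typing import Iterator


def daily_partitions(first: str, last: str, fmt: str = "%Y-%m-%d") -> Iterator[str]:
    """Yield one partition value per day from first to last, inclusive."""
    day, end = date.fromisoformat(first), date.fromisoformat(last)
    while day <= end:
        yield day.strftime(fmt)
        day += timedelta(days=1)


values = list(daily_partitions("2026-01-09", "2026-01-11"))
for value in values:
    # Each value would be passed as the explicit `partition` argument of a
    # manual run of the pipeline's first step.
    print({"partition": value})
```

Because an explicit partition is never re-resolved, each of these runs reprocesses exactly its own day regardless of when it executes.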