Background Tasks
Some work an agent triggers takes too long to run inside a single chat turn: crawling a website, refreshing a knowledge base, generating a report. AgentPress runs that work as a background task — a durable chain of steps executed by background workers, with live progress streamed to the UI while the user keeps chatting.
Tasks are authored with the defineTask typed pipeline builder. When an agent
calls a task's tool, the task is enqueued and the agent immediately replies with
a short "scheduled" message; the worker then runs the steps and streams progress
back over a WebSocket channel.
Tasks live alongside tools in packages/custom/src/tools/. The tool registry auto-discovers them: exporting a task from a scanned file registers both the task and a corresponding LLM tool. See Creating Tools for the simpler, synchronous tool pattern.
Defining a Task
A task is created with defineTask(name, options) and then chained with one or
more .step(...) calls. Each step receives a single context object and returns a
JSON-serializable result that flows into the next step.
import { defineTask } from "@agentpress/lib/utils/task";
import * as z from "zod";
export const getWeatherDynamic = defineTask("getWeatherDynamic", {
title: "Get Dynamic Weather", // display name (defaults to the task name)
description: "Get the weather in a location.", // LLM-facing tool description
input: z.object({
// LLM tool contract; re-validated by the worker at the init step
location: z.string().describe("City or state, full name"),
memo: z.string().describe("Brief memo with the location"),
}),
// progressBar?: boolean — show the step progress UI (default true)
// scheduledMessage?: { text: string; voice?: string }
})
.step(
"FetchForecast",
{ title: "Fetch forecast" },
async ({ input, progress, emit, signal }) => {
progress("Contacting weather service…"); // live status line in the task UI
// fetchWeather is an application-defined helper (not shown here)
const obs = await fetchWeather(input.location, { signal });
emit({ temperature: obs.temperature }); // streaming partial → custom UI now
progress(0.7, "Parsing observations…"); // 0–1 fraction + status
return { observation: obs };
},
)
.step("ComposeReport", async ({ input, prev }) => {
// `prev` is TYPE-INFERRED as the previous step's return value
return { message: `${prev.observation.temperature}F in ${input.location}` };
});
defineTask options
| Option | Type | Description |
|---|---|---|
| description | string (required) | The LLM-facing tool description. This is what the model reads to decide when to schedule the task. |
| input | z.ZodType (required) | The LLM tool contract. Also re-validated by the worker at the init step, so direct enqueues are validated too. |
| title | string | Display name shown in the task UI. Defaults to the task name. |
| progressBar | boolean | Whether to render the step progress UI for this task. Defaults to true. |
| scheduledMessage | { text: string; voice?: string } | What the agent relays when the task is scheduled. text instructs the text model; voice is spoken verbatim by TTS if provided. |
Adding steps
.step() accepts a name and a run function, with optional display metadata in
between:
// Name + run function
.step("FetchForecast", async (ctx) => { /* ... */ })
// Name + options + run function
.step("FetchForecast", { title: "Fetch forecast", description: "…" }, async (ctx) => { /* ... */ })
The step title defaults to the step name split into words at camel-case
boundaries ("FetchForecast" → "Fetch Forecast"). The name "init" is reserved
for the internal scheduling step and cannot be used.
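The default-title rule and the reserved name can be pictured with a small sketch. Both helpers below are hypothetical illustrations, not AgentPress's actual implementation, and the exact-match check on "init" is an assumption.

```typescript
// Hypothetical helper: derives a display title from a camel-case step name.
function defaultStepTitle(name: string): string {
  return name
    // space between a lowercase letter or digit and the capital that follows
    .replace(/([a-z0-9])([A-Z])/g, "$1 $2")
    // split a run of capitals from a following capitalized word
    .replace(/([A-Z]+)([A-Z][a-z])/g, "$1 $2");
}

// Hypothetical guard mirroring the reserved-name rule (exact match assumed).
function assertStepNameAllowed(name: string): void {
  if (name === "init") {
    throw new Error('Step name "init" is reserved for the internal scheduling step');
  }
}

const fetchTitle = defaultStepTitle("FetchForecast"); // "Fetch Forecast"
const composeTitle = defaultStepTitle("ComposeReport"); // "Compose Report"
```

Passing an explicit title in the step options remains the way to get any wording the derived default cannot produce.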
Step Context
Every step's run function receives one destructurable object:
| Field | Type | Description |
|---|---|---|
| input | Input | The task's original validated input, available to every step. |
| prev | Prev | The previous step's return value and nothing else. Type-inferred. {} for the first step. |
| progress(fractionOrMsg, msg?) | (number \| string, string?) => void | Report in-step progress: a status message, a 0–1 fraction, or both. |
| emit(partial) | (Record<string, unknown>) => void | Stream a partial result to the tool's custom UI immediately. Shallow-merged into the task's live data (32 KB cap). |
| signal | AbortSignal | Fires when the user cancels the task. Pass it to fetch / LLM calls so they abort cleanly. |
| meta | { taskId, threadId?, userId, orgId?, authToken? } | Identifiers for the running task row. authToken is fetched at runtime and never persisted. |
progress and emit are fire-and-forget. They broadcast throttled progress
over the task WebSocket channel and are snapshotted to the task row on the
worker heartbeat, so page reloads restore mid-step progress and partial results.
.step("Crawl", async ({ input, progress, emit, signal }) => {
progress("Starting crawl…"); // status only
for (let i = 0; i < input.pages.length; i++) {
const page = await scrape(input.pages[i], { signal }); // aborts on cancel
emit({ scraped: i + 1 }); // partial → custom UI
progress((i + 1) / input.pages.length); // fraction only
}
return { pages: input.pages.length };
})
Data-Flow Contract
The pipeline is explicit — there is no implicit merging of step inputs and outputs.
- input is the task's original validated input, available to every step.
- prev is the previous step's return value and nothing else. Anything a later step needs must be returned forward explicitly.
- The final step's return value is the task output.
- Step returns must be JSON-serializable — they are persisted and replayed into the next step after a worker crash.
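The contract can be illustrated with a toy runner. This is a minimal sketch under stated assumptions, not the real worker: runPipeline and the Step type are hypothetical, steps are synchronous for brevity (real steps are async), and persistence is imitated with a JSON round-trip.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };
type JsonObject = { [key: string]: Json };

// Hypothetical stand-in for a step: sees the task input and the previous return value.
type Step = (ctx: { input: JsonObject; prev: JsonObject }) => JsonObject;

// Runs steps in order, round-tripping each return value through JSON
// to imitate persistence between steps.
function runPipeline(input: JsonObject, steps: Step[]): JsonObject {
  let prev: JsonObject = {}; // the first step sees an empty `prev`
  for (const step of steps) {
    const result = step({ input, prev });
    prev = JSON.parse(JSON.stringify(result)) as JsonObject;
  }
  return prev; // the final step's return value is the task output
}

const output = runPipeline({ location: "Oslo" }, [
  () => ({ temperature: 41, station: "OSL-1" }),
  // Forward `temperature` explicitly; `station` is not returned, so it is dropped.
  ({ prev }) => ({ temperature: prev.temperature ?? null }),
  // Only `temperature` is visible here; `input` is still available.
  ({ input, prev }) => ({
    message: `${String(prev.temperature)}F in ${String(input.location)}`,
    sawStation: "station" in prev,
  }),
]);
```

Here output.sawStation is false: the second step did not return station, so the third step never sees it.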
How Execution Works
- The agent schedules the task. The task's generated AI SDK tool creates a row in the tasks table with status PENDING and immediately returns the scheduled message (plus taskId), so the agent responds without waiting for the work to finish.
- A background worker picks it up. The task listener polls for claimable rows using SELECT … FOR UPDATE SKIP LOCKED, so only one worker ever runs a given step. It re-validates the task input at the init step, then runs each step in order.
- Each step is a durable row. Steps move through pending → running → completed/failed/cancelled. Because every step's return value is persisted, a worker crash or deploy mid-task is recoverable: another worker reclaims the row (after the heartbeat goes stale) and resumes.
- The final step's return value becomes the task output.
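The claim-and-reclaim behaviour can be modelled in memory. The sketch below is an illustration only: the TaskRow shape, the RUNNING status label, and the 15-second staleness threshold are assumptions, and the real listener relies on the database's row locking rather than on JavaScript being single-threaded.

```typescript
// Hypothetical in-memory shape of a task row; the real schema is not shown here.
interface TaskRow {
  id: string;
  status: "PENDING" | "RUNNING";
  claimedBy?: string;
  heartbeatAt?: number; // epoch milliseconds of the last worker heartbeat
}

const STALE_AFTER_MS = 15_000; // assumed threshold, not a documented value

// Claims the first row that is pending, or running with a stale heartbeat.
function claimNext(rows: TaskRow[], workerId: string, now: number): TaskRow | undefined {
  const row = rows.find(
    (r) =>
      r.status === "PENDING" ||
      (r.status === "RUNNING" && now - (r.heartbeatAt ?? 0) > STALE_AFTER_MS),
  );
  if (!row) return undefined;
  row.status = "RUNNING";
  row.claimedBy = workerId;
  row.heartbeatAt = now;
  return row;
}

const queue: TaskRow[] = [{ id: "t1", status: "PENDING" }];
const firstClaim = claimNext(queue, "worker-a", 0); // worker-a gets t1
const contested = claimNext(queue, "worker-b", 1_000); // undefined: heartbeat is fresh
const reclaimed = claimNext(queue, "worker-b", 20_000); // worker-a went silent, worker-b takes over
```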
Durability and Step Names
A step's name is its durable identity — it is persisted on task rows and
used to resume after a crash or deploy. Renaming a step of a production task
breaks in-flight rows, so treat step names as frozen once a task has shipped.
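To see why a rename is harmful, consider a resume routine that looks steps up by their persisted names. This is a simplified sketch under assumptions: StepRow, StepFn, and resumeTask are hypothetical, steps are synchronous, and rows live in memory instead of a database.

```typescript
type StepStatus = "pending" | "running" | "completed" | "failed" | "cancelled";

// Hypothetical persisted step row.
interface StepRow {
  name: string;
  status: StepStatus;
  output?: Record<string, unknown>;
}

type StepFn = (prev: Record<string, unknown>) => Record<string, unknown>;

// Re-runs only the steps that have not completed, feeding each one the
// persisted output of the step before it.
function resumeTask(rows: StepRow[], steps: Map<string, StepFn>): Record<string, unknown> {
  let prev: Record<string, unknown> = {};
  for (const row of rows) {
    if (row.status === "completed") {
      prev = row.output ?? {}; // replay the persisted result instead of re-running
      continue;
    }
    const fn = steps.get(row.name);
    if (!fn) throw new Error(`No step registered under the name "${row.name}"`);
    row.status = "running";
    row.output = fn(prev);
    row.status = "completed";
    prev = row.output;
  }
  return prev;
}

let fetchCalls = 0;
const persistedRows: StepRow[] = [
  { name: "FetchForecast", status: "completed", output: { temperature: 41 } },
  { name: "ComposeReport", status: "running" }, // the crashed worker stopped here
];
const currentSteps = new Map<string, StepFn>([
  ["FetchForecast", () => { fetchCalls++; return { temperature: 0 }; }],
  ["ComposeReport", (prev) => ({ message: `${String(prev.temperature)}F` })],
]);
const resumed = resumeTask(persistedRows, currentSteps);
```

The completed step is replayed from its stored output rather than executed again. If ComposeReport had been renamed in the deployed code, the lookup for the in-flight row would fail, which is the breakage the paragraph above warns about.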
Live Progress and Reload Resilience
While a step runs, progress() and emit() push updates to the client:
- progress(...) updates the step's live status line and fraction. These updates are broadcast as throttled STEP_PROGRESS messages over the task WebSocket channel.
- emit(partial) streams partial results. Partials are shallow-merged into the task's live data object on the client and rendered by the tool's custom UI immediately (see Consuming Live Partials).
On every worker heartbeat (~5 seconds), the latest in-step progress and
partials are snapshotted to the task row's live_state. If the user reloads the
page mid-task, the snapshot is replayed so the progress bar and any streamed
partial results are restored exactly where they were.
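Shallow merging has one consequence worth seeing in code: nested objects are replaced, not combined. The sketch below assumes a hypothetical mergePartial helper; how the 32 KB cap is enforced is not specified here, so dropping an oversized partial is purely an assumption for illustration.

```typescript
const LIVE_DATA_CAP_BYTES = 32 * 1024;

type LiveData = Record<string, unknown>;

// Hypothetical merge: later keys win at the top level only.
// Assumption: a partial that would push the data past the cap is ignored.
function mergePartial(live: LiveData, partial: LiveData): LiveData {
  const next: LiveData = { ...live, ...partial }; // shallow: nested objects are replaced
  const size = new TextEncoder().encode(JSON.stringify(next)).length;
  return size > LIVE_DATA_CAP_BYTES ? live : next;
}

let liveData: LiveData = {};
liveData = mergePartial(liveData, { scraped: 1, page: { url: "/a", title: "A" } });
liveData = mergePartial(liveData, { scraped: 2, page: { url: "/b" } });
// liveData.page is now { url: "/b" }: the earlier `title` is gone
```

A step that wants to keep nested fields across emits should therefore send the whole nested object each time, or flatten it into top-level keys.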
Task WebSocket Messages
The task channel carries these message types:
| Message | Meaning |
|---|---|
| CONNECTION_ESTABLISHED | The client connected to the task channel. |
| TASK_STARTED | The task began running. |
| STEP_PROGRESS | Live in-step progress (progress/emit), throttled. |
| STEP_COMPLETED | A step finished and the next one is starting. |
| TASK_FINISHED | The task completed successfully. |
| TASK_FAILED | The task failed during processing or execution. |
| TASK_CANCELLED | The task was cancelled by the user. |
| RESYNC_REQUEST / RESYNC_RESPONSE | Client-initiated state resync after a reconnect. |
| PING / PONG | Keepalive. |
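A client can fold these messages into view state with a reducer. The message names below come from the table above; every payload field (step, fraction, statusText) and the TaskView shape are assumptions made for this sketch, and connection, resync, and keepalive messages are left out.

```typescript
type TaskStatus = "pending" | "running" | "completed" | "failed" | "cancelled";

// Payload fields are hypothetical; only the `type` values are documented.
type TaskMessage =
  | { type: "TASK_STARTED" }
  | { type: "STEP_PROGRESS"; step: string; fraction?: number; statusText?: string }
  | { type: "STEP_COMPLETED"; step: string }
  | { type: "TASK_FINISHED" }
  | { type: "TASK_FAILED" }
  | { type: "TASK_CANCELLED" };

interface TaskView {
  status: TaskStatus;
  currentStep: string | null;
  fraction: number | null;
  statusText: string | null;
}

const initialView: TaskView = { status: "pending", currentStep: null, fraction: null, statusText: null };

function reduceTask(view: TaskView, msg: TaskMessage): TaskView {
  switch (msg.type) {
    case "TASK_STARTED":
      return { ...view, status: "running" };
    case "STEP_PROGRESS":
      return {
        ...view,
        currentStep: msg.step,
        fraction: msg.fraction ?? view.fraction, // keep the last value if omitted
        statusText: msg.statusText ?? view.statusText,
      };
    case "STEP_COMPLETED":
      return { ...view, currentStep: null, fraction: null, statusText: null };
    case "TASK_FINISHED":
      return { ...view, status: "completed" };
    case "TASK_FAILED":
      return { ...view, status: "failed" };
    case "TASK_CANCELLED":
      return { ...view, status: "cancelled" };
  }
}

const received: TaskMessage[] = [
  { type: "TASK_STARTED" },
  { type: "STEP_PROGRESS", step: "FetchForecast", statusText: "Contacting weather service…" },
  { type: "STEP_PROGRESS", step: "FetchForecast", fraction: 0.7 },
];
const midRun = received.reduce(reduceTask, initialView);
```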
Cancellation
A running task can be cancelled by the task owner or an org admin.
From the UI
The task progress UI exposes a stop control. When a user cancels:
- Pending steps finalize immediately.
- A running step is aborted via its AbortSignal within roughly one heartbeat (~5 seconds), which is why steps should pass signal to their fetch and LLM calls.
- The task lands in the cancelled state (not failed). Clients receive a TASK_CANCELLED message, and the UI renders a muted "Stopped" state with no chat notification.
From the API
POST /orgs/:orgSlug/tasks/:taskId/cancel

| Response | Meaning |
|---|---|
| 202 | Cancellation accepted. The worker aborts the running step shortly. |
| 409 | The task is already in a terminal state (completed/failed/cancelled). |
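A caller might wrap the endpoint as follows. The route and the two status codes come from this page; the helper names, the baseUrl parameter, and the omission of authentication are assumptions, so treat this as a sketch rather than a supported client.

```typescript
type CancelOutcome = "accepted" | "already-finished" | "unexpected";

// Builds the cancel route for a task, escaping both path parameters.
function cancelPath(orgSlug: string, taskId: string): string {
  return `/orgs/${encodeURIComponent(orgSlug)}/tasks/${encodeURIComponent(taskId)}/cancel`;
}

// Maps the documented status codes to an outcome; anything else is left to the caller.
function interpretCancelStatus(status: number): CancelOutcome {
  if (status === 202) return "accepted";
  if (status === 409) return "already-finished";
  return "unexpected";
}

// Hypothetical wrapper. Authentication is deployment-specific and omitted here.
async function cancelTask(baseUrl: string, orgSlug: string, taskId: string): Promise<CancelOutcome> {
  const response = await fetch(baseUrl + cancelPath(orgSlug, taskId), { method: "POST" });
  return interpretCancelStatus(response.status);
}
```

Treating 409 as a distinct, non-error outcome lets a UI quietly hide its stop control when the task finished before the request arrived.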
Cancellation is cooperative: flagging the row signals the worker, which aborts
the in-flight step via the AbortSignal. A step that ignores signal will run
to its next checkpoint before stopping.
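For loop-style work that has no fetch or LLM call to hand the signal to, a step can add its own checkpoints. The sketch below uses the standard AbortController API; processWithCheckpoints is a hypothetical helper, and the work is synchronous only to keep the example short.

```typescript
// Processes items one at a time and checks the signal before each one.
// Returns how many items were processed before the abort was observed.
function processWithCheckpoints<T>(
  items: T[],
  signal: AbortSignal,
  work: (item: T) => void,
): number {
  let done = 0;
  for (const item of items) {
    if (signal.aborted) break; // checkpoint: do not start more work after a cancel
    work(item);
    done++;
  }
  return done;
}

const controller = new AbortController();
const processed = processWithCheckpoints(["a", "b", "c", "d"], controller.signal, (item) => {
  if (item === "b") controller.abort(); // simulate a cancel arriving mid-run
});
// processed is 2: "b" runs to completion, "c" and "d" never start
```

The item in flight when the cancel arrives still finishes, so the distance between checkpoints bounds how long a cancelled step keeps running.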
Consuming Live Partials in a Custom Tool UI
A task's tool renders in chat just like any other tool — through a custom tool
UI registered in packages/custom/src/ui/. Task tool UIs additionally receive a
taskProgress prop describing the live run, including taskProgress.live (the
shallow-merged emit() partials) and the per-step status.
See Creating Tool UIs for how tool UIs are built and registered.
"use client";
import type { TToolUI } from "@agentpress/lib/types";
export const getWeatherDynamic: TToolUI = ({ toolPart, taskProgress }) => {
const toolResult = (toolPart?.output ?? {}) as Record<string, unknown>;
const isRunning = taskProgress?.status === "running";
const isCancelled = taskProgress?.status === "cancelled";
// While running, render from streamed partials (`taskProgress.live`).
// Once complete, the persisted step/tool output takes over.
const finalOutput = taskProgress?.steps?.find(
(step) => step.name === "ComposeReport",
)?.output as Record<string, unknown> | undefined;
const weather: Record<string, unknown> = {
...(taskProgress?.live ?? {}), // streaming partials from emit()
...(finalOutput ?? {}), // persisted step output
...(taskProgress?.status === "completed" ? toolResult : {}),
};
// The live status line set by progress(...) on the current step:
const currentStep = taskProgress?.steps?.find(
(step) => step.name === taskProgress.currentStep,
);
const statusLine = isCancelled
? "Stopped before completing"
: isRunning
? currentStep?.statusText || taskProgress?.currentStepDisplayName
: undefined;
return (
<div>
<div>{String(weather.location ?? "…")}</div>
<div>{statusLine}</div>
{/* render weather.temperature, etc. — appears as soon as emit() lands */}
</div>
);
};
The taskProgress object exposes the run state your UI needs:
| Field | Description |
|---|---|
| status | "running", "completed", "failed", or "cancelled". |
| live | The shallow-merged emit() partials for the current run. |
| steps | Per-step state, including each step's output, statusText, and fraction. |
| currentStep / currentStepDisplayName | The step currently running and its display name. |
| showProgressBar | Mirrors the task's progressBar option. |
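These fields are enough to derive an overall progress value for a custom UI. The interfaces below are an assumed approximation of the prop, using the field names from the table; the library's real types may differ, and overallFraction is a hypothetical helper.

```typescript
type StepState = "pending" | "running" | "completed" | "failed" | "cancelled";

// Assumed shapes, named after the documented fields.
interface StepProgressLike {
  name: string;
  status: StepState;
  fraction?: number; // 0 to 1, as reported by progress()
  statusText?: string;
}

interface TaskProgressLike {
  status: "running" | "completed" | "failed" | "cancelled";
  steps: StepProgressLike[];
  currentStep?: string;
}

// Completed steps count as 1, the running step counts as its reported fraction.
function overallFraction(progress: TaskProgressLike): number {
  if (progress.steps.length === 0) return 0;
  const total = progress.steps.reduce((sum, step) => {
    if (step.status === "completed") return sum + 1;
    if (step.status === "running") return sum + Math.min(1, Math.max(0, step.fraction ?? 0));
    return sum;
  }, 0);
  return total / progress.steps.length;
}

const sampleProgress: TaskProgressLike = {
  status: "running",
  currentStep: "ComposeReport",
  steps: [
    { name: "FetchForecast", status: "completed" },
    { name: "ComposeReport", status: "running", fraction: 0.5 },
  ],
};
const overall = overallFraction(sampleProgress); // 0.75
```

Weighting every step equally is a simplification; a UI with one dominant step may prefer to show that step's fraction on its own.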
Key Files
| File | Role |
|---|---|
| packages/lib/src/utils/task.ts | The defineTask builder, step context types, and the generated tool. |
| packages/api/src/utils/taskListener.ts | The background worker: claims rows, runs steps, heartbeat, abort. |
| packages/api/src/tasks/taskRegistry.ts | Registers task instances and creates task records (initializeTask). |
| packages/api/src/hono/routes/tasks.ts | Task routes, including the cancel endpoint. |
| packages/lib/src/stores/taskWebSocketStore.ts | Frontend store for task progress over the WebSocket channel. |
| packages/custom/src/ui/ | Custom tool UIs, including task UIs that consume taskProgress. |