Background Tasks

Some work an agent triggers takes too long to run inside a single chat turn: crawling a website, refreshing a knowledge base, generating a report. AgentPress runs that work as a background task — a durable chain of steps executed by background workers, with live progress streamed to the UI while the user keeps chatting.

Tasks are authored with the defineTask typed pipeline builder. When an agent calls a task's tool, the task is enqueued and the agent immediately replies with a short "scheduled" message; the worker then runs the steps and streams progress back over a WebSocket channel.

Tasks live alongside tools in packages/custom/src/tools/. They are auto-discovered by the tool registry — exporting a task from a scanned file registers both the task and a corresponding LLM tool. See Creating Tools for the simpler, synchronous tool pattern.

Defining a Task

A task is created with defineTask(name, options) and then chained with one or more .step(...) calls. Each step receives a single context object and returns a JSON-serializable result that flows into the next step.

import { defineTask } from "@agentpress/lib/utils/task";
import * as z from "zod";

export const getWeatherDynamic = defineTask("getWeatherDynamic", {
  title: "Get Dynamic Weather", // display name (defaults to the task name)
  description: "Get the weather in a location.", // LLM-facing tool description
  input: z.object({
    // LLM tool contract; re-validated by the worker at the init step
    location: z.string().describe("City or state, full name"),
    memo: z.string().describe("Brief memo with the location"),
  }),
  // progressBar?: boolean — show the step progress UI (default true)
  // scheduledMessage?: { text: string; voice?: string }
})
  .step(
    "FetchForecast",
    { title: "Fetch forecast" },
    async ({ input, progress, emit, signal }) => {
      progress("Contacting weather service…"); // live status line in the task UI
      // fetchWeather is an application helper (not shown); forwarding `signal` lets it abort on cancel
      const obs = await fetchWeather(input.location, { signal });
      emit({ temperature: obs.temperature }); // streaming partial → custom UI now
      progress(0.7, "Parsing observations…"); // 0–1 fraction + status
      return { observation: obs };
    },
  )
  .step("ComposeReport", async ({ input, prev }) => {
    // `prev` is TYPE-INFERRED as the previous step's return value
    return { message: `${prev.observation.temperature}F in ${input.location}` };
  });

defineTask options

| Option | Type | Description |
| --- | --- | --- |
| description | string (required) | The LLM-facing tool description. This is what the model reads to decide when to schedule the task. |
| input | z.ZodType (required) | The LLM tool contract. Also re-validated by the worker at the init step, so direct enqueues are validated too. |
| title | string | Display name shown in the task UI. Defaults to the task name. |
| progressBar | boolean | Whether to render the step progress UI for this task. Defaults to true. |
| scheduledMessage | { text: string; voice?: string } | What the agent relays when the task is scheduled. text instructs the text model; voice is spoken verbatim by TTS if provided. |

Adding steps

.step() accepts a name and a run function, with optional display metadata in between:

// Name + run function
.step("FetchForecast", async (ctx) => { /* ... */ })

// Name + options + run function
.step("FetchForecast", { title: "Fetch forecast", description: "…" }, async (ctx) => { /* ... */ })

The step title defaults to the step name with camel case un-mashed ("FetchForecast" → "Fetch Forecast"). The name "init" is reserved for the internal scheduling step and cannot be used.
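
The default-title rule and the reserved name can be sketched as two small helpers. This is an illustration only: `humanizeStepName` and `assertStepNameAllowed` are hypothetical names, and the exact splitting rules AgentPress applies (for example to acronyms or digits) are an assumption.

```typescript
// Hypothetical helper: models the "un-mashing" of a camel-case step name.
function humanizeStepName(stepName: string): string {
  return stepName
    // Insert a space between a lowercase letter or digit and a following capital.
    .replace(/([a-z0-9])([A-Z])/g, "$1 $2")
    // Split an acronym from the word that follows it ("HTMLParser" -> "HTML Parser").
    .replace(/([A-Z]+)([A-Z][a-z])/g, "$1 $2");
}

// "init" is reserved for the internal scheduling step.
const RESERVED_STEP_NAMES = new Set(["init"]);

// Hypothetical guard: rejects a reserved step name up front.
function assertStepNameAllowed(stepName: string): void {
  if (RESERVED_STEP_NAMES.has(stepName)) {
    throw new Error(`Step name "${stepName}" is reserved`);
  }
}

console.log(humanizeStepName("FetchForecast")); // "Fetch Forecast"
```

Passing an explicit title in the step options bypasses this default entirely, which is the safer choice when the display name matters.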

Step Context

Every step's run function receives one destructurable object:

| Field | Type | Description |
| --- | --- | --- |
| input | Input | The task's original validated input, available to every step. |
| prev | Prev | The previous step's return value and nothing else. Type-inferred. {} for the first step. |
| progress(fractionOrMsg, msg?) | (number \| string, string?) => void | Report in-step progress: a status message, a 0–1 fraction, or both. |
| emit(partial) | (Record<string, unknown>) => void | Stream a partial result to the tool's custom UI immediately. Shallow-merged into the task's live data (32 KB cap). |
| signal | AbortSignal | Fires when the user cancels the task. Pass it to fetch / LLM calls so they abort cleanly. |
| meta | { taskId, threadId?, userId, orgId?, authToken? } | Identifiers for the running task row. authToken is fetched at runtime and never persisted. |

progress and emit are fire-and-forget. They broadcast throttled progress over the task WebSocket channel and are snapshotted to the task row on the worker heartbeat, so page reloads restore mid-step progress and partial results.

.step("Crawl", async ({ input, progress, emit, signal }) => {
  progress("Starting crawl…");                 // status only
  for (let i = 0; i < input.pages.length; i++) {
    const page = await scrape(input.pages[i], { signal }); // aborts on cancel
    emit({ scraped: i + 1 });                   // partial → custom UI
    progress((i + 1) / input.pages.length);      // fraction only
  }
  return { pages: input.pages.length };
})
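
The three call shapes of progress (message only, fraction only, or both) can be modelled as a single normalizing function. The sketch below is not the library's code: `normalizeProgress` and `ProgressUpdate` are hypothetical names, and clamping out-of-range fractions to 0–1 is an assumption.

```typescript
// Hypothetical shape for one in-step progress update.
interface ProgressUpdate {
  fraction?: number; // 0..1 when present
  statusText?: string;
}

// Models progress(fractionOrMsg, msg?) as a pure function.
function normalizeProgress(fractionOrMsg: number | string, msg?: string): ProgressUpdate {
  if (typeof fractionOrMsg === "string") {
    // progress("Starting crawl…"): status only
    return { statusText: fractionOrMsg };
  }
  // Assumption: non-finite or out-of-range fractions are coerced into 0..1.
  const finite = Number.isFinite(fractionOrMsg) ? fractionOrMsg : 0;
  const update: ProgressUpdate = { fraction: Math.min(1, Math.max(0, finite)) };
  if (msg !== undefined) {
    // progress(0.7, "Parsing observations…"): fraction plus status
    update.statusText = msg;
  }
  return update;
}
```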

Data-Flow Contract

The pipeline is explicit — there is no implicit merging of step inputs and outputs.

  • input is the task's original validated input, available to every step.
  • prev is the previous step's return value and nothing else. Anything a later step needs must be returned forward explicitly.
  • The final step's return value is the task output.
  • Step returns must be JSON-serializable — they are persisted and replayed into the next step after a worker crash.
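
The contract above can be illustrated with a toy runner. It is a simplified model rather than the worker's implementation: steps are synchronous here, `runPipeline` and `SketchStep` are hypothetical names, and the JSON round-trip stands in for persisting each return value.

```typescript
type StepData = Record<string, unknown>;

interface SketchStep {
  name: string;
  run: (ctx: { input: StepData; prev: StepData }) => StepData;
}

// Toy runner: every step sees the same `input`, and `prev` is only the previous return.
function runPipeline(input: StepData, steps: SketchStep[]): StepData {
  let prev: StepData = {}; // the first step receives an empty object
  for (const step of steps) {
    const returned = step.run({ input, prev });
    // Round-trip through JSON to mimic persistence between steps.
    prev = JSON.parse(JSON.stringify(returned)) as StepData;
  }
  return prev; // the final step's return value is the task output
}

const reportOutput = runPipeline({ location: "Austin" }, [
  { name: "FetchForecast", run: () => ({ temperature: 91, humidity: 40 }) },
  // This step forwards temperature but not humidity.
  { name: "Normalize", run: ({ prev }) => ({ temperature: prev.temperature }) },
  {
    name: "ComposeReport",
    run: ({ input, prev }) => ({
      message: `${String(prev.temperature)}F in ${String(input.location)}`,
      sawHumidity: "humidity" in prev, // false: it was never returned forward
    }),
  },
]);

console.log(reportOutput); // { message: "91F in Austin", sawHumidity: false }
```

The last step can still read input.location, but humidity is gone because the middle step did not return it.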

How Execution Works

  1. The agent schedules the task. The task's generated AI SDK tool creates a row in the tasks table with status PENDING and immediately returns the scheduled message (plus taskId), so the agent responds without waiting for the work to finish.
  2. A background worker picks it up. The task listener polls for claimable rows using SELECT … FOR UPDATE SKIP LOCKED, so only one worker ever runs a given step. It re-validates the task input at the init step, then runs each step in order.
  3. Each step is a durable row. Steps move through pending → running → completed/failed/cancelled. Because every step's return value is persisted, a worker crash or deploy mid-task is recoverable — another worker reclaims the row (after the heartbeat goes stale) and resumes.
  4. The final step's return value becomes the task output.
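
Which rows a worker may pick up can be sketched as a predicate over a step row. This only models eligibility; in the real system the locking query is what prevents two workers from claiming the same row. The row shape, the `isClaimable` name, and especially the 30-second staleness threshold are assumptions, since the guide only states a heartbeat of roughly 5 seconds.

```typescript
type RowStatus = "pending" | "running" | "completed" | "failed" | "cancelled";

// Hypothetical row shape for illustration.
interface StepRowSketch {
  stepName: string;
  status: RowStatus;
  lastHeartbeatAt: number; // epoch ms; only meaningful while running
}

// Assumption: the real staleness threshold is not documented here.
const STALE_AFTER_MS = 30_000;

function isClaimable(row: StepRowSketch, now: number): boolean {
  if (row.status === "pending") return true;
  if (row.status === "running") {
    // A running row is reclaimable only once its worker stopped heartbeating.
    return now - row.lastHeartbeatAt > STALE_AFTER_MS;
  }
  return false; // completed, failed, and cancelled rows are never picked up again
}
```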

Durability and Step Names

A step's name is its durable identity — it is persisted on task rows and used to resume after a crash or deploy. Renaming a step of a production task breaks in-flight rows, so treat step names as frozen once a task has shipped.
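
To see why a rename is risky, consider a simplified model of resuming by name. `findResumePoint` and `PersistedStep` are hypothetical, and how the real worker reacts to an unknown persisted name is not described in this guide; the sketch only shows that completed work can no longer be matched.

```typescript
// Hypothetical persisted row: one per step, keyed by the step's name.
interface PersistedStep {
  name: string;
  status: "pending" | "running" | "completed";
  output?: Record<string, unknown>;
}

interface ResumePoint {
  resumeAt: string | null; // null means every defined step already completed
  prev: Record<string, unknown>;
}

// Walk the steps as currently defined in code and skip those already completed on the row.
function findResumePoint(definedSteps: string[], persisted: PersistedStep[]): ResumePoint {
  const byName = new Map(persisted.map((row) => [row.name, row] as const));
  let prev: Record<string, unknown> = {};
  for (const stepName of definedSteps) {
    const row = byName.get(stepName);
    if (row !== undefined && row.status === "completed") {
      prev = row.output ?? {}; // replay the persisted return value
      continue;
    }
    return { resumeAt: stepName, prev };
  }
  return { resumeAt: null, prev };
}

const inFlightRows: PersistedStep[] = [
  { name: "FetchForecast", status: "completed", output: { temperature: 91 } },
  { name: "ComposeReport", status: "pending" },
];

// Names unchanged: resumes at ComposeReport with the persisted output as prev.
const unchanged = findResumePoint(["FetchForecast", "ComposeReport"], inFlightRows);
// After renaming the first step in code: the completed row no longer matches.
const renamed = findResumePoint(["FetchWeather", "ComposeReport"], inFlightRows);
```

In this model the renamed pipeline would start over at FetchWeather with an empty prev, discarding work that had already been persisted.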

Live Progress and Reload Resilience

While a step runs, progress() and emit() push updates to the client:

  • progress(...) updates the step's live status line and fraction. These broadcast throttled STEP_PROGRESS messages over the task WebSocket channel.
  • emit(partial) streams partial results. Partials are shallow-merged into the task's live data object on the client and rendered by the tool's custom UI immediately (see Consuming Live Partials).
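
Shallow merging has a consequence worth seeing concretely: a nested object in a later partial replaces the earlier one wholesale. The sketch below models that, plus a size check for the 32 KB cap. `mergePartial` is a hypothetical name, and dropping an oversize merge is an assumption; the guide states the cap but not how it is enforced.

```typescript
const LIVE_DATA_CAP_BYTES = 32 * 1024;

type LiveData = Record<string, unknown>;

// Models how emit() partials accumulate into the task's live data.
function mergePartial(live: LiveData, partial: LiveData): LiveData {
  const merged: LiveData = { ...live, ...partial }; // shallow: top-level keys only
  const sizeInBytes = new TextEncoder().encode(JSON.stringify(merged)).length;
  // Assumption: a merge that would exceed the cap is ignored.
  return sizeInBytes > LIVE_DATA_CAP_BYTES ? live : merged;
}

const afterFirst = mergePartial({}, { scraped: 1, forecast: { high: 90, low: 70 } });
const afterSecond = mergePartial(afterFirst, { forecast: { high: 95 } });
console.log(afterSecond); // { scraped: 1, forecast: { high: 95 } } (low is gone)
```

To keep nested fields across emits, either re-emit the full nested object each time or flatten the data into top-level keys.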

On every worker heartbeat (~5 seconds), the latest in-step progress and partials are snapshotted to the task row's live_state. If the user reloads the page mid-task, the snapshot is replayed so the progress bar and any streamed partial results are restored exactly where they were.

Task WebSocket Messages

The task channel carries these message types:

| Message | Meaning |
| --- | --- |
| CONNECTION_ESTABLISHED | The client connected to the task channel. |
| TASK_STARTED | The task began running. |
| STEP_PROGRESS | Live in-step progress (progress/emit), throttled. |
| STEP_COMPLETED | A step finished and the next one is starting. |
| TASK_FINISHED | The task completed successfully. |
| TASK_FAILED | The task failed during processing or execution. |
| TASK_CANCELLED | The task was cancelled by the user. |
| RESYNC_REQUEST / RESYNC_RESPONSE | Client-initiated state resync after a reconnect. |
| PING / PONG | Keepalive. |
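
On the client, these messages map naturally onto a discriminated union and a reducer. The sketch below is a minimal model, not the actual taskWebSocketStore: the message type names come from the table, but every payload field (`fraction`, `statusText`) and the `RunView` shape are assumptions.

```typescript
type TaskMessage =
  | { type: "CONNECTION_ESTABLISHED" }
  | { type: "TASK_STARTED" }
  | { type: "STEP_PROGRESS"; fraction?: number; statusText?: string } // hypothetical payload
  | { type: "STEP_COMPLETED" }
  | { type: "TASK_FINISHED" }
  | { type: "TASK_FAILED" }
  | { type: "TASK_CANCELLED" }
  | { type: "RESYNC_REQUEST" }
  | { type: "RESYNC_RESPONSE" }
  | { type: "PING" }
  | { type: "PONG" };

interface RunView {
  status: "pending" | "running" | "completed" | "failed" | "cancelled";
  fraction: number | null;
  statusText: string | null;
}

function reduceTaskMessage(view: RunView, msg: TaskMessage): RunView {
  switch (msg.type) {
    case "TASK_STARTED":
      return { ...view, status: "running" };
    case "STEP_PROGRESS":
      // Keep the previous value for any field this update omits.
      return {
        ...view,
        fraction: msg.fraction ?? view.fraction,
        statusText: msg.statusText ?? view.statusText,
      };
    case "STEP_COMPLETED":
      // In-step progress belongs to the finished step, so clear it.
      return { ...view, fraction: null, statusText: null };
    case "TASK_FINISHED":
      return { ...view, status: "completed" };
    case "TASK_FAILED":
      return { ...view, status: "failed" };
    case "TASK_CANCELLED":
      return { ...view, status: "cancelled" };
    default:
      return view; // connection, resync, and keepalive messages do not change the view here
  }
}
```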

Cancellation

A running task can be cancelled by the task owner or an org admin.

From the UI

The task progress UI exposes a stop control. When a user cancels:

  • Pending steps finalize immediately.
  • A running step is aborted via its AbortSignal within roughly one heartbeat (~5 seconds) — which is why steps should pass signal to their fetch and LLM calls.
  • The task lands in the cancelled state (not failed). Clients receive a TASK_CANCELLED message, and the UI renders a muted "Stopped" state with no chat notification.

From the API

POST /orgs/:orgSlug/tasks/:taskId/cancel

| Response | Meaning |
| --- | --- |
| 202 | Cancellation accepted. The worker aborts the running step shortly. |
| 409 | The task is already in a terminal state (completed/failed/cancelled). |
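
A client call to this endpoint might look like the sketch below. The route and the two status codes come from this page; the base URL, the helper names, and the injected `fetchImpl` are assumptions, and authentication is omitted because it is not described here.

```typescript
type CancelOutcome = "accepted" | "already-terminal";

// Builds the cancel route shown above; path segments are URL-encoded.
function buildCancelUrl(baseUrl: string, orgSlug: string, taskId: string): string {
  const root = baseUrl.replace(/\/+$/, ""); // drop trailing slashes
  return `${root}/orgs/${encodeURIComponent(orgSlug)}/tasks/${encodeURIComponent(taskId)}/cancel`;
}

// Maps the documented status codes to an outcome; anything else is treated as an error.
function interpretCancelStatus(httpStatus: number): CancelOutcome {
  if (httpStatus === 202) return "accepted";
  if (httpStatus === 409) return "already-terminal";
  throw new Error(`Unexpected cancel response: ${httpStatus}`);
}

// Minimal fetch-shaped dependency so the sketch does not rely on a particular runtime.
type FetchLike = (url: string, init: { method: string }) => Promise<{ status: number }>;

async function cancelTask(
  baseUrl: string,
  orgSlug: string,
  taskId: string,
  fetchImpl: FetchLike,
): Promise<CancelOutcome> {
  const response = await fetchImpl(buildCancelUrl(baseUrl, orgSlug, taskId), { method: "POST" });
  return interpretCancelStatus(response.status);
}
```

In a runtime with a global fetch, `(url, init) => fetch(url, init)` can be passed as `fetchImpl`. Treating 409 as a normal outcome rather than an error keeps a double-click on the stop control harmless.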

Cancellation is cooperative: flagging the row signals the worker, which aborts the in-flight step via the AbortSignal. A step that ignores signal will run to its next checkpoint before stopping.
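
For work that cannot take a signal directly, such as a CPU-bound loop, a step can add its own checkpoints by testing signal.aborted between units of work. The helper below is a hypothetical sketch of that pattern using the standard AbortController and AbortSignal APIs; it is synchronous to keep the example short.

```typescript
// Processes items one by one, checking for cancellation before each unit of work.
function processWithCheckpoints<T>(
  items: readonly T[],
  signal: AbortSignal,
  handle: (item: T) => void,
): number {
  let handled = 0;
  for (const item of items) {
    if (signal.aborted) {
      // Checkpoint: stop before starting the next item.
      throw new Error(`Aborted after ${handled} of ${items.length} items`);
    }
    handle(item);
    handled += 1;
  }
  return handled;
}

// Usage: the abort arrives while "b" is being handled, so "c" is never started.
const controller = new AbortController();
const seenItems: string[] = [];
let abortMessage = "";
try {
  processWithCheckpoints(["a", "b", "c"], controller.signal, (item) => {
    seenItems.push(item);
    if (item === "b") controller.abort();
  });
} catch (error) {
  abortMessage = error instanceof Error ? error.message : String(error);
}
console.log(seenItems, abortMessage); // ["a", "b"] "Aborted after 2 of 3 items"
```

The finer the units of work between checks, the sooner a cancelled step actually stops.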

Consuming Live Partials in a Custom Tool UI

A task's tool renders in chat just like any other tool — through a custom tool UI registered in packages/custom/src/ui/. Task tool UIs additionally receive a taskProgress prop describing the live run, including taskProgress.live (the shallow-merged emit() partials) and the per-step status.

See Creating Tool UIs for how tool UIs are built and registered.

"use client";

import type { TToolUI } from "@agentpress/lib/types";

export const getWeatherDynamic: TToolUI = ({ toolPart, taskProgress }) => {
  const toolResult = (toolPart?.output ?? {}) as Record<string, unknown>;
  const isRunning = taskProgress?.status === "running";
  const isCancelled = taskProgress?.status === "cancelled";

  // While running, render from streamed partials (`taskProgress.live`).
  // Once complete, the persisted step/tool output takes over.
  const finalOutput = taskProgress?.steps?.find(
    (step) => step.name === "ComposeReport",
  )?.output as Record<string, unknown> | undefined;

  const weather: Record<string, unknown> = {
    ...(taskProgress?.live ?? {}), // streaming partials from emit()
    ...(finalOutput ?? {}), // persisted step output
    ...(taskProgress?.status === "completed" ? toolResult : {}),
  };

  // The live status line set by progress(...) on the current step:
  const currentStep = taskProgress?.steps?.find(
    (step) => step.name === taskProgress?.currentStep,
  );
  const statusLine = isCancelled
    ? "Stopped before completing"
    : isRunning
      ? currentStep?.statusText || taskProgress?.currentStepDisplayName
      : undefined;

  return (
    <div>
      <div>{String(weather.location ?? "…")}</div>
      <div>{statusLine}</div>
      {/* render weather.temperature, etc. — appears as soon as emit() lands */}
    </div>
  );
};

The taskProgress object exposes the run state your UI needs:

| Field | Description |
| --- | --- |
| status | "running", "completed", "failed", or "cancelled". |
| live | The shallow-merged emit() partials for the current run. |
| steps | Per-step state, including each step's output, statusText, and fraction. |
| currentStep / currentStepDisplayName | The step currently running and its display name. |
| showProgressBar | Mirrors the task's progressBar option. |

Key Files

| File | Role |
| --- | --- |
| packages/lib/src/utils/task.ts | The defineTask builder, step context types, and the generated tool. |
| packages/api/src/utils/taskListener.ts | The background worker: claims rows, runs steps, heartbeat, abort. |
| packages/api/src/tasks/taskRegistry.ts | Registers task instances and creates task records (initializeTask). |
| packages/api/src/hono/routes/tasks.ts | Task routes, including the cancel endpoint. |
| packages/lib/src/stores/taskWebSocketStore.ts | Frontend store for task progress over the WebSocket channel. |
| packages/custom/src/ui/ | Custom tool UIs, including task UIs that consume taskProgress. |
