Applications can now send typed data directly to running tasks

A new input streams API enables bidirectional communication, allowing developers to stream real-time data into paused or running task processes to build interactive workflows and human-in-the-loop systems.
Interactive workflows, such as human-in-the-loop approvals or chat interfaces, require systems to feed data into a process while it runs. Previously, tasks operated mostly in a unidirectional flow.
Tasks can now define typed to receive data mid-execution. A task can be entirely suspended while waiting for a single external payload, or it can remain alive to of incoming messages. Outside the task, external backends and frontends can send data directly to these specific runs using a standard API or a provided .
Under the hood, suspended tasks rely on internal waitpoints mapped via a Redis cache, ensuring minimal latency when the external data finally arrives. The underlying event streaming relies on an upgraded version of the S2 streamstore SDK, which now includes local development support.
Here is how a task awaits external input:
1const approval = streams.input<{ approved: boolean }>({ id: "approval" });2const result = await approval.wait({ timeout: "5m" });
View Original GitHub Description
Input streams enable sending typed data to executing tasks from external callers — backends, frontends, or other tasks. This unlocks interactive use cases like approval UIs, cancel buttons, chat interfaces, and human-in-the-loop AI workflows where the task needs to receive data while running.
Three consumption patterns inside a task:
.wait()— Suspend the task until data arrives (process freed, most efficient).once()— Wait for the next message (process stays alive).on()— Subscribe to a continuous stream of messages
One send pattern from outside:
.send(runId, data)— Send typed data to a specific run's input stream
User-facing API
Define a typed input stream
import { streams, task } from "@trigger.dev/sdk";
const approval = streams.input<{ approved: boolean; reviewer: string }>({ id: "approval" });
Consume inside a task
export const myTask = task({
id: "my-task",
run: async () => {
// Pattern 1: Suspend until data arrives (most efficient — frees the process)
const result = await approval.wait({ timeout: "5m" });
// Pattern 2: Wait for next message (process stays alive)
const data = await approval.once().unwrap();
// Pattern 3: Subscribe to multiple messages
approval.on((data) => { /* handle each message */ });
},
});
Send from outside
// From a backend (using secret API key)
await approval.send(runId, { approved: true, reviewer: "alice" });
// From a frontend (using public JWT token from trigger response)
const { send } = useInputStreamSend("approval", runId, { accessToken });
send({ approved: true, reviewer: "alice" });