Gitpulse
LatestReleasesStand-up
Merged
Size
XL
Extra Large: 1000+ weighted lines
Change Breakdown
Feature85%
Dependencies10%
Docs5%
#3146feat: Input Streams - Bidirectional task communication

Applications can now send typed data directly to running tasks

Applications can now send typed data directly to running tasks
ER
ericallam
·Mar 2, 2026·#3146feat: Input Streams - Bidirectional task communication

A new input streams API enables bidirectional communication, allowing developers to stream real-time data into paused or running task processes to build interactive workflows and human-in-the-loop systems.

Interactive workflows, such as human-in-the-loop approvals or chat interfaces, require systems to feed data into a process while it runs. Previously, tasks operated mostly in a unidirectional flow.

Tasks can now define typed to receive data mid-execution. A task can be entirely suspended while waiting for a single external payload, or it can remain alive to of incoming messages. Outside the task, external backends and frontends can send data directly to these specific runs using a standard API or a provided .

Under the hood, suspended tasks rely on internal waitpoints mapped via a Redis cache, ensuring minimal latency when the external data finally arrives. The underlying event streaming relies on an upgraded version of the S2 streamstore SDK, which now includes local development support.

Here is how a task awaits external input:

typescript
1const approval = streams.input<{ approved: boolean }>({ id: "approval" });
2const result = await approval.wait({ timeout: "5m" });
View Original GitHub Description

Input streams enable sending typed data to executing tasks from external callers — backends, frontends, or other tasks. This unlocks interactive use cases like approval UIs, cancel buttons, chat interfaces, and human-in-the-loop AI workflows where the task needs to receive data while running.

Three consumption patterns inside a task:

  • .wait() — Suspend the task until data arrives (process freed, most efficient)
  • .once() — Wait for the next message (process stays alive)
  • .on() — Subscribe to a continuous stream of messages

One send pattern from outside:

  • .send(runId, data) — Send typed data to a specific run's input stream

User-facing API

Define a typed input stream

import { streams, task } from "@trigger.dev/sdk";

const approval = streams.input<{ approved: boolean; reviewer: string }>({ id: "approval" });

Consume inside a task

export const myTask = task({
  id: "my-task",
  run: async () => {
    // Pattern 1: Suspend until data arrives (most efficient — frees the process)
    const result = await approval.wait({ timeout: "5m" });

    // Pattern 2: Wait for next message (process stays alive)
    const data = await approval.once().unwrap();

    // Pattern 3: Subscribe to multiple messages
    approval.on((data) => { /* handle each message */ });
  },
});

Send from outside

// From a backend (using secret API key)
await approval.send(runId, { approved: true, reviewer: "alice" });

// From a frontend (using public JWT token from trigger response)
const { send } = useInputStreamSend("approval", runId, { accessToken });
send({ approved: true, reviewer: "alice" });
© 2026 · via Gitpulse