Skip to content

stream/iter: per-consumer AbortSignal is ignored by broadcast.push() and share.pull() fast path #63302

@trivikr

Description

@trivikr

Version

26.1.0

Platform

macOS 26.5.0

Subsystem

stream

What steps will reproduce the bug?

import { setTimeout as delay } from "node:timers/promises";
import { broadcast, share } from "node:stream/iter";

async function check(label, next) {
  console.log(
    await Promise.race([
      next.then(
        (value) => `${label}: settled ${JSON.stringify(value)}`,
        (error) => `${label}: rejected ${error?.name ?? error}`,
      ),
      delay(50, `${label}: still pending`),
    ]),
  );
}

{
  const ac = new AbortController();
  const { broadcast: bc } = broadcast();
  const it = bc.push({ signal: ac.signal })[Symbol.asyncIterator]();

  const next = it.next();
  ac.abort(new Error("abort"));

  await check("broadcast.push({ signal }).next()", next);
}

{
  const ac = new AbortController();
  const shared = share(
    (async function* never() {
      await new Promise(() => {});
    })(),
  );
  const it = shared.pull({ signal: ac.signal })[Symbol.asyncIterator]();

  const next = it.next();
  ac.abort(new Error("abort"));

  await check("share.pull({ signal }).next()", next);
}

How often does it reproduce? Is there a required condition?

Always

What is the expected behavior? Why is that the expected behavior?

broadcast.push({ signal }).next(): rejected Error
share.pull({ signal }).next(): rejected Error

What do you see instead?

broadcast.push({ signal }).next(): still pending
share.pull({ signal }).next(): still pending

Additional information

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    streamIssues and PRs related to the stream subsystem.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions