Files
react/packages/react-server/src/ReactFizzThenable.js
T
Sebastian Markbåge 7909d8eabb [Flight] Encode ReadableStream and AsyncIterables (#28847)
This adds support in Flight for serializing four kinds of streams:

- `ReadableStream` with objects as a model. This is a single shot
iterator so you can read it only once. It can contain any value
including Server Components. Chunks are encoded as is so if you send in
10 typed arrays, you get the same typed arrays out on the other side.
- Binary `ReadableStream` with `type: 'bytes'` option. This supports the
BYOB protocol. In this mode, the receiving side just gets `Uint8Array`s
and they can be split across any single byte boundary into arbitrary
chunks.
- `AsyncIterable` where the `AsyncIterator` function is different than
the `AsyncIterable` itself. In this case we assume that this might be a
multi-shot iterable and so we buffer its value and you can iterate it
multiple times on the other side. We support the `return` value as a
value in the single completion slot, but you can't pass values in
`next()`. If you want single-shot, return the AsyncIterator instead.
- `AsyncIterator`. These gets serialized as a single-shot as it's just
an iterator.

`AsyncIterable`/`AsyncIterator` yield Promises that are instrumented
with our `.status`/`.value` convention so that they can be synchronously
looped over if available. They are also lazily parsed upon read.

We can't do this with `ReadableStream` because we use the native
implementation of `ReadableStream` which owns the promises.

The format is a leading row that indicates which type of stream it is.
Then a new row with the same ID is emitted for every chunk. Followed by
either an error or close row.

`AsyncIterable`s can also be returned as children of Server Components
and then they're conceptually the same as fragment arrays/iterables.
They can't actually be used as children in Fizz/Fiber but there's a
separate plan for that. Only `AsyncIterable` not `AsyncIterator` will be
valid as children - just like sync `Iterable` is already supported but
single-shot `Iterator` is not. Notably, neither of these streams
represent updates over time to a value. They represent multiple values
in a list.

When the server stream is aborted we also close the underlying stream.
However, closing a stream on the client, doesn't close the underlying
stream.

A couple of possible follow ups I'm not planning on doing right now:

- [ ] Free memory by releasing the buffer if an Iterator has been
exhausted. Single shots could be optimized further to release individual
items as you go.
- [ ] We could clean up the underlying stream if the only pending data
that's still flowing is from streams and all the streams have cleaned
up. It's not very reliable though. It's better to do cancellation for
the whole stream - e.g. at the framework level.
- [ ] Implement smarter Binary Stream chunk handling. Currently we wait
until we've received a whole row for binary chunks and copy them into
consecutive memory. We need this to preserve semantics when passing
typed arrays. However, for binary streams we don't need that. We can
just send whatever pieces we have so far.
2024-04-16 12:20:07 -04:00

153 lines
5.6 KiB
JavaScript

/**
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow
*/
// Corresponds to ReactFiberWakeable and ReactFlightWakeable modules. Generally,
// changes to one module should be reflected in the others.
// TODO: Rename this module and the corresponding Fiber one to "Thenable"
// instead of "Wakeable". Or some other more appropriate name.
import type {
Thenable,
PendingThenable,
FulfilledThenable,
RejectedThenable,
} from 'shared/ReactTypes';
export opaque type ThenableState = Array<Thenable<any>>;
// An error that is thrown (e.g. by `use`) to trigger Suspense. If we
// detect this is caught by userspace, we'll log a warning in development.
export const SuspenseException: mixed = new Error(
"Suspense Exception: This is not a real error! It's an implementation " +
'detail of `use` to interrupt the current render. You must either ' +
'rethrow it immediately, or move the `use` call outside of the ' +
'`try/catch` block. Capturing without rethrowing will lead to ' +
'unexpected behavior.\n\n' +
'To handle async errors, wrap your component in an error boundary, or ' +
"call the promise's `.catch` method and pass the result to `use`",
);
export function createThenableState(): ThenableState {
// The ThenableState is created the first time a component suspends. If it
// suspends again, we'll reuse the same state.
return [];
}
function noop(): void {}
export function trackUsedThenable<T>(
thenableState: ThenableState,
thenable: Thenable<T>,
index: number,
): T {
const previous = thenableState[index];
if (previous === undefined) {
thenableState.push(thenable);
} else {
if (previous !== thenable) {
// Reuse the previous thenable, and drop the new one. We can assume
// they represent the same value, because components are idempotent.
// Avoid an unhandled rejection errors for the Promises that we'll
// intentionally ignore.
thenable.then(noop, noop);
thenable = previous;
}
}
// We use an expando to track the status and result of a thenable so that we
// can synchronously unwrap the value. Think of this as an extension of the
// Promise API, or a custom interface that is a superset of Thenable.
//
// If the thenable doesn't have a status, set it to "pending" and attach
// a listener that will update its status and result when it resolves.
switch (thenable.status) {
case 'fulfilled': {
const fulfilledValue: T = thenable.value;
return fulfilledValue;
}
case 'rejected': {
const rejectedError = thenable.reason;
throw rejectedError;
}
default: {
if (typeof thenable.status === 'string') {
// Only instrument the thenable if the status if not defined. If
// it's defined, but an unknown value, assume it's been instrumented by
// some custom userspace implementation. We treat it as "pending".
// Attach a dummy listener, to ensure that any lazy initialization can
// happen. Flight lazily parses JSON when the value is actually awaited.
thenable.then(noop, noop);
} else {
const pendingThenable: PendingThenable<T> = (thenable: any);
pendingThenable.status = 'pending';
pendingThenable.then(
fulfilledValue => {
if (thenable.status === 'pending') {
const fulfilledThenable: FulfilledThenable<T> = (thenable: any);
fulfilledThenable.status = 'fulfilled';
fulfilledThenable.value = fulfilledValue;
}
},
(error: mixed) => {
if (thenable.status === 'pending') {
const rejectedThenable: RejectedThenable<T> = (thenable: any);
rejectedThenable.status = 'rejected';
rejectedThenable.reason = error;
}
},
);
}
// Check one more time in case the thenable resolved synchronously
switch (thenable.status) {
case 'fulfilled': {
const fulfilledThenable: FulfilledThenable<T> = (thenable: any);
return fulfilledThenable.value;
}
case 'rejected': {
const rejectedThenable: RejectedThenable<T> = (thenable: any);
throw rejectedThenable.reason;
}
}
// Suspend.
//
// Throwing here is an implementation detail that allows us to unwind the
// call stack. But we shouldn't allow it to leak into userspace. Throw an
// opaque placeholder value instead of the actual thenable. If it doesn't
// get captured by the work loop, log a warning, because that means
// something in userspace must have caught it.
suspendedThenable = thenable;
throw SuspenseException;
}
}
}
// This is used to track the actual thenable that suspended so it can be
// passed to the rest of the Suspense implementation — which, for historical
// reasons, expects to receive a thenable.
let suspendedThenable: Thenable<any> | null = null;
export function getSuspendedThenable(): Thenable<mixed> {
// This is called right after `use` suspends by throwing an exception. `use`
// throws an opaque value instead of the thenable itself so that it can't be
// caught in userspace. Then the work loop accesses the actual thenable using
// this function.
if (suspendedThenable === null) {
throw new Error(
'Expected a suspended thenable. This is a bug in React. Please file ' +
'an issue.',
);
}
const thenable = suspendedThenable;
suspendedThenable = null;
return thenable;
}