Files
react/packages/react-server-dom-parcel/src/server/ReactFlightDOMServerEdge.js
T
Sebastian Markbåge 9f540fcc51 [Flight] Support streaming of decodeReply in Edge environments (#31852)
We support streaming `multipart/form-data` in Node.js using Busboy since
that's kind of the idiomatic ecosystem way for handling these stream
there. There's not really anything idiomatic like that for Edge that's
universal yet.

This adds a version that's basically just
`AsyncIterable.from(formData)`. It could also be a `ReadableStream` of
those entries since those are also `AsyncIterable`.

I imagine that in the future we might add one from a binary
`ReadableStream` that does the parsing built-in.
2024-12-19 12:54:59 -05:00

264 lines
7.2 KiB
JavaScript

/**
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow
*/
import type {ReactClientValue} from 'react-server/src/ReactFlightServer';
import type {ReactFormState, Thenable} from 'shared/ReactTypes';
import {
preloadModule,
requireModule,
resolveServerReference,
type ServerManifest,
type ServerReferenceId,
} from '../client/ReactFlightClientConfigBundlerParcel';
import {ASYNC_ITERATOR} from 'shared/ReactSymbols';
import {
createRequest,
createPrerenderRequest,
startWork,
startFlowing,
stopFlowing,
abort,
} from 'react-server/src/ReactFlightServer';
import {
createResponse,
close,
getRoot,
reportGlobalError,
resolveField,
resolveFile,
} from 'react-server/src/ReactFlightReplyServer';
import {
decodeAction as decodeActionImpl,
decodeFormState as decodeFormStateImpl,
} from 'react-server/src/ReactFlightActionServer';
export {
createClientReference,
registerServerReference,
} from '../ReactFlightParcelReferences';
import type {TemporaryReferenceSet} from 'react-server/src/ReactFlightServerTemporaryReferences';
export {createTemporaryReferenceSet} from 'react-server/src/ReactFlightServerTemporaryReferences';
export type {TemporaryReferenceSet};
type Options = {
environmentName?: string | (() => string),
filterStackFrame?: (url: string, functionName: string) => boolean,
identifierPrefix?: string,
signal?: AbortSignal,
temporaryReferences?: TemporaryReferenceSet,
onError?: (error: mixed) => void,
onPostpone?: (reason: string) => void,
};
export function renderToReadableStream(
model: ReactClientValue,
options?: Options,
): ReadableStream {
const request = createRequest(
model,
null,
options ? options.onError : undefined,
options ? options.identifierPrefix : undefined,
options ? options.onPostpone : undefined,
options ? options.temporaryReferences : undefined,
__DEV__ && options ? options.environmentName : undefined,
__DEV__ && options ? options.filterStackFrame : undefined,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort(request, (signal: any).reason);
} else {
const listener = () => {
abort(request, (signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
const stream = new ReadableStream(
{
type: 'bytes',
start: (controller): ?Promise<void> => {
startWork(request);
},
pull: (controller): ?Promise<void> => {
startFlowing(request, controller);
},
cancel: (reason): ?Promise<void> => {
stopFlowing(request);
abort(request, reason);
},
},
// $FlowFixMe[prop-missing] size() methods are not allowed on byte streams.
{highWaterMark: 0},
);
return stream;
}
type StaticResult = {
prelude: ReadableStream,
};
export function prerender(
model: ReactClientValue,
options?: Options,
): Promise<StaticResult> {
return new Promise((resolve, reject) => {
const onFatalError = reject;
function onAllReady() {
const stream = new ReadableStream(
{
type: 'bytes',
start: (controller): ?Promise<void> => {
startWork(request);
},
pull: (controller): ?Promise<void> => {
startFlowing(request, controller);
},
cancel: (reason): ?Promise<void> => {
stopFlowing(request);
abort(request, reason);
},
},
// $FlowFixMe[prop-missing] size() methods are not allowed on byte streams.
{highWaterMark: 0},
);
resolve({prelude: stream});
}
const request = createPrerenderRequest(
model,
null,
onAllReady,
onFatalError,
options ? options.onError : undefined,
options ? options.identifierPrefix : undefined,
options ? options.onPostpone : undefined,
options ? options.temporaryReferences : undefined,
__DEV__ && options ? options.environmentName : undefined,
__DEV__ && options ? options.filterStackFrame : undefined,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
const reason = (signal: any).reason;
abort(request, reason);
} else {
const listener = () => {
const reason = (signal: any).reason;
abort(request, reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
startWork(request);
});
}
let serverManifest = {};
export function registerServerActions(manifest: ServerManifest) {
// This function is called by the bundler to register the manifest.
serverManifest = manifest;
}
export function decodeReply<T>(
body: string | FormData,
options?: {temporaryReferences?: TemporaryReferenceSet},
): Thenable<T> {
if (typeof body === 'string') {
const form = new FormData();
form.append('0', body);
body = form;
}
const response = createResponse(
serverManifest,
'',
options ? options.temporaryReferences : undefined,
body,
);
const root = getRoot<T>(response);
close(response);
return root;
}
export function decodeReplyFromAsyncIterable<T>(
iterable: AsyncIterable<[string, string | File]>,
options?: {temporaryReferences?: TemporaryReferenceSet},
): Thenable<T> {
const iterator: AsyncIterator<[string, string | File]> =
iterable[ASYNC_ITERATOR]();
const response = createResponse(
serverManifest,
'',
options ? options.temporaryReferences : undefined,
);
function progress(
entry:
| {done: false, +value: [string, string | File], ...}
| {done: true, +value: void, ...},
) {
if (entry.done) {
close(response);
} else {
const [name, value] = entry.value;
if (typeof value === 'string') {
resolveField(response, name, value);
} else {
resolveFile(response, name, value);
}
iterator.next().then(progress, error);
}
}
function error(reason: Error) {
reportGlobalError(response, reason);
if (typeof (iterator: any).throw === 'function') {
// The iterator protocol doesn't necessarily include this but a generator do.
// $FlowFixMe should be able to pass mixed
iterator.throw(reason).then(error, error);
}
}
iterator.next().then(progress, error);
return getRoot(response);
}
export function decodeAction<T>(body: FormData): Promise<() => T> | null {
return decodeActionImpl(body, serverManifest);
}
export function decodeFormState<S>(
actionResult: S,
body: FormData,
): Promise<ReactFormState<S, ServerReferenceId> | null> {
return decodeFormStateImpl(actionResult, body, serverManifest);
}
export function loadServerAction<F: (...any[]) => any>(id: string): Promise<F> {
const reference = resolveServerReference<any>(serverManifest, id);
return Promise.resolve(reference)
.then(() => preloadModule(reference))
.then(() => {
const fn = requireModule(reference);
if (typeof fn !== 'function') {
throw new Error('Server actions must be functions');
}
return fn;
});
}