Skip to content

Commit 25b75ac

Browse files
committed
introduce executeStreamIterator
1 parent 79983d5 commit 25b75ac

File tree

2 files changed

+151
-107
lines changed

2 files changed

+151
-107
lines changed

src/execution/__tests__/stream-test.ts

-10
Original file line numberDiff line numberDiff line change
@@ -532,11 +532,6 @@ describe('Execute: stream directive', () => {
532532
},
533533
],
534534
},
535-
],
536-
hasNext: true,
537-
},
538-
{
539-
incremental: [
540535
{
541536
items: [{ name: 'Leia', id: '3' }],
542537
path: ['friendList', 2],
@@ -978,11 +973,6 @@ describe('Execute: stream directive', () => {
978973
},
979974
],
980975
},
981-
],
982-
hasNext: true,
983-
},
984-
{
985-
incremental: [
986976
{
987977
items: [{ nonNullName: 'Han' }],
988978
path: ['friendList', 2],

src/execution/execute.ts

+151-97
Original file line numberDiff line numberDiff line change
@@ -1473,38 +1473,39 @@ function completeListValue(
14731473
// This is specified as a simple map, however we're optimizing the path
14741474
// where the list contains no Promises by avoiding creating another Promise.
14751475
let containsPromise = false;
1476-
let currentParents = parentRecords;
14771476
const completedResults: Array<unknown> = [];
14781477
let index = 0;
14791478
let streamContext: StreamContext | undefined;
1480-
for (const item of result) {
1481-
// No need to modify the info object containing the path,
1482-
// since from here on it is not ever accessed by resolver functions.
1483-
const itemPath = addPath(path, index, fieldGroup);
1484-
1479+
const iterator = result[Symbol.iterator]();
1480+
// eslint-disable-next-line no-constant-condition
1481+
while (true) {
14851482
if (streamUsage && index >= streamUsage.initialCount) {
1486-
if (streamContext === undefined) {
1487-
streamContext = {
1488-
label: streamUsage.label,
1489-
path: pathToArray(path),
1490-
};
1491-
}
1492-
currentParents = executeStreamField(
1493-
path,
1494-
itemPath,
1495-
item,
1483+
streamContext = {
1484+
label: streamUsage.label,
1485+
path: pathToArray(path),
1486+
};
1487+
executeStreamIterator(
1488+
index,
1489+
iterator,
14961490
exeContext,
1491+
itemType,
14971492
getStreamedFieldGroup(fieldGroup, streamUsage),
14981493
info,
1499-
itemType,
1500-
streamContext,
1494+
path,
15011495
deferMap,
1502-
currentParents,
1496+
streamContext,
1497+
parentRecords,
15031498
);
1504-
index++;
1505-
continue;
1499+
break;
1500+
}
1501+
1502+
const iteration = iterator.next();
1503+
if (iteration.done) {
1504+
break;
15061505
}
15071506

1507+
const item = iteration.value;
1508+
const itemPath = addPath(path, index, fieldGroup);
15081509
addPendingDeferredField(fieldGroup, itemPath, deferMap);
15091510
if (
15101511
completeListItemValue(
@@ -2171,86 +2172,71 @@ function assertEventStream(result: unknown): AsyncIterable<unknown> {
21712172
return result;
21722173
}
21732174

2174-
function executeStreamField(
2175-
path: Path<FieldGroup>,
2176-
itemPath: Path<FieldGroup>,
2177-
item: PromiseOrValue<unknown>,
2175+
function executeStreamIterator(
2176+
initialIndex: number,
2177+
iterator: Iterator<unknown>,
21782178
exeContext: ExecutionContext,
2179+
itemType: GraphQLOutputType,
21792180
fieldGroup: FieldGroup,
21802181
info: GraphQLResolveInfo,
2181-
itemType: GraphQLOutputType,
2182-
streamContext: StreamContext,
2182+
path: Path<FieldGroup>,
21832183
deferMap: Map<DeferUsage, DeferredFragmentRecord>,
2184-
parents?: Array<AsyncPayloadRecord> | undefined,
2185-
): Array<StreamRecord> {
2186-
const streamRecord = new StreamRecord({
2187-
streamContext,
2188-
path: itemPath,
2189-
parents,
2190-
exeContext,
2191-
});
2192-
const currentParents = [streamRecord];
2193-
if (isPromise(item)) {
2194-
const completedItems = completePromisedValue(
2195-
exeContext,
2196-
itemType,
2197-
fieldGroup,
2198-
info,
2199-
itemPath,
2200-
item,
2201-
deferMap,
2202-
streamRecord,
2203-
currentParents,
2204-
).then(
2205-
(value) => [value],
2206-
(error) => {
2207-
streamRecord.errors.push(error);
2208-
filterSubsequentPayloads(exeContext, path, currentParents);
2209-
return null;
2210-
},
2211-
);
2184+
streamContext: StreamContext,
2185+
parents: Array<AsyncPayloadRecord> | undefined,
2186+
): void {
2187+
let index = initialIndex;
2188+
let currentParents = parents;
2189+
// eslint-disable-next-line no-constant-condition
2190+
while (true) {
2191+
const iteration = iterator.next();
2192+
if (iteration.done) {
2193+
break;
2194+
}
22122195

2213-
streamRecord.addItems(completedItems);
2214-
return currentParents;
2215-
}
2196+
const item = iteration.value;
2197+
const itemPath = addPath(path, index, fieldGroup);
2198+
const streamRecord = new StreamRecord({
2199+
streamContext,
2200+
path: itemPath,
2201+
parents: currentParents,
2202+
exeContext,
2203+
});
22162204

2217-
let completedItem: PromiseOrValue<unknown>;
2218-
try {
2219-
try {
2220-
completedItem = completeValue(
2205+
currentParents = [streamRecord];
2206+
if (isPromise(item)) {
2207+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
2208+
handlePromisedItemForStream(
22212209
exeContext,
22222210
itemType,
22232211
fieldGroup,
22242212
info,
2213+
path,
22252214
itemPath,
22262215
item,
22272216
deferMap,
22282217
streamRecord,
22292218
currentParents,
22302219
);
2231-
} catch (rawError) {
2232-
handleFieldError(
2233-
rawError,
2234-
exeContext,
2235-
itemType,
2236-
fieldGroup,
2237-
itemPath,
2238-
deferMap,
2239-
streamRecord,
2240-
);
2241-
completedItem = null;
2242-
filterSubsequentPayloads(exeContext, itemPath, currentParents);
2220+
2221+
index++;
2222+
continue;
22432223
}
2244-
} catch (error) {
2245-
streamRecord.errors.push(error);
2246-
filterSubsequentPayloads(exeContext, path, currentParents);
2247-
streamRecord.addItems(null);
2248-
return currentParents;
2249-
}
22502224

2251-
if (isPromise(completedItem)) {
2252-
const completedItems = completedItem
2253-
.then(undefined, (rawError) => {
2225+
let completedItem: PromiseOrValue<unknown>;
2226+
try {
2227+
try {
2228+
completedItem = completeValue(
2229+
exeContext,
2230+
itemType,
2231+
fieldGroup,
2232+
info,
2233+
itemPath,
2234+
item,
2235+
deferMap,
2236+
streamRecord,
2237+
currentParents,
2238+
);
2239+
} catch (rawError) {
22542240
handleFieldError(
22552241
rawError,
22562242
exeContext,
@@ -2261,23 +2247,91 @@ function executeStreamField(
22612247
streamRecord,
22622248
);
22632249
filterSubsequentPayloads(exeContext, itemPath, currentParents);
2264-
return null;
2265-
})
2266-
.then(
2267-
(value) => [value],
2268-
(error) => {
2269-
streamRecord.errors.push(error);
2270-
filterSubsequentPayloads(exeContext, path, currentParents);
2271-
return null;
2272-
},
2250+
completedItem = null;
2251+
}
2252+
} catch (error) {
2253+
if (fieldGroup.inInitialResult) {
2254+
streamRecord.errors.push(error);
2255+
}
2256+
returnStreamIteratorIgnoringError(streamContext);
2257+
filterSubsequentPayloads(exeContext, path, currentParents);
2258+
streamRecord.addItems(null);
2259+
break;
2260+
}
2261+
2262+
if (isPromise(completedItem)) {
2263+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
2264+
handlePromisedStreamResult(
2265+
completedItem,
2266+
streamRecord,
2267+
exeContext,
2268+
itemType,
2269+
fieldGroup,
2270+
path,
2271+
itemPath,
2272+
deferMap,
2273+
streamContext,
22732274
);
2275+
} else {
2276+
streamRecord.addItems([completedItem]);
2277+
}
22742278

2275-
streamRecord.addItems(completedItems);
2276-
return currentParents;
2279+
index++;
22772280
}
2281+
}
22782282

2279-
streamRecord.addItems([completedItem]);
2280-
return currentParents;
2283+
async function handlePromisedItemForStream(
2284+
exeContext: ExecutionContext,
2285+
returnType: GraphQLOutputType,
2286+
fieldGroup: FieldGroup,
2287+
info: GraphQLResolveInfo,
2288+
path: Path<FieldGroup>,
2289+
itemPath: Path<FieldGroup>,
2290+
item: Promise<unknown>,
2291+
deferMap: Map<DeferUsage, DeferredFragmentRecord>,
2292+
streamRecord: StreamRecord,
2293+
parentRecords: Array<AsyncPayloadRecord> | undefined,
2294+
): Promise<void> {
2295+
try {
2296+
try {
2297+
const resolvedItem = await item;
2298+
let completedItem = completeValue(
2299+
exeContext,
2300+
returnType,
2301+
fieldGroup,
2302+
info,
2303+
itemPath,
2304+
resolvedItem,
2305+
deferMap,
2306+
streamRecord,
2307+
parentRecords,
2308+
);
2309+
// TODO: add test for this
2310+
/* c8 ignore next 3 */
2311+
if (isPromise(completedItem)) {
2312+
completedItem = await completedItem;
2313+
}
2314+
streamRecord.addItems([completedItem]);
2315+
} catch (rawError) {
2316+
handleFieldError(
2317+
rawError,
2318+
exeContext,
2319+
returnType,
2320+
fieldGroup,
2321+
itemPath,
2322+
deferMap,
2323+
streamRecord,
2324+
);
2325+
filterSubsequentPayloads(exeContext, itemPath, [streamRecord]);
2326+
streamRecord.addItems([null]);
2327+
}
2328+
} catch (error) {
2329+
if (fieldGroup.inInitialResult) {
2330+
streamRecord.errors.push(error);
2331+
}
2332+
filterSubsequentPayloads(exeContext, path, [streamRecord]);
2333+
streamRecord.addItems(null);
2334+
}
22812335
}
22822336

22832337
async function executeStreamAsyncIterator(
@@ -2765,7 +2819,7 @@ class StreamRecord {
27652819
});
27662820
}
27672821

2768-
addItems(items: PromiseOrValue<Array<unknown> | null>) {
2822+
addItems(items: Array<unknown> | null) {
27692823
if (this.parents !== undefined) {
27702824
const parentPromises = this.parents.map((parent) => parent.promise);
27712825
this._resolve?.(Promise.any(parentPromises).then(() => items));

0 commit comments

Comments
 (0)