@@ -169,10 +169,18 @@ export interface FormattedCompletedResult {
169
169
errors ?: ReadonlyArray < GraphQLError > ;
170
170
}
171
171
172
+ interface IncrementalStreamTarget {
173
+ errors ?: Array < GraphQLError > ;
174
+ items : Array < unknown > ;
175
+ }
176
+
172
177
interface IncrementalAggregate {
173
178
newPendingSources : Set < DeferredFragmentRecord | StreamRecord > ;
174
179
incrementalResults : Array < IncrementalResult > ;
175
180
completedResults : Array < CompletedResult > ;
181
+ deferParents : Map < DeferredFragmentRecord , SubsequentDataRecord > ;
182
+ initialStreams : Map < StreamRecord , SubsequentDataRecord > ;
183
+ streamTargets : Map < StreamRecord , IncrementalStreamTarget > ;
176
184
}
177
185
178
186
/**
@@ -234,14 +242,24 @@ export class IncrementalPublisher {
234
242
235
243
const parentDeferUsage = deferUsage . ancestors [ 0 ] ;
236
244
237
- const parent =
238
- parentDeferUsage === undefined
239
- ? ( incrementalDataRecord as InitialResultRecord | StreamItemsRecord )
240
- : this . _deferredFragmentRecordFromDeferUsage (
241
- parentDeferUsage ,
242
- newDeferMap ,
243
- ) ;
244
- parent . children . add ( deferredFragmentRecord ) ;
245
+ if ( parentDeferUsage === undefined ) {
246
+ const parent = incrementalDataRecord as
247
+ | InitialResultRecord
248
+ | StreamItemsRecord ;
249
+ parent . children . add ( deferredFragmentRecord ) ;
250
+ if ( isStreamItemsRecord ( incrementalDataRecord ) ) {
251
+ incrementalDataRecord . childDefers . add ( deferredFragmentRecord ) ;
252
+ }
253
+ } else {
254
+ const parent = this . _deferredFragmentRecordFromDeferUsage (
255
+ parentDeferUsage ,
256
+ newDeferMap ,
257
+ ) ;
258
+ parent . children . add ( deferredFragmentRecord ) ;
259
+ if ( ! isInitialResultRecord ( incrementalDataRecord ) ) {
260
+ incrementalDataRecord . childDefers . add ( deferredFragmentRecord ) ;
261
+ }
262
+ }
245
263
246
264
newDeferMap . set ( deferUsage , deferredFragmentRecord ) ;
247
265
}
@@ -307,9 +325,15 @@ export class IncrementalPublisher {
307
325
} ) ;
308
326
309
327
if ( isDeferredGroupedFieldSetRecord ( incrementalDataRecord ) ) {
328
+ incrementalDataRecord . childStreams . add ( streamRecord ) ;
310
329
for ( const parent of incrementalDataRecord . deferredFragmentRecords ) {
311
330
parent . children . add ( streamItemsRecord ) ;
312
331
}
332
+ } else if ( isStreamItemsRecord ( incrementalDataRecord ) ) {
333
+ if ( streamRecord !== incrementalDataRecord . streamRecord ) {
334
+ incrementalDataRecord . childStreams . add ( streamRecord ) ;
335
+ }
336
+ incrementalDataRecord . children . add ( streamItemsRecord ) ;
313
337
} else {
314
338
incrementalDataRecord . children . add ( streamItemsRecord ) ;
315
339
}
@@ -596,15 +620,23 @@ export class IncrementalPublisher {
596
620
newPendingSources : new Set < DeferredFragmentRecord | StreamRecord > ( ) ,
597
621
incrementalResults : [ ] ,
598
622
completedResults : [ ] ,
623
+ deferParents : new Map ( ) ,
624
+ initialStreams : new Map ( ) ,
625
+ streamTargets : new Map ( ) ,
599
626
} ;
600
627
}
601
628
602
629
private _incrementalReducer (
603
630
aggregate : IncrementalAggregate ,
604
631
completedRecords : ReadonlySet < SubsequentResultRecord > ,
605
632
) : IncrementalAggregate {
606
- const { newPendingSources, incrementalResults, completedResults } =
607
- aggregate ;
633
+ const {
634
+ newPendingSources,
635
+ incrementalResults,
636
+ completedResults,
637
+ deferParents,
638
+ initialStreams,
639
+ } = aggregate ;
608
640
for ( const subsequentResultRecord of completedRecords ) {
609
641
for ( const child of subsequentResultRecord . children ) {
610
642
if ( child . filtered ) {
@@ -636,14 +668,56 @@ export class IncrementalPublisher {
636
668
if ( subsequentResultRecord . streamRecord . errors . length > 0 ) {
637
669
continue ;
638
670
}
639
- const incrementalResult : IncrementalStreamResult = {
640
- items : subsequentResultRecord . items ,
641
- path : subsequentResultRecord . streamRecord . path ,
642
- } ;
643
- if ( subsequentResultRecord . errors . length > 0 ) {
644
- incrementalResult . errors = subsequentResultRecord . errors ;
671
+ this . _updateTargets ( subsequentResultRecord , aggregate ) ;
672
+ const streamRecord = subsequentResultRecord . streamRecord ;
673
+ const initialStream = initialStreams . get ( streamRecord ) ;
674
+ if ( initialStream === undefined ) {
675
+ initialStreams . set ( streamRecord , subsequentResultRecord ) ;
676
+ const streamResult : IncrementalStreamResult = {
677
+ items : subsequentResultRecord . items ,
678
+ path : streamRecord . path ,
679
+ } ;
680
+ if ( subsequentResultRecord . errors . length > 0 ) {
681
+ streamResult . errors = subsequentResultRecord . errors ;
682
+ }
683
+ incrementalResults . push ( streamResult ) ;
684
+ } else if ( isStreamItemsRecord ( initialStream ) ) {
685
+ if ( initialStream . streamRecord === streamRecord ) {
686
+ if ( subsequentResultRecord . items . length > 0 ) {
687
+ initialStream . items . push ( ...subsequentResultRecord . items ) ;
688
+ }
689
+ this . _updateTargetErrors (
690
+ initialStream ,
691
+ subsequentResultRecord . errors ,
692
+ ) ;
693
+ } else {
694
+ const target = this . _findTargetFromStreamPath (
695
+ initialStream . items ,
696
+ initialStream . path ,
697
+ streamRecord . path ,
698
+ ) as Array < unknown > ;
699
+ if ( subsequentResultRecord . items . length > 0 ) {
700
+ target . push ( ...subsequentResultRecord . items ) ;
701
+ }
702
+ this . _updateTargetErrors (
703
+ initialStream ,
704
+ subsequentResultRecord . errors ,
705
+ ) ;
706
+ }
707
+ } else {
708
+ const target = this . _findTarget (
709
+ initialStream . data as ObjMap < unknown > ,
710
+ initialStream . path ,
711
+ streamRecord . path ,
712
+ ) as Array < unknown > ;
713
+ if ( subsequentResultRecord . items . length > 0 ) {
714
+ target . push ( ...subsequentResultRecord . items ) ;
715
+ }
716
+ this . _updateTargetErrors (
717
+ initialStream ,
718
+ subsequentResultRecord . errors ,
719
+ ) ;
645
720
}
646
- incrementalResults . push ( incrementalResult ) ;
647
721
}
648
722
} else {
649
723
newPendingSources . delete ( subsequentResultRecord ) ;
@@ -653,18 +727,50 @@ export class IncrementalPublisher {
653
727
if ( subsequentResultRecord . errors . length > 0 ) {
654
728
continue ;
655
729
}
730
+ const parent = deferParents . get ( subsequentResultRecord ) ;
656
731
for ( const deferredGroupedFieldSetRecord of subsequentResultRecord . deferredGroupedFieldSetRecords ) {
657
732
if ( ! deferredGroupedFieldSetRecord . sent ) {
733
+ this . _updateTargets ( deferredGroupedFieldSetRecord , aggregate ) ;
658
734
deferredGroupedFieldSetRecord . sent = true ;
659
- const incrementalResult : IncrementalDeferResult = {
660
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
661
- data : deferredGroupedFieldSetRecord . data ! ,
662
- path : deferredGroupedFieldSetRecord . path ,
663
- } ;
664
- if ( deferredGroupedFieldSetRecord . errors . length > 0 ) {
665
- incrementalResult . errors = deferredGroupedFieldSetRecord . errors ;
735
+ if ( parent === undefined ) {
736
+ const incrementalResult : IncrementalDeferResult = {
737
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
738
+ data : deferredGroupedFieldSetRecord . data ! ,
739
+ path : deferredGroupedFieldSetRecord . path ,
740
+ } ;
741
+ if ( deferredGroupedFieldSetRecord . errors . length > 0 ) {
742
+ incrementalResult . errors = deferredGroupedFieldSetRecord . errors ;
743
+ }
744
+ incrementalResults . push ( incrementalResult ) ;
745
+ } else {
746
+ const deferredFragmentTarget = isStreamItemsRecord ( parent )
747
+ ? this . _findTargetFromStreamPath (
748
+ parent . items ,
749
+ parent . path ,
750
+ deferredGroupedFieldSetRecord . path ,
751
+ )
752
+ : this . _findTarget (
753
+ parent . data as ObjMap < unknown > ,
754
+ parent . path ,
755
+ deferredGroupedFieldSetRecord . path ,
756
+ ) ;
757
+
758
+ const deferredGroupedFieldSetTarget = this . _findTarget (
759
+ deferredFragmentTarget ,
760
+ subsequentResultRecord . path ,
761
+ deferredGroupedFieldSetRecord . path ,
762
+ ) ;
763
+ const data =
764
+ deferredGroupedFieldSetRecord . data as ObjMap < unknown > ;
765
+ for ( const key of Object . keys ( data ) ) {
766
+ ( deferredGroupedFieldSetTarget as ObjMap < unknown > ) [ key ] =
767
+ data [ key ] ;
768
+ }
769
+ this . _updateTargetErrors (
770
+ parent ,
771
+ deferredGroupedFieldSetRecord . errors ,
772
+ ) ;
666
773
}
667
- incrementalResults . push ( incrementalResult ) ;
668
774
}
669
775
}
670
776
}
@@ -673,6 +779,72 @@ export class IncrementalPublisher {
673
779
return aggregate ;
674
780
}
675
781
782
+ private _updateTargets (
783
+ subsequentDataRecord : SubsequentDataRecord ,
784
+ aggregate : IncrementalAggregate ,
785
+ ) : void {
786
+ const { childDefers, childStreams } = subsequentDataRecord ;
787
+ const { deferParents, initialStreams } = aggregate ;
788
+ for ( const childDefer of childDefers ) {
789
+ deferParents . set ( childDefer , subsequentDataRecord ) ;
790
+ }
791
+ for ( const childStream of childStreams ) {
792
+ initialStreams . set ( childStream , subsequentDataRecord ) ;
793
+ }
794
+ }
795
+
796
+ private _findTarget (
797
+ data : ObjMap < unknown > | Array < unknown > ,
798
+ dataPath : ReadonlyArray < string | number > ,
799
+ targetPath : ReadonlyArray < string | number > ,
800
+ ) : ObjMap < unknown > | Array < unknown > {
801
+ let i = 0 ;
802
+ while ( i < dataPath . length ) {
803
+ i ++ ;
804
+ }
805
+ let dataOrItems = data ;
806
+ while ( i < targetPath . length ) {
807
+ const key = targetPath [ i ++ ] ;
808
+ const value = ( dataOrItems as ObjMap < unknown > ) [ key as string ] ;
809
+ dataOrItems = value as ObjMap < unknown > ;
810
+ }
811
+ return dataOrItems ;
812
+ }
813
+
814
+ private _findTargetFromStreamPath (
815
+ data : ObjMap < unknown > | Array < unknown > ,
816
+ dataPath : ReadonlyArray < string | number > ,
817
+ targetPath : ReadonlyArray < string | number > ,
818
+ ) : ObjMap < unknown > | Array < unknown > {
819
+ const pathToStream = [ ...dataPath ] ;
820
+ const start = pathToStream . pop ( ) as number ;
821
+ let i = 0 ;
822
+ while ( i < pathToStream . length ) {
823
+ i ++ ;
824
+ }
825
+ const adjustedIndex = ( targetPath [ i ++ ] as number ) - start ;
826
+ let dataOrItems = ( data as Array < unknown > ) [ adjustedIndex ] ;
827
+ while ( i < targetPath . length ) {
828
+ const key = targetPath [ i ++ ] ;
829
+ const value = ( dataOrItems as ObjMap < unknown > ) [ key as string ] ;
830
+ dataOrItems = value as ObjMap < unknown > ;
831
+ }
832
+ return dataOrItems as ObjMap < unknown > | Array < unknown > ;
833
+ }
834
+
835
+ private _updateTargetErrors (
836
+ subsequentDataRecord : SubsequentDataRecord ,
837
+ errors : ReadonlyArray < GraphQLError > ,
838
+ ) : void {
839
+ for ( const error of errors ) {
840
+ if ( subsequentDataRecord . errors === undefined ) {
841
+ subsequentDataRecord . errors = [ error ] ;
842
+ } else {
843
+ subsequentDataRecord . errors . push ( error ) ;
844
+ }
845
+ }
846
+ }
847
+
676
848
private _incrementalFinalizer (
677
849
aggregate : IncrementalAggregate ,
678
850
) : SubsequentIncrementalExecutionResult {
@@ -832,6 +1004,8 @@ export class DeferredGroupedFieldSetRecord {
832
1004
deferPriority : number ;
833
1005
streamPriority : number ;
834
1006
deferredFragmentRecords : ReadonlyArray < DeferredFragmentRecord > ;
1007
+ childDefers : Set < DeferredFragmentRecord > ;
1008
+ childStreams : Set < StreamRecord > ;
835
1009
groupedFieldSet : GroupedFieldSet ;
836
1010
shouldInitiateDefer : boolean ;
837
1011
errors : Array < GraphQLError > ;
@@ -852,6 +1026,8 @@ export class DeferredGroupedFieldSetRecord {
852
1026
this . deferPriority = opts . deferPriority ;
853
1027
this . streamPriority = opts . streamPriority ;
854
1028
this . deferredFragmentRecords = opts . deferredFragmentRecords ;
1029
+ this . childDefers = new Set ( ) ;
1030
+ this . childStreams = new Set ( ) ;
855
1031
this . groupedFieldSet = opts . groupedFieldSet ;
856
1032
this . shouldInitiateDefer = opts . shouldInitiateDefer ;
857
1033
this . errors = [ ] ;
@@ -918,6 +1094,8 @@ export class StreamItemsRecord {
918
1094
streamPriority : number ;
919
1095
items : Array < unknown > ;
920
1096
children : Set < SubsequentResultRecord > ;
1097
+ childDefers : Set < DeferredFragmentRecord > ;
1098
+ childStreams : Set < StreamRecord > ;
921
1099
isFinalRecord ?: boolean ;
922
1100
isCompletedAsyncIterator ?: boolean ;
923
1101
isCompleted : boolean ;
@@ -937,6 +1115,8 @@ export class StreamItemsRecord {
937
1115
this . deferPriority = opts . deferPriority ;
938
1116
this . streamPriority = opts . streamPriority ;
939
1117
this . children = new Set ( ) ;
1118
+ this . childDefers = new Set ( ) ;
1119
+ this . childStreams = new Set ( ) ;
940
1120
this . errors = [ ] ;
941
1121
this . isCompleted = false ;
942
1122
this . filtered = false ;
@@ -954,13 +1134,18 @@ export class StreamItemsRecord {
954
1134
}
955
1135
}
956
1136
957
- export type IncrementalDataRecord =
958
- | InitialResultRecord
959
- | DeferredGroupedFieldSetRecord
960
- | StreamItemsRecord ;
1137
+ export type IncrementalDataRecord = InitialResultRecord | SubsequentDataRecord ;
1138
+
1139
+ type SubsequentDataRecord = DeferredGroupedFieldSetRecord | StreamItemsRecord ;
961
1140
962
1141
export type SubsequentResultRecord = DeferredFragmentRecord | StreamItemsRecord ;
963
1142
1143
+ function isInitialResultRecord (
1144
+ incrementalDataRecord : unknown ,
1145
+ ) : incrementalDataRecord is InitialResultRecord {
1146
+ return incrementalDataRecord instanceof InitialResultRecord ;
1147
+ }
1148
+
964
1149
function isDeferredGroupedFieldSetRecord (
965
1150
incrementalDataRecord : unknown ,
966
1151
) : incrementalDataRecord is DeferredGroupedFieldSetRecord {
0 commit comments