File tree 4 files changed +30
-2
lines changed
4 files changed +30
-2
lines changed Original file line number Diff line number Diff line change @@ -233,14 +233,17 @@ def gen_ordered(
233
233
operation, in the order **provided**.
234
234
"""
235
235
run = None
236
+ ctr = 0
236
237
for idx , request in enumerate (requests ):
237
238
retryable = process (request )
238
239
(op_type , operation ) = self .ops [idx ]
239
240
if run is None :
240
241
run = _Run (op_type )
241
- elif run .op_type != op_type :
242
+ elif run .op_type != op_type or ctr >= common . MAX_WRITE_BATCH_SIZE // 200 :
242
243
yield run
244
+ ctr = 0
243
245
run = _Run (op_type )
246
+ ctr += 1
244
247
run .add (idx , operation )
245
248
run .is_retryable = run .is_retryable and retryable
246
249
if run is None :
@@ -604,6 +607,9 @@ async def _execute_command(
604
607
break
605
608
# Reset our state
606
609
self .current_run = run = self .next_run
610
+ import gc
611
+
612
+ gc .collect ()
607
613
608
614
async def execute_command (
609
615
self ,
Original file line number Diff line number Diff line change @@ -233,14 +233,17 @@ def gen_ordered(
233
233
operation, in the order **provided**.
234
234
"""
235
235
run = None
236
+ ctr = 0
236
237
for idx , request in enumerate (requests ):
237
238
retryable = process (request )
238
239
(op_type , operation ) = self .ops [idx ]
239
240
if run is None :
240
241
run = _Run (op_type )
241
- elif run .op_type != op_type :
242
+ elif run .op_type != op_type or ctr >= common . MAX_WRITE_BATCH_SIZE // 200 :
242
243
yield run
244
+ ctr = 0
243
245
run = _Run (op_type )
246
+ ctr += 1
244
247
run .add (idx , operation )
245
248
run .is_retryable = run .is_retryable and retryable
246
249
if run is None :
@@ -604,6 +607,9 @@ def _execute_command(
604
607
break
605
608
# Reset our state
606
609
self .current_run = run = self .next_run
610
+ import gc
611
+
612
+ gc .collect ()
607
613
608
614
def execute_command (
609
615
self ,
Original file line number Diff line number Diff line change @@ -314,6 +314,14 @@ async def test_numerous_inserts_generator(self):
314
314
self .assertEqual (n_docs , result .inserted_count )
315
315
self .assertEqual (n_docs , await self .coll .count_documents ({}))
316
316
317
+ async def test_huge_inserts_generator (self ):
318
+ # Ensure we don't exceed server's maxWriteBatchSize size limit.
319
+ n_docs = 1000000
320
+ requests = (InsertOne ({"x" : "large" * 1024 * 1024 }) for _ in range (n_docs ))
321
+ result = await self .coll .bulk_write (requests )
322
+ self .assertEqual (n_docs , result .inserted_count )
323
+ self .assertEqual (n_docs , await self .coll .count_documents ({}))
324
+
317
325
async def test_bulk_max_message_size (self ):
318
326
await self .coll .delete_many ({})
319
327
self .addAsyncCleanup (self .coll .delete_many , {})
Original file line number Diff line number Diff line change @@ -314,6 +314,14 @@ def test_numerous_inserts_generator(self):
314
314
self .assertEqual (n_docs , result .inserted_count )
315
315
self .assertEqual (n_docs , self .coll .count_documents ({}))
316
316
317
+ def test_huge_inserts_generator (self ):
318
+ # Ensure we don't exceed server's maxWriteBatchSize size limit.
319
+ n_docs = 1000000
320
+ requests = (InsertOne ({"x" : "large" * 1024 * 1024 }) for _ in range (n_docs ))
321
+ result = self .coll .bulk_write (requests )
322
+ self .assertEqual (n_docs , result .inserted_count )
323
+ self .assertEqual (n_docs , self .coll .count_documents ({}))
324
+
317
325
def test_bulk_max_message_size (self ):
318
326
self .coll .delete_many ({})
319
327
self .addCleanup (self .coll .delete_many , {})
You can’t perform that action at this time.
0 commit comments