Skip to content

Commit 44b1e89

Browse files
committed
Ensure that AsyncReadWriteBinding is released prior to subscriber notification (#676)
Previously the driver assumed that Mono#doOnTerminate is executed prior to the subscriber being notified of completion. But it turns out that behavior is not guaranteed in the Californium release of Project Reactor (though it is in later releases). So instead, now the SingleResultCallback that is used to notify the Mongo of completion is wrapped by one that first releases the binding, and Mono#doOnTerminate is no longer used. JAVA-4027
1 parent 74f836b commit 44b1e89

File tree

1 file changed

+20
-13
lines changed

1 file changed

+20
-13
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,16 @@ public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPref
6666
binding.release();
6767
return Mono.error(new MongoClientException("Read preference in a transaction must be primary"));
6868
} else {
69-
return Mono.<T>create(sink -> operation.executeAsync(binding, sinkToCallback(sink)))
70-
.doOnTerminate(binding::release)
71-
.doOnError((t) -> {
72-
labelException(session, t);
73-
unpinServerAddressOnTransientTransactionError(session, t);
74-
});
69+
return Mono.<T>create(sink -> operation.executeAsync(binding, (result, t) -> {
70+
try {
71+
binding.release();
72+
} finally {
73+
sinkToCallback(sink).onResult(result, t);
74+
}
75+
})).doOnError((t) -> {
76+
labelException(session, t);
77+
unpinServerAddressOnTransientTransactionError(session, t);
78+
});
7579
}
7680
});
7781
}
@@ -86,13 +90,16 @@ public <T> Mono<T> execute(final AsyncWriteOperation<T> operation, final ReadCon
8690
session == null && clientSession != null))
8791
.switchIfEmpty(Mono.fromCallable(() -> getReadWriteBinding(ReadPreference.primary(), readConcern, session, false)))
8892
.flatMap(binding ->
89-
Mono.<T>create(sink -> operation.executeAsync(binding, sinkToCallback(sink)))
90-
.doOnTerminate(binding::release)
91-
.doOnError((t) -> {
92-
labelException(session, t);
93-
unpinServerAddressOnTransientTransactionError(session, t);
94-
})
95-
93+
Mono.<T>create(sink -> operation.executeAsync(binding, (result, t) -> {
94+
try {
95+
binding.release();
96+
} finally {
97+
sinkToCallback(sink).onResult(result, t);
98+
}
99+
})).doOnError((t) -> {
100+
labelException(session, t);
101+
unpinServerAddressOnTransientTransactionError(session, t);
102+
})
96103
);
97104
}
98105

0 commit comments

Comments
 (0)