Skip to content

Commit 2c14b2b

Browse files
committed
JAVA-2238: Remove DefaultServerMonitor.invalidate and replace usage of it with DefaultServerMonitor.check. This removes any need to create a new monitor thread and simplifies any necessary synchronization that goes along with that.
1 parent 6d29d1a commit 2c14b2b

File tree

6 files changed

+19
-119
lines changed

6 files changed

+19
-119
lines changed

driver-core/src/main/com/mongodb/connection/DefaultServer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void invalidate() {
125125
.address(serverId.getAddress())
126126
.build()));
127127
connectionPool.invalidate();
128-
serverMonitor.invalidate();
128+
connect();
129129
}
130130

131131
@Override

driver-core/src/main/com/mongodb/connection/DefaultServerMonitor.java

+8-29
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.concurrent.locks.Lock;
3333
import java.util.concurrent.locks.ReentrantLock;
3434

35-
import static com.mongodb.assertions.Assertions.isTrue;
3635
import static com.mongodb.connection.CommandHelper.executeCommand;
3736
import static com.mongodb.connection.DescriptionHelper.createServerDescription;
3837
import static com.mongodb.connection.ServerConnectionState.CONNECTING;
@@ -52,8 +51,8 @@ class DefaultServerMonitor implements ServerMonitor {
5251
private final InternalConnectionFactory internalConnectionFactory;
5352
private final ConnectionPool connectionPool;
5453
private final ServerSettings settings;
55-
private volatile ServerMonitorRunnable monitor;
56-
private volatile Thread monitorThread;
54+
private final ServerMonitorRunnable monitor;
55+
private final Thread monitorThread;
5756
private final Lock lock = new ReentrantLock();
5857
private final Condition condition = lock.newCondition();
5958
private volatile boolean isClosed;
@@ -69,7 +68,9 @@ class DefaultServerMonitor implements ServerMonitor {
6968
this.serverStateListener = serverStateListener;
7069
this.internalConnectionFactory = internalConnectionFactory;
7170
this.connectionPool = connectionPool;
72-
monitorThread = createMonitorThread();
71+
monitor = new ServerMonitorRunnable();
72+
monitorThread = new Thread(monitor, "cluster-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress());
73+
monitorThread.setDaemon(true);
7374
isClosed = false;
7475
}
7576

@@ -88,45 +89,23 @@ public void connect() {
8889
}
8990
}
9091

91-
@Override
92-
public void invalidate() {
93-
isTrue("open", !isClosed);
94-
monitor.close();
95-
monitorThread.interrupt();
96-
monitorThread = createMonitorThread();
97-
monitorThread.start();
98-
}
99-
10092
@Override
10193
public void close() {
102-
monitor.close();
103-
monitorThread.interrupt();
10494
isClosed = true;
105-
}
106-
107-
Thread createMonitorThread() {
108-
monitor = new ServerMonitorRunnable();
109-
Thread monitorThread = new Thread(monitor, "cluster-" + serverId.getClusterId() + "-" + serverId.getAddress());
110-
monitorThread.setDaemon(true);
111-
return monitorThread;
95+
monitorThread.interrupt();
11296
}
11397

11498
class ServerMonitorRunnable implements Runnable {
115-
private volatile boolean monitorIsClosed;
11699
private final ExponentiallyWeightedMovingAverage averageRoundTripTime = new ExponentiallyWeightedMovingAverage(0.2);
117100

118-
public void close() {
119-
monitorIsClosed = true;
120-
}
121-
122101
@Override
123102
@SuppressWarnings("unchecked")
124103
public synchronized void run() {
125104
InternalConnection connection = null;
126105
try {
127106
ServerDescription currentServerDescription = getConnectingServerDescription(null);
128107
Throwable currentException = null;
129-
while (!monitorIsClosed) {
108+
while (!isClosed) {
130109
ServerDescription previousServerDescription = currentServerDescription;
131110
Throwable previousException = currentException;
132111
currentException = null;
@@ -167,7 +146,7 @@ public synchronized void run() {
167146
currentServerDescription = getConnectingServerDescription(t);
168147
}
169148

170-
if (!monitorIsClosed) {
149+
if (!isClosed) {
171150
try {
172151
logStateChange(previousServerDescription, previousException, currentServerDescription, currentException);
173152
sendStateChangedEvent(previousServerDescription, currentServerDescription);

driver-core/src/main/com/mongodb/connection/ServerMonitor.java

-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ interface ServerMonitor {
2222

2323
void connect();
2424

25-
void invalidate();
26-
2725
void close();
2826

2927
}

driver-core/src/test/unit/com/mongodb/connection/DefaultServerMonitorSpecification.groovy

-73
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.mongodb.connection
1818

19-
import com.mongodb.MongoSocketOpenException
2019
import com.mongodb.MongoSocketReadTimeoutException
2120
import com.mongodb.ServerAddress
2221
import com.mongodb.event.ServerHeartbeatFailedEvent
@@ -38,78 +37,6 @@ class DefaultServerMonitorSpecification extends Specification {
3837

3938
DefaultServerMonitor monitor
4039

41-
def 'A thread interrupt should send a sendStateChangedEvent'() {
42-
given:
43-
def stateChanged = false
44-
def latch = new CountDownLatch(1);
45-
def changeListener = new ChangeListener<ServerDescription>() {
46-
@Override
47-
void stateChanged(final ChangeEvent<ServerDescription> event) {
48-
stateChanged = true;
49-
latch.countDown()
50-
}
51-
}
52-
def internalConnectionFactory = Mock(InternalConnectionFactory) {
53-
create(_) >> {
54-
Mock(InternalConnection) {
55-
open() >> { throw new MongoSocketOpenException('open', new ServerAddress(), new IOException()) }
56-
}
57-
}
58-
}
59-
monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()),
60-
ServerSettings.builder().addServerListener(new NoOpServerListener()).build(),
61-
changeListener, internalConnectionFactory, new TestConnectionPool())
62-
monitor.start()
63-
64-
when:
65-
monitor.monitorThread.interrupt()
66-
latch.await()
67-
68-
then:
69-
stateChanged
70-
71-
cleanup:
72-
monitor?.close()
73-
}
74-
75-
def 'invalidate should not send a sendStateChangedEvent'() {
76-
given:
77-
def stateChanged = false
78-
def changeListener = new ChangeListener<ServerDescription>() {
79-
@Override
80-
void stateChanged(final ChangeEvent<ServerDescription> event) {
81-
stateChanged = true;
82-
}
83-
}
84-
def latch = new CountDownLatch(1)
85-
def internalConnectionFactory = Mock(InternalConnectionFactory) {
86-
create(_) >> {
87-
Mock(InternalConnection) {
88-
open() >> {
89-
latch.countDown()
90-
Thread.sleep(Long.MAX_VALUE);
91-
}
92-
}
93-
}
94-
}
95-
monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()),
96-
ServerSettings.builder().addServerListener(new NoOpServerListener()).build(),
97-
changeListener, internalConnectionFactory, new TestConnectionPool())
98-
monitor.start()
99-
def monitorThread = monitor.monitorThread
100-
latch.await()
101-
102-
when:
103-
monitor.invalidate()
104-
monitorThread.join();
105-
106-
then:
107-
!stateChanged
108-
109-
cleanup:
110-
monitor?.close()
111-
}
112-
11340
def 'close should not send a sendStateChangedEvent'() {
11441
given:
11542
def stateChanged = false

driver-core/src/test/unit/com/mongodb/connection/DefaultServerSpecification.groovy

+10-10
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ class DefaultServerSpecification extends Specification {
144144
then:
145145
thrown(MongoSecurityException)
146146
1 * connectionPool.invalidate()
147-
1 * serverMonitor.invalidate()
147+
1 * serverMonitor.connect()
148148
}
149149

150150
def 'failed open should invalidate the server asychronously'() {
@@ -170,7 +170,7 @@ class DefaultServerSpecification extends Specification {
170170
then:
171171
!receivedConnection
172172
receivedThrowable.is(exceptionToThrow)
173-
1 * serverMonitor.invalidate()
173+
1 * serverMonitor.connect()
174174
}
175175

176176
def 'should invalidate on MongoNotPrimaryException'() {
@@ -196,7 +196,7 @@ class DefaultServerSpecification extends Specification {
196196
then:
197197
thrown(MongoNotPrimaryException)
198198
1 * connectionPool.invalidate()
199-
1 * serverMonitor.invalidate()
199+
1 * serverMonitor.connect()
200200

201201
when:
202202
def futureResultCallback = new FutureResultCallback()
@@ -207,7 +207,7 @@ class DefaultServerSpecification extends Specification {
207207
then:
208208
thrown(MongoNotPrimaryException)
209209
1 * connectionPool.invalidate()
210-
1 * serverMonitor.invalidate()
210+
1 * serverMonitor.connect()
211211

212212
when:
213213
futureResultCallback = new FutureResultCallback()
@@ -218,7 +218,7 @@ class DefaultServerSpecification extends Specification {
218218
then:
219219
thrown(MongoNotPrimaryException)
220220
1 * connectionPool.invalidate()
221-
1 * serverMonitor.invalidate()
221+
1 * serverMonitor.connect()
222222
}
223223

224224
def 'should invalidate on MongoNodeIsRecoveringException'() {
@@ -244,7 +244,7 @@ class DefaultServerSpecification extends Specification {
244244
then:
245245
thrown(MongoNodeIsRecoveringException)
246246
1 * connectionPool.invalidate()
247-
1 * serverMonitor.invalidate()
247+
1 * serverMonitor.connect()
248248
}
249249

250250

@@ -271,7 +271,7 @@ class DefaultServerSpecification extends Specification {
271271
then:
272272
thrown(MongoSocketException)
273273
1 * connectionPool.invalidate()
274-
1 * serverMonitor.invalidate()
274+
1 * serverMonitor.connect()
275275

276276
when:
277277
def futureResultCallback = new FutureResultCallback<WriteConcernResult>()
@@ -282,7 +282,7 @@ class DefaultServerSpecification extends Specification {
282282
then:
283283
thrown(MongoSocketException)
284284
1 * connectionPool.invalidate()
285-
1 * serverMonitor.invalidate()
285+
1 * serverMonitor.connect()
286286
}
287287

288288
def 'should not invalidate on MongoSocketReadTimeoutException'() {
@@ -309,7 +309,7 @@ class DefaultServerSpecification extends Specification {
309309
then:
310310
thrown(MongoSocketReadTimeoutException)
311311
0 * connectionPool.invalidate()
312-
0 * serverMonitor.invalidate()
312+
0 * serverMonitor.connect()
313313

314314
when:
315315
def futureResultCallback = new FutureResultCallback<WriteConcernResult>()
@@ -320,7 +320,7 @@ class DefaultServerSpecification extends Specification {
320320
then:
321321
thrown(MongoSocketReadTimeoutException)
322322
0 * connectionPool.invalidate()
323-
0 * serverMonitor.invalidate()
323+
0 * serverMonitor.connect()
324324
}
325325

326326
def 'should enable command listener'() {

driver-core/src/test/unit/com/mongodb/connection/TestServerMonitor.java

-4
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@ public void start() {
3535
public void connect() {
3636
}
3737

38-
@Override
39-
public void invalidate() {
40-
}
41-
4238
@Override
4339
public void close() {
4440
}

0 commit comments

Comments
 (0)