Skip to content

Commit 443b684

Browse files
committed
JAVA-1295: Using difference between the current and previous exception in ServerMonitor to determine whether to log a state change.
1 parent 06e1d91 commit 443b684

File tree

2 files changed

+130
-35
lines changed

2 files changed

+130
-35
lines changed

src/main/com/mongodb/ServerMonitor.java

+49-21
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@ class ServerMonitor {
4848
private final PooledConnectionProvider connectionProvider;
4949
private int count;
5050
private long elapsedNanosSum;
51-
private volatile ServerDescription serverDescription;
5251
private volatile boolean isClosed;
53-
private volatile DBPort connection;
5452
private final Thread monitorThread;
5553
private final Lock lock = new ReentrantLock();
5654
private final Condition condition = lock.newCondition();
@@ -65,7 +63,6 @@ class ServerMonitor {
6563
this.settings = settings;
6664
this.mongo = mongo;
6765
this.connectionProvider = connectionProvider;
68-
serverDescription = getConnectingServerDescription();
6966
monitorThread = new Thread(new ServerMonitorRunnable(), "cluster-" + clusterId + "-" + serverAddress);
7067
monitorThread.setDaemon(true);
7168
}
@@ -78,16 +75,19 @@ class ServerMonitorRunnable implements Runnable {
7875
@Override
7976
@SuppressWarnings("unchecked")
8077
public void run() {
78+
DBPort connection = null;
8179
try {
80+
ServerDescription currentServerDescription = getConnectingServerDescription();
81+
Throwable currentException = null;
8282
while (!isClosed) {
83-
ServerDescription currentServerDescription = serverDescription;
84-
Throwable throwable = null;
83+
ServerDescription previousServerDescription = currentServerDescription;
84+
Throwable previousException = currentException;
8585
try {
8686
if (connection == null) {
8787
connection = new DBPort(serverAddress, null, getOptions(), 0);
8888
}
8989
try {
90-
serverDescription = lookupServerDescription();
90+
currentServerDescription = lookupServerDescription(connection);
9191
} catch (IOException e) {
9292
// in case the connection has been reset since the last run, do one retry immediately before reporting that the
9393
// server is down
@@ -100,22 +100,22 @@ public void run() {
100100
}
101101
connection = new DBPort(serverAddress, null, getOptions(), 0);
102102
try {
103-
serverDescription = lookupServerDescription();
103+
currentServerDescription = lookupServerDescription(connection);
104104
} catch (IOException e1) {
105105
connection.close();
106106
connection = null;
107107
throw e1;
108108
}
109109
}
110110
} catch (Throwable t) {
111-
throwable = t;
112-
serverDescription = getConnectingServerDescription();
111+
currentException = t;
112+
currentServerDescription = getConnectingServerDescription();
113113
}
114114

115115
if (!isClosed) {
116116
try {
117-
logStateChange(currentServerDescription, throwable);
118-
sendStateChangedEvent(currentServerDescription);
117+
logStateChange(previousServerDescription, previousException, currentServerDescription, currentException);
118+
sendStateChangedEvent(previousServerDescription, currentServerDescription);
119119
} catch (Throwable t) {
120120
LOGGER.log(Level.WARNING, "Exception in monitor thread during notification of server state change", t);
121121
}
@@ -129,21 +129,25 @@ public void run() {
129129
}
130130
}
131131

132-
private void sendStateChangedEvent(final ServerDescription currentServerDescription) {
133-
if (!currentServerDescription.equals(serverDescription) ||
134-
currentServerDescription.getAverageLatencyNanos() != serverDescription.getAverageLatencyNanos()) {
135-
serverStateListener.stateChanged(new ChangeEvent<ServerDescription>(currentServerDescription, serverDescription));
132+
private void sendStateChangedEvent(final ServerDescription previousServerDescription,
133+
final ServerDescription currentServerDescription) {
134+
if (stateHasChanged(previousServerDescription, currentServerDescription)) {
135+
serverStateListener.stateChanged(new ChangeEvent<ServerDescription>(previousServerDescription,
136+
currentServerDescription));
136137
}
137138
}
138139

139-
private void logStateChange(final ServerDescription currentServerDescription, final Throwable throwable) {
140+
private void logStateChange(final ServerDescription previousServerDescription, final Throwable previousException,
141+
final ServerDescription currentServerDescription, final Throwable currentException) {
140142
// Note that the ServerDescription.equals method does not include the average ping time as part of the comparison,
141143
// so this will not spam the logs too hard.
142-
if (!currentServerDescription.equals(serverDescription)) {
143-
if (throwable != null) {
144-
LOGGER.log(Level.INFO, format("Exception in monitor thread while connecting to server %s", serverAddress), throwable);
144+
if (descriptionHasChanged(previousServerDescription, currentServerDescription)
145+
|| exceptionHasChanged(previousException, currentException)) {
146+
if (currentException != null) {
147+
LOGGER.log(Level.INFO, format("Exception in monitor thread while connecting to server %s", serverAddress),
148+
currentException);
145149
} else {
146-
LOGGER.info(format("Monitor thread successfully connected to server with description %s", serverDescription));
150+
LOGGER.info(format("Monitor thread successfully connected to server with description %s", currentServerDescription));
147151
}
148152
}
149153
}
@@ -198,7 +202,31 @@ private MongoOptions getOptions() {
198202
return options;
199203
}
200204

201-
private ServerDescription lookupServerDescription() throws IOException {
205+
static boolean descriptionHasChanged(final ServerDescription previousServerDescription,
206+
final ServerDescription currentServerDescription) {
207+
return !previousServerDescription.equals(currentServerDescription);
208+
}
209+
210+
static boolean stateHasChanged(final ServerDescription previousServerDescription, final ServerDescription currentServerDescription) {
211+
return descriptionHasChanged(previousServerDescription, currentServerDescription) ||
212+
previousServerDescription.getAverageLatencyNanos() != currentServerDescription.getAverageLatencyNanos();
213+
}
214+
215+
static boolean exceptionHasChanged(final Throwable previousException, final Throwable currentException) {
216+
if (currentException == null) {
217+
return previousException != null;
218+
} else if (previousException == null) {
219+
return true;
220+
} else if (!currentException.getClass().equals(previousException.getClass())) {
221+
return true;
222+
} else if (currentException.getMessage() == null) {
223+
return previousException.getMessage() != null;
224+
} else {
225+
return !currentException.getMessage().equals(previousException.getMessage());
226+
}
227+
}
228+
229+
private ServerDescription lookupServerDescription(final DBPort connection) throws IOException {
202230
LOGGER.fine(format("Checking status of %s", serverAddress));
203231
long startNanoTime = System.nanoTime();
204232
final CommandResult isMasterResult = connection.runCommand(mongo.getDB("admin"), new BasicDBObject("ismaster", 1));

src/test/com/mongodb/ServerMonitorSpecification.groovy

+81-14
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,39 @@
11
package com.mongodb
22

33
import java.util.concurrent.CountDownLatch
4+
import java.util.concurrent.TimeUnit
45

56
import static com.mongodb.Fixture.getMongoClient
67
import static com.mongodb.Fixture.serverIsAtLeastVersion
8+
import static com.mongodb.ServerMonitor.exceptionHasChanged
9+
import static com.mongodb.ServerMonitor.stateHasChanged
710
import static org.junit.Assume.assumeFalse
811
import static org.junit.Assume.assumeTrue
912

1013
class ServerMonitorSpecification extends FunctionalSpecification {
1114
ServerDescription newDescription
12-
ServerMonitor serverStateNotifier
15+
ServerMonitor serverMonitor
1316
CountDownLatch latch = new CountDownLatch(1)
1417

1518
def setup() {
1619
def connectionProvider = new PooledConnectionProvider('cluster-1', new ServerAddress(), new DBPortFactory(new MongoOptions()),
1720
ConnectionPoolSettings.builder().maxSize(1).build(),
1821
new JMXConnectionPoolListener());
19-
serverStateNotifier = new ServerMonitor(new ServerAddress(),
20-
new ChangeListener<ServerDescription>() {
21-
@Override
22-
void stateChanged(final ChangeEvent<ServerDescription> event) {
23-
newDescription = event.newValue
24-
latch.countDown()
25-
}
26-
},
27-
SocketSettings.builder().build(), ServerSettings.builder().build(),
28-
'cluster-1', getMongoClient(), connectionProvider)
29-
serverStateNotifier.start()
22+
serverMonitor = new ServerMonitor(new ServerAddress(),
23+
new ChangeListener<ServerDescription>() {
24+
@Override
25+
void stateChanged(final ChangeEvent<ServerDescription> event) {
26+
newDescription = event.newValue
27+
latch.countDown()
28+
}
29+
},
30+
SocketSettings.builder().build(), ServerSettings.builder().build(),
31+
'cluster-1', getMongoClient(), connectionProvider)
32+
serverMonitor.start()
3033
}
3134

3235
def cleanup() {
33-
serverStateNotifier.close();
36+
serverMonitor.close();
3437
}
3538

3639
def 'should set server version'() {
@@ -68,4 +71,68 @@ class ServerMonitorSpecification extends FunctionalSpecification {
6871
then:
6972
newDescription.maxWriteBatchSize == ServerDescription.getDefaultMaxWriteBatchSize()
7073
}
71-
}
74+
75+
def 'should report exception has changed when the current and previous are different'() {
76+
expect:
77+
exceptionHasChanged(null, new NullPointerException())
78+
exceptionHasChanged(new NullPointerException(), null)
79+
exceptionHasChanged(new SocketException(), new SocketException('A message'))
80+
exceptionHasChanged(new SocketException('A message'), new SocketException())
81+
exceptionHasChanged(new SocketException('A message'), new MongoException('A message'))
82+
exceptionHasChanged(new SocketException('A message'), new SocketException('A different message'))
83+
}
84+
85+
def 'should report exception has not changed when the current and previous are the same'() {
86+
expect:
87+
!exceptionHasChanged(null, null)
88+
!exceptionHasChanged(new NullPointerException(), new NullPointerException())
89+
!exceptionHasChanged(new MongoException('A message'), new MongoException('A message'))
90+
}
91+
92+
def 'should report state has changed if descriptions are different'() {
93+
expect:
94+
stateHasChanged(ServerDescription.builder()
95+
.type(ServerType.Unknown)
96+
.state(ServerConnectionState.Connecting)
97+
.address(new ServerAddress())
98+
.build(),
99+
ServerDescription.builder()
100+
.type(ServerType.StandAlone)
101+
.state(ServerConnectionState.Connected)
102+
.address(new ServerAddress())
103+
.averageLatency(5, TimeUnit.MILLISECONDS)
104+
.build());
105+
}
106+
107+
def 'should report state has changed if latencies are different'() {
108+
expect:
109+
stateHasChanged(ServerDescription.builder()
110+
.type(ServerType.StandAlone)
111+
.state(ServerConnectionState.Connected)
112+
.address(new ServerAddress())
113+
.averageLatency(5, TimeUnit.MILLISECONDS)
114+
.build(),
115+
ServerDescription.builder()
116+
.type(ServerType.StandAlone)
117+
.state(ServerConnectionState.Connected)
118+
.address(new ServerAddress())
119+
.averageLatency(6, TimeUnit.MILLISECONDS)
120+
.build());
121+
}
122+
123+
def 'should report state has not changed if descriptions and latencies are the same'() {
124+
expect:
125+
!stateHasChanged(ServerDescription.builder()
126+
.type(ServerType.StandAlone)
127+
.state(ServerConnectionState.Connected)
128+
.address(new ServerAddress())
129+
.averageLatency(5, TimeUnit.MILLISECONDS)
130+
.build(),
131+
ServerDescription.builder()
132+
.type(ServerType.StandAlone)
133+
.state(ServerConnectionState.Connected)
134+
.address(new ServerAddress())
135+
.averageLatency(5, TimeUnit.MILLISECONDS)
136+
.build());
137+
}
138+
}

0 commit comments

Comments
 (0)