Skip to content

Add tracing support using Micrometer #1695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions driver-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ dependencies {

optionalImplementation(libs.snappy.java)
optionalImplementation(libs.zstd.jni)
optionalImplementation(libs.micrometer)

testImplementation(project(path = ":bson", configuration = "testArtifacts"))
testImplementation(libs.reflections)
Expand Down
31 changes: 31 additions & 0 deletions driver-core/src/main/com/mongodb/MongoClientSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import com.mongodb.connection.SslSettings;
import com.mongodb.connection.TransportSettings;
import com.mongodb.event.CommandListener;
import com.mongodb.internal.tracing.TracingManager;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.DnsClient;
import com.mongodb.spi.dns.InetAddressResolver;
import com.mongodb.tracing.Tracer;
import org.bson.UuidRepresentation;
import org.bson.codecs.BsonCodecProvider;
import org.bson.codecs.BsonValueCodecProvider;
Expand Down Expand Up @@ -118,6 +120,7 @@ public final class MongoClientSettings {
private final InetAddressResolver inetAddressResolver;
@Nullable
private final Long timeoutMS;
private final TracingManager tracingManager;

/**
* Gets the default codec registry. It includes the following providers:
Expand Down Expand Up @@ -238,6 +241,7 @@ public static final class Builder {
private ContextProvider contextProvider;
private DnsClient dnsClient;
private InetAddressResolver inetAddressResolver;
private TracingManager tracingManager;

private Builder() {
}
Expand Down Expand Up @@ -275,6 +279,7 @@ private Builder(final MongoClientSettings settings) {
if (settings.heartbeatSocketTimeoutSetExplicitly) {
heartbeatSocketTimeoutMS = settings.heartbeatSocketSettings.getReadTimeout(MILLISECONDS);
}
tracingManager = settings.tracingManager;
}

/**
Expand Down Expand Up @@ -723,6 +728,20 @@ Builder heartbeatSocketTimeoutMS(final int heartbeatSocketTimeoutMS) {
return this;
}

/**
* Sets the tracer to use for creating Spans for operations and commands.
*
* @param tracer the tracer
* @see com.mongodb.tracing.MicrometerTracer
* @return this
* @since 5.5
*/
@Alpha(Reason.CLIENT)
public Builder tracer(final Tracer tracer) {
this.tracingManager = new TracingManager(tracer);
return this;
}

/**
* Build an instance of {@code MongoClientSettings}.
*
Expand Down Expand Up @@ -1040,6 +1059,17 @@ public ContextProvider getContextProvider() {
return contextProvider;
}

/**
* Get the tracer to create Spans for operations and commands.
*
* @return this
* @since 5.5
*/
@Alpha(Reason.CLIENT)
public TracingManager getTracingManager() {
return tracingManager;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down Expand Up @@ -1156,5 +1186,6 @@ private MongoClientSettings(final Builder builder) {
heartbeatConnectTimeoutSetExplicitly = builder.heartbeatConnectTimeoutMS != 0;
contextProvider = builder.contextProvider;
timeoutMS = builder.timeoutMS;
tracingManager = builder.tracingManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
}
}

BsonDocument getCommand() {
return command;
}

/**
* Get the field name from a buffer positioned at the start of the document sequence identifier of an OP_MSG Section of type
* `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
import com.mongodb.internal.logging.StructuredLogger;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.internal.time.Timeout;
import com.mongodb.internal.tracing.Span;
import com.mongodb.internal.tracing.TraceContext;
import com.mongodb.internal.tracing.TracingManager;
import com.mongodb.lang.Nullable;
import org.bson.BsonBinaryReader;
import org.bson.BsonDocument;
Expand All @@ -75,8 +78,8 @@
import static com.mongodb.assertions.Assertions.assertNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate;
import static com.mongodb.internal.connection.CommandHelper.HELLO;
Expand Down Expand Up @@ -374,13 +377,24 @@ public boolean isClosed() {
public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decoder, final OperationContext operationContext) {
Supplier<T> sendAndReceiveInternal = () -> sendAndReceiveInternal(
message, decoder, operationContext);

Span tracingSpan = createTracingSpan(message, operationContext);

try {
return sendAndReceiveInternal.get();
} catch (MongoCommandException e) {
if (tracingSpan != null) {
tracingSpan.error(e);
}

if (reauthenticationIsTriggered(e)) {
return reauthenticateAndRetry(sendAndReceiveInternal, operationContext);
}
throw e;
} finally {
if (tracingSpan != null) {
tracingSpan.end();
}
}
}

Expand All @@ -391,6 +405,7 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<

AsyncSupplier<T> sendAndReceiveAsyncInternal = c -> sendAndReceiveAsyncInternal(
message, decoder, operationContext, c);

beginAsync().<T>thenSupply(c -> {
sendAndReceiveAsyncInternal.getAsync(c);
}).onErrorIf(e -> reauthenticationIsTriggered(e), (t, c) -> {
Expand Down Expand Up @@ -872,6 +887,42 @@ public ByteBuf getBuffer(final int size) {
return stream.getBuffer(size);
}

@Nullable
private Span createTracingSpan(final CommandMessage message, final OperationContext operationContext) {
TracingManager tracingManager = operationContext.getTracingManager();
Span span;
if (tracingManager.isEnabled()) {
BsonDocument command = message.getCommand();
TraceContext parentContext = null;
long cursorId = -1;
if (command.containsKey("getMore")) {
cursorId = command.getInt64("getMore").longValue();
parentContext = tracingManager.getCursorParentContext(cursorId);
} else {
parentContext = tracingManager.getParentContext(operationContext.getId());
}

span = tracingManager.addSpan("Command " + command.getFirstKey(), parentContext);
span.tag("db.system", "mongodb");
span.tag("db.namespace", message.getNamespace().getFullName());
span.tag("db.query.summary", command.getFirstKey());
span.tag("db.query.opcode", String.valueOf(message.getOpCode()));
span.tag("db.query.text", command.toString());
if (cursorId != -1) {
span.tag("db.mongodb.cursor_id", String.valueOf(cursorId));
}
span.tag("server.address", serverId.getAddress().getHost());
span.tag("server.port", String.valueOf(serverId.getAddress().getPort()));
span.tag("server.type", message.getSettings().getServerType().name());

span.tag("db.mongodb.server_connection_id", this.description.getConnectionId().toString());
} else {
span = null;
}

return span;
}

private class MessageHeaderCallback implements SingleResultCallback<ByteBuf> {
private final OperationContext operationContext;
private final SingleResultCallback<ResponseBuffers> callback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.internal.tracing.TracingManager;
import com.mongodb.lang.Nullable;
import com.mongodb.selector.ServerSelector;

Expand All @@ -49,10 +50,11 @@ public class OperationContext {
private final TimeoutContext timeoutContext;
@Nullable
private final ServerApi serverApi;
private final TracingManager tracingManager;

public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext,
@Nullable final ServerApi serverApi) {
this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi);
@Nullable final ServerApi serverApi, final TracingManager tracingManager) {
this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi, tracingManager);
}

public static OperationContext simpleOperationContext(
Expand All @@ -61,29 +63,35 @@ public static OperationContext simpleOperationContext(
IgnorableRequestContext.INSTANCE,
NoOpSessionContext.INSTANCE,
new TimeoutContext(timeoutSettings),
serverApi);
serverApi,
TracingManager.NO_OP);
}

public static OperationContext simpleOperationContext(final TimeoutContext timeoutContext) {
return new OperationContext(
IgnorableRequestContext.INSTANCE,
NoOpSessionContext.INSTANCE,
timeoutContext,
null);
null,
TracingManager.NO_OP);
}

public OperationContext withSessionContext(final SessionContext sessionContext) {
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi);
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, tracingManager);
}

public OperationContext withTimeoutContext(final TimeoutContext timeoutContext) {
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi);
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, tracingManager);
}

public long getId() {
return id;
}

public TracingManager getTracingManager() {
return tracingManager;
}

public SessionContext getSessionContext() {
return sessionContext;
}
Expand All @@ -107,27 +115,31 @@ public OperationContext(final long id,
final SessionContext sessionContext,
final TimeoutContext timeoutContext,
final ServerDeprioritization serverDeprioritization,
@Nullable final ServerApi serverApi) {
@Nullable final ServerApi serverApi,
final TracingManager tracingManager) {
this.id = id;
this.serverDeprioritization = serverDeprioritization;
this.requestContext = requestContext;
this.sessionContext = sessionContext;
this.timeoutContext = timeoutContext;
this.serverApi = serverApi;
this.tracingManager = tracingManager;
}

@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
public OperationContext(final long id,
final RequestContext requestContext,
final SessionContext sessionContext,
final TimeoutContext timeoutContext,
@Nullable final ServerApi serverApi) {
@Nullable final ServerApi serverApi,
final TracingManager tracingManager) {
this.id = id;
this.serverDeprioritization = new ServerDeprioritization();
this.requestContext = requestContext;
this.sessionContext = sessionContext;
this.timeoutContext = timeoutContext;
this.serverApi = serverApi;
this.tracingManager = tracingManager;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
@Nullable
private List<T> nextBatch;
private boolean resetTimeoutWhenClosing;
private final long cursorId;

CommandBatchCursor(
final TimeoutMode timeoutMode,
Expand All @@ -95,10 +96,13 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
operationContext = connectionSource.getOperationContext();
this.timeoutMode = timeoutMode;

ServerCursor serverCursor = commandCursorResult.getServerCursor();
this.cursorId = serverCursor != null ? serverCursor.getId() : -1;

operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS);

Connection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER ? connection : null;
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, serverCursor);
resetTimeoutWhenClosing = true;
}

Expand Down Expand Up @@ -169,6 +173,7 @@ public void remove() {

@Override
public void close() {
operationContext.getTracingManager().removeCursorParentContext(cursorId);
resourceManager.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.tracing.Span;
import com.mongodb.internal.tracing.TracingManager;
import com.mongodb.lang.Nullable;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
Expand Down Expand Up @@ -290,21 +292,35 @@ public BatchCursor<T> execute(final ReadBinding binding) {
if (invalidTimeoutModeException != null) {
throw invalidTimeoutModeException;
}
OperationContext operationContext = binding.getOperationContext();

RetryState retryState = initialRetryState(retryReads, binding.getOperationContext().getTimeoutContext());
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, binding.getOperationContext(), () ->
// Adds a Tracing Span for 'find' operation
TracingManager tracingManager = operationContext.getTracingManager();
Span tracingSpan = tracingManager.addSpan("find", operationContext.getId());

RetryState retryState = initialRetryState(retryReads, operationContext.getTimeoutContext());
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, operationContext, () ->
withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection) -> {
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getOperationContext()));
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), operationContext));
try {
return createReadCommandAndExecute(retryState, binding.getOperationContext(), source, namespace.getDatabaseName(),
return createReadCommandAndExecute(retryState, operationContext, source, namespace.getDatabaseName(),
getCommandCreator(), CommandResultDocumentCodec.create(decoder, FIRST_BATCH),
transformer(), connection);
} catch (MongoCommandException e) {
throw new MongoQueryException(e.getResponse(), e.getServerAddress());
}
})
);
return read.get();
try {
return read.get();
} catch (MongoQueryException e) {
tracingSpan.error(e);
throw e;
} finally {
tracingSpan.end();
// Clean up the tracing span after the operation is complete
tracingManager.cleanContexts(operationContext.getId());
}
}

@Override
Expand Down Expand Up @@ -469,9 +485,17 @@ private TimeoutMode getTimeoutMode() {
}

private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
return (result, source, connection) ->
new CommandBatchCursor<>(getTimeoutMode(), result, batchSize, getMaxTimeForCursor(source.getOperationContext()), decoder,
comment, source, connection);
return (result, source, connection) -> {
OperationContext operationContext = source.getOperationContext();

// register cursor id with the operation context, so 'getMore' commands can be folded under the 'find' operation
long cursorId = result.getDocument("cursor").getInt64("id").longValue();
TracingManager tracingManager = operationContext.getTracingManager();
tracingManager.addCursorParentContext(cursorId, operationContext.getId());

return new CommandBatchCursor<>(getTimeoutMode(), result, batchSize, getMaxTimeForCursor(operationContext), decoder,
comment, source, connection);
};
}

private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
Expand Down
Loading