Skip to content

Commit 9432f3c

Browse files
committed
extracts McpClient/McpServer abstraction into mcp-spi
1 parent cd74509 commit 9432f3c

28 files changed

+887
-344
lines changed

mcp-reactor/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -72,19 +72,19 @@
7272
* @author Dariusz Jędrzejczyk
7373
* @author Christian Tzolov
7474
* @author Jihoon Kim
75-
* @see McpClient
75+
* @see McpClientFactory
7676
* @see McpSchema
7777
* @see McpClientSession
7878
*/
79-
public class McpAsyncClient {
79+
public class McpAsyncClient implements McpClient {
8080

8181
private static final Logger logger = LoggerFactory.getLogger(McpAsyncClient.class);
8282

8383
private static final McpType<Void> VOID_TYPE_REFERENCE = McpType.of(Void.class);
8484

8585
protected final Sinks.One<McpSchema.InitializeResult> initializedSink = Sinks.one();
8686

87-
private AtomicBoolean initialized = new AtomicBoolean(false);
87+
private final AtomicBoolean initialized = new AtomicBoolean(false);
8888

8989
/**
9090
* The max timeout to await for the client-server connection to be initialized.
@@ -808,7 +808,7 @@ public Mono<Void> setLoggingLevel(LoggingLevel loggingLevel) {
808808
* code.
809809
* @param protocolVersions the Client supported protocol versions.
810810
*/
811-
void setProtocolVersions(List<String> protocolVersions) {
811+
public void setProtocolVersions(List<String> protocolVersions) {
812812
this.protocolVersions = protocolVersions;
813813

814814
}

mcp-reactor/src/main/java/io/modelcontextprotocol/client/McpClient.java renamed to mcp-reactor/src/main/java/io/modelcontextprotocol/client/McpClientFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@
9999
* @see McpSyncClient
100100
* @see McpTransport
101101
*/
102-
public interface McpClient {
102+
public interface McpClientFactory {
103103

104104
/**
105105
* Start building a synchronous MCP client with the specified transport layer. The

mcp-reactor/src/main/java/io/modelcontextprotocol/client/McpClientFeatures.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* operations in non-blocking contexts by scheduling them on a bounded elastic scheduler.
4343
*
4444
* @author Dariusz Jędrzejczyk
45-
* @see McpClient
45+
* @see McpClientFactory
4646
* @see McpSchema.Implementation
4747
* @see McpSchema.ClientCapabilities
4848
*/

mcp-reactor/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
* @author Dariusz Jędrzejczyk
4848
* @author Christian Tzolov
4949
* @author Jihoon Kim
50-
* @see McpClient
50+
* @see McpClientFactory
5151
* @see McpAsyncClient
5252
* @see McpSchema
5353
*/

mcp-reactor/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

+30-35
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,11 @@
7777
* @author Christian Tzolov
7878
* @author Dariusz Jędrzejczyk
7979
* @author Jihoon Kim
80-
* @see McpServer
80+
* @see McpServerFactory
8181
* @see McpSchema
8282
* @see McpClientSession
8383
*/
84-
public class McpAsyncServer {
84+
public class McpAsyncServer implements McpServer {
8585

8686
private static final Logger logger = LoggerFactory.getLogger(McpAsyncServer.class);
8787

@@ -144,7 +144,7 @@ public void close() {
144144
* @param toolSpecification The tool specification to add
145145
* @return Mono that completes when clients have been notified of the change
146146
*/
147-
public Mono<Void> addTool(McpServerFeatures.AsyncToolSpecification toolSpecification) {
147+
public Mono<Void> addTool(McpServer.AsyncToolSpecification toolSpecification) {
148148
return this.delegate.addTool(toolSpecification);
149149
}
150150

@@ -173,7 +173,7 @@ public Mono<Void> notifyToolsListChanged() {
173173
* @param resourceHandler The resource handler to add
174174
* @return Mono that completes when clients have been notified of the change
175175
*/
176-
public Mono<Void> addResource(McpServerFeatures.AsyncResourceSpecification resourceHandler) {
176+
public Mono<Void> addResource(McpServer.AsyncResourceSpecification resourceHandler) {
177177
return this.delegate.addResource(resourceHandler);
178178
}
179179

@@ -202,7 +202,7 @@ public Mono<Void> notifyResourcesListChanged() {
202202
* @param promptSpecification The prompt handler to add
203203
* @return Mono that completes when clients have been notified of the change
204204
*/
205-
public Mono<Void> addPrompt(McpServerFeatures.AsyncPromptSpecification promptSpecification) {
205+
public Mono<Void> addPrompt(McpServer.AsyncPromptSpecification promptSpecification) {
206206
return this.delegate.addPrompt(promptSpecification);
207207
}
208208

@@ -251,7 +251,7 @@ public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageN
251251
* code.
252252
* @param protocolVersions the Client supported protocol versions.
253253
*/
254-
void setProtocolVersions(List<String> protocolVersions) {
254+
public void setProtocolVersions(List<String> protocolVersions) {
255255
this.delegate.setProtocolVersions(protocolVersions);
256256
}
257257

@@ -267,13 +267,13 @@ private static class AsyncServerImpl extends McpAsyncServer {
267267

268268
private final String instructions;
269269

270-
private final CopyOnWriteArrayList<McpServerFeatures.AsyncToolSpecification> tools = new CopyOnWriteArrayList<>();
270+
private final CopyOnWriteArrayList<McpServer.AsyncToolSpecification> tools = new CopyOnWriteArrayList<>();
271271

272272
private final CopyOnWriteArrayList<McpSchema.ResourceTemplate> resourceTemplates = new CopyOnWriteArrayList<>();
273273

274-
private final ConcurrentHashMap<String, McpServerFeatures.AsyncResourceSpecification> resources = new ConcurrentHashMap<>();
274+
private final ConcurrentHashMap<String, McpServer.AsyncResourceSpecification> resources = new ConcurrentHashMap<>();
275275

276-
private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();
276+
private final ConcurrentHashMap<String, McpServer.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();
277277

278278
// FIXME: this field is deprecated and should be remvoed together with the
279279
// broadcasting loggingNotification.
@@ -283,7 +283,7 @@ private static class AsyncServerImpl extends McpAsyncServer {
283283

284284
private List<String> protocolVersions = List.of(McpSchema.LATEST_PROTOCOL_VERSION);
285285

286-
private McpUriTemplateManagerFactory uriTemplateManagerFactory = new DeafaultMcpUriTemplateManagerFactory();
286+
private final McpUriTemplateManagerFactory uriTemplateManagerFactory;
287287

288288
AsyncServerImpl(McpServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper,
289289
Duration requestTimeout, McpServerFeatures.Async features,
@@ -432,7 +432,7 @@ private McpServerSession.NotificationHandler asyncRootsListChangedNotificationHa
432432
// ---------------------------------------
433433

434434
@Override
435-
public Mono<Void> addTool(McpServerFeatures.AsyncToolSpecification toolSpecification) {
435+
public Mono<Void> addTool(McpServer.AsyncToolSpecification toolSpecification) {
436436
if (toolSpecification == null) {
437437
return Mono.error(new McpError("Tool specification must not be null"));
438438
}
@@ -494,7 +494,7 @@ public Mono<Void> notifyToolsListChanged() {
494494

495495
private McpServerSession.RequestHandler<McpSchema.ListToolsResult> toolsListRequestHandler() {
496496
return (exchange, params) -> {
497-
List<Tool> tools = this.tools.stream().map(McpServerFeatures.AsyncToolSpecification::tool).toList();
497+
List<Tool> tools = this.tools.stream().map(McpServer.AsyncToolSpecification::tool).toList();
498498

499499
return Mono.just(new McpSchema.ListToolsResult(tools, null));
500500
};
@@ -505,15 +505,16 @@ private McpServerSession.RequestHandler<CallToolResult> toolsCallRequestHandler(
505505
McpSchema.CallToolRequest callToolRequest = schemaCodec.decodeResult(params,
506506
McpType.of(McpSchema.CallToolRequest.class));
507507

508-
Optional<McpServerFeatures.AsyncToolSpecification> toolSpecification = this.tools.stream()
508+
Optional<McpServer.AsyncToolSpecification> toolSpecification = this.tools.stream()
509509
.filter(tr -> callToolRequest.name().equals(tr.tool().name()))
510510
.findAny();
511511

512512
if (toolSpecification.isEmpty()) {
513513
return Mono.error(new McpError("Tool not found: " + callToolRequest.name()));
514514
}
515515

516-
return toolSpecification.map(tool -> tool.call().apply(exchange, callToolRequest.arguments()))
516+
return toolSpecification
517+
.map(tool -> Mono.from(tool.call().apply(exchange, callToolRequest.arguments())))
517518
.orElse(Mono.error(new McpError("Tool not found: " + callToolRequest.name())));
518519
};
519520
}
@@ -523,7 +524,7 @@ private McpServerSession.RequestHandler<CallToolResult> toolsCallRequestHandler(
523524
// ---------------------------------------
524525

525526
@Override
526-
public Mono<Void> addResource(McpServerFeatures.AsyncResourceSpecification resourceSpecification) {
527+
public Mono<Void> addResource(McpServer.AsyncResourceSpecification resourceSpecification) {
527528
if (resourceSpecification == null || resourceSpecification.resource() == null) {
528529
return Mono.error(new McpError("Resource must not be null"));
529530
}
@@ -555,7 +556,7 @@ public Mono<Void> removeResource(String resourceUri) {
555556
}
556557

557558
return Mono.defer(() -> {
558-
McpServerFeatures.AsyncResourceSpecification removed = this.resources.remove(resourceUri);
559+
McpServer.AsyncResourceSpecification removed = this.resources.remove(resourceUri);
559560
if (removed != null) {
560561
logger.debug("Removed resource handler: {}", resourceUri);
561562
if (this.serverCapabilities.resources().listChanged()) {
@@ -577,7 +578,7 @@ private McpServerSession.RequestHandler<McpSchema.ListResourcesResult> resources
577578
return (exchange, params) -> {
578579
var resourceList = this.resources.values()
579580
.stream()
580-
.map(McpServerFeatures.AsyncResourceSpecification::resource)
581+
.map(McpServer.AsyncResourceSpecification::resource)
581582
.toList();
582583
return Mono.just(new McpSchema.ListResourcesResult(resourceList, null));
583584
};
@@ -613,15 +614,15 @@ private McpServerSession.RequestHandler<McpSchema.ReadResourceResult> resourcesR
613614
McpType.of(McpSchema.ReadResourceRequest.class));
614615
var resourceUri = resourceRequest.uri();
615616

616-
McpServerFeatures.AsyncResourceSpecification specification = this.resources.values()
617+
McpServer.AsyncResourceSpecification specification = this.resources.values()
617618
.stream()
618619
.filter(resourceSpecification -> this.uriTemplateManagerFactory
619620
.create(resourceSpecification.resource().uri())
620621
.matches(resourceUri))
621622
.findFirst()
622623
.orElseThrow(() -> new McpError("Resource not found: " + resourceUri));
623624

624-
return specification.readHandler().apply(exchange, resourceRequest);
625+
return Mono.from(specification.readHandler().apply(exchange, resourceRequest));
625626
};
626627
}
627628

@@ -630,7 +631,7 @@ private McpServerSession.RequestHandler<McpSchema.ReadResourceResult> resourcesR
630631
// ---------------------------------------
631632

632633
@Override
633-
public Mono<Void> addPrompt(McpServerFeatures.AsyncPromptSpecification promptSpecification) {
634+
public Mono<Void> addPrompt(McpServer.AsyncPromptSpecification promptSpecification) {
634635
if (promptSpecification == null) {
635636
return Mono.error(new McpError("Prompt specification must not be null"));
636637
}
@@ -639,7 +640,7 @@ public Mono<Void> addPrompt(McpServerFeatures.AsyncPromptSpecification promptSpe
639640
}
640641

641642
return Mono.defer(() -> {
642-
McpServerFeatures.AsyncPromptSpecification specification = this.prompts
643+
McpServer.AsyncPromptSpecification specification = this.prompts
643644
.putIfAbsent(promptSpecification.prompt().name(), promptSpecification);
644645
if (specification != null) {
645646
return Mono.error(new McpError(
@@ -668,7 +669,7 @@ public Mono<Void> removePrompt(String promptName) {
668669
}
669670

670671
return Mono.defer(() -> {
671-
McpServerFeatures.AsyncPromptSpecification removed = this.prompts.remove(promptName);
672+
McpServer.AsyncPromptSpecification removed = this.prompts.remove(promptName);
672673

673674
if (removed != null) {
674675
logger.debug("Removed prompt handler: {}", promptName);
@@ -698,7 +699,7 @@ private McpServerSession.RequestHandler<McpSchema.ListPromptsResult> promptsList
698699

699700
var promptList = this.prompts.values()
700701
.stream()
701-
.map(McpServerFeatures.AsyncPromptSpecification::prompt)
702+
.map(McpServer.AsyncPromptSpecification::prompt)
702703
.toList();
703704

704705
return Mono.just(new McpSchema.ListPromptsResult(promptList, null));
@@ -711,12 +712,12 @@ private McpServerSession.RequestHandler<McpSchema.GetPromptResult> promptsGetReq
711712
McpType.of(McpSchema.GetPromptRequest.class));
712713

713714
// Implement prompt retrieval logic here
714-
McpServerFeatures.AsyncPromptSpecification specification = this.prompts.get(promptRequest.name());
715+
McpServer.AsyncPromptSpecification specification = this.prompts.get(promptRequest.name());
715716
if (specification == null) {
716717
return Mono.error(new McpError("Prompt not found: " + promptRequest.name()));
717718
}
718719

719-
return specification.promptHandler().apply(exchange, promptRequest);
720+
return Mono.from(specification.promptHandler().apply(exchange, promptRequest));
720721
};
721722
}
722723

@@ -775,25 +776,19 @@ private McpServerSession.RequestHandler<McpSchema.CompleteResult> completionComp
775776

776777
// check if the referenced resource exists
777778
if (type.equals("ref/prompt") && request.ref() instanceof McpSchema.PromptReference promptReference) {
778-
McpServerFeatures.AsyncPromptSpecification promptSpec = this.prompts.get(promptReference.name());
779+
McpServer.AsyncPromptSpecification promptSpec = this.prompts.get(promptReference.name());
779780
if (promptSpec == null) {
780781
return Mono.error(new McpError("Prompt not found: " + promptReference.name()));
781782
}
782-
if (!promptSpec.prompt()
783-
.arguments()
784-
.stream()
785-
.filter(arg -> arg.name().equals(argumentName))
786-
.findFirst()
787-
.isPresent()) {
783+
if (promptSpec.prompt().arguments().stream().noneMatch(arg -> arg.name().equals(argumentName))) {
788784

789785
return Mono.error(new McpError("Argument not found: " + argumentName));
790786
}
791787
}
792788

793789
if (type.equals("ref/resource")
794790
&& request.ref() instanceof McpSchema.ResourceReference resourceReference) {
795-
McpServerFeatures.AsyncResourceSpecification resourceSpec = this.resources
796-
.get(resourceReference.uri());
791+
McpServer.AsyncResourceSpecification resourceSpec = this.resources.get(resourceReference.uri());
797792
if (resourceSpec == null) {
798793
return Mono.error(new McpError("Resource not found: " + resourceReference.uri()));
799794
}
@@ -855,7 +850,7 @@ private McpSchema.CompleteRequest parseCompletionParams(Object object) {
855850
// ---------------------------------------
856851

857852
@Override
858-
void setProtocolVersions(List<String> protocolVersions) {
853+
public void setProtocolVersions(List<String> protocolVersions) {
859854
this.protocolVersions = protocolVersions;
860855
}
861856

mcp-reactor/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
* @author Dariusz Jędrzejczyk
2121
* @author Christian Tzolov
2222
*/
23-
public class McpAsyncServerExchange {
23+
public class McpAsyncServerExchange implements McpServerExchange {
2424

2525
private final McpServerSession session;
2626

@@ -136,7 +136,7 @@ public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageN
136136
* filtered out.
137137
* @param minLoggingLevel The minimum logging level
138138
*/
139-
void setMinLoggingLevel(LoggingLevel minLoggingLevel) {
139+
public void setMinLoggingLevel(LoggingLevel minLoggingLevel) {
140140
Assert.notNull(minLoggingLevel, "minLoggingLevel must not be null");
141141
this.minLoggingLevel = minLoggingLevel;
142142
}

0 commit comments

Comments
 (0)