From 323318c3b1acc7df9e4efc34ef685d580ba7b458 Mon Sep 17 00:00:00 2001 From: Metis-Adrastea <> Date: Sat, 5 Apr 2025 14:15:39 +0300 Subject: [PATCH] reconnect issue --- .../kotlin/connection/KeepAliveHandler.kt | 8 +- rsocket-test/build.gradle.kts | 6 ++ .../kotlin/io/rsocket/kotlin/test/Client.kt | 88 +++++++++++++++++++ .../kotlin/io/rsocket/kotlin/test/Server.kt | 69 +++++++++++++++ .../src/jvmMain/resources/logback.xml | 14 +++ 5 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Client.kt create mode 100644 rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Server.kt create mode 100644 rsocket-test/src/jvmMain/resources/logback.xml diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/KeepAliveHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/KeepAliveHandler.kt index 13319e83..a3e8ada7 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/KeepAliveHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/connection/KeepAliveHandler.kt @@ -37,12 +37,16 @@ internal class KeepAliveHandler( init { // this could be moved to a function like `run` or `start` connectionScope.launch { + launch { + while (true) { + delay(keepAlive.intervalMillis.toLong()) + outbound.sendKeepAlive(true, EmptyBuffer, 0) + } + } while (true) { delay(keepAlive.intervalMillis.toLong()) if (currentDelayMillis() - lastMark.value >= keepAlive.maxLifetimeMillis) throw RSocketError.ConnectionError("No keep-alive for ${keepAlive.maxLifetimeMillis} ms") - - outbound.sendKeepAlive(true, EmptyBuffer, 0) } } } diff --git a/rsocket-test/build.gradle.kts b/rsocket-test/build.gradle.kts index caf72fc8..63ea6610 100644 --- a/rsocket-test/build.gradle.kts +++ b/rsocket-test/build.gradle.kts @@ -52,6 +52,12 @@ kotlin { } jvmMain.dependencies { api(kotlin("test-junit")) + implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.8.0") + runtimeOnly("ch.qos.logback:logback-classic:1.3.15") + implementation(projects.rsocketTransportKtorWebsocketClient) + implementation(projects.rsocketTransportKtorWebsocketServer) + implementation("io.ktor:ktor-client-cio:3.1.2") + implementation("io.ktor:ktor-server-cio:3.1.2") } jsMain.dependencies { api(kotlin("test-js")) diff --git a/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Client.kt b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Client.kt new file mode 100644 index 00000000..4f72243d --- /dev/null +++ b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Client.kt @@ -0,0 +1,88 @@ +/* + * Copyright 2015-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.test + +import io.ktor.client.request.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.core.* +import io.rsocket.kotlin.keepalive.* +import io.rsocket.kotlin.logging.* +import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.transport.ktor.websocket.client.* +import kotlinx.coroutines.* +import org.slf4j.event.* +import kotlin.coroutines.* +import kotlin.time.Duration.Companion.seconds + +lateinit var rsocket: RSocket + +val Slf4jLoggerFactory = object : LoggerFactory { + private fun LoggingLevel.sl4jLevel() = when (this) { + LoggingLevel.TRACE -> Level.TRACE + LoggingLevel.DEBUG -> Level.DEBUG + LoggingLevel.INFO -> Level.INFO + LoggingLevel.WARN -> Level.WARN + LoggingLevel.ERROR -> Level.ERROR + } + + override fun logger(tag: String): Logger = org.slf4j.LoggerFactory.getLogger(tag).let { logger -> + return object : Logger { + override val tag: String + get() = logger.name + + override fun isLoggable(level: LoggingLevel) = logger.isEnabledForLevel(level.sl4jLevel()) + + override fun rawLog( + level: LoggingLevel, + throwable: Throwable?, + message: Any?, + ) { + logger.atLevel(level.sl4jLevel()).setCause(throwable).log(message?.toString()) + } + } + } +} + +suspend fun main() { + rsocket = RSocketConnector { + connectionConfig { + payloadMimeType = PayloadMimeType( + WellKnownMimeType.ApplicationJson, + WellKnownMimeType.MessageRSocketRouting + ) + keepAlive = KeepAlive(5.seconds, 10.seconds) + } + loggerFactory = Slf4jLoggerFactory + reconnectable { cause, attempt -> + println("$attempt ${cause.message}") + delay(5.seconds) + true + } + }.connect(KtorWebSocketClientTransport(coroutineContext).target { + this.host = "localhost" + this.port = 7777 + }) + while (true) { + delay(5.seconds) + try { + //println(rsocket.requestResponse(payload("find-rates", Unit)).data.readText()) + //println(requestResponse("find-rates", Unit)) + } catch (e: Exception) { + e.printStackTrace() + } + } +} \ No newline at end of file diff --git a/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Server.kt b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Server.kt new file mode 100644 index 00000000..77dff1e8 --- /dev/null +++ b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Server.kt @@ -0,0 +1,69 @@ +/* + * Copyright 2015-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:OptIn(ExperimentalMetadataApi::class) + +package io.rsocket.kotlin.test + +import io.ktor.server.cio.* +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.core.* +import io.rsocket.kotlin.metadata.* +import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.transport.ktor.websocket.server.* +import kotlinx.coroutines.* +import kotlinx.io.Buffer +import kotlinx.serialization.json.* +import kotlin.coroutines.* + +val json = Json { + encodeDefaults = true +} + +fun Payload.route(): String = metadata + ?.read(RoutingMetadata) + ?.tags + ?.first() + ?: error("No route provided") + +inline fun payload(route: String, t: T): Payload = buildPayload { + metadata(RoutingMetadata(route).toBuffer()) + if (t == Unit) data(Buffer()) else data(json.encodeToString(t)) +} + +inline fun Payload.data(): T = json.decodeFromString(data.readText()) + +inline fun payload(t: T) = + if (t == Unit) Payload.Empty else buildPayload { data(json.encodeToString(t)) } + +suspend fun main() { + RSocketServer { + loggerFactory = Slf4jLoggerFactory + } + .startServer( + KtorWebSocketServerTransport(coroutineContext) { httpEngine(CIO) }.target { port = 7777 } + ) { + RSocketRequestHandler { + requestResponse { payload -> + when (val route = payload.route()) { + "test" -> payload("test") + else -> error("Wrong route: $route") + } + } + } + }.coroutineContext.job.join() +} \ No newline at end of file diff --git a/rsocket-test/src/jvmMain/resources/logback.xml b/rsocket-test/src/jvmMain/resources/logback.xml new file mode 100644 index 00000000..13e87757 --- /dev/null +++ b/rsocket-test/src/jvmMain/resources/logback.xml @@ -0,0 +1,14 @@ + + + + + %d %-5level [%thread] %logger{36} - %msg%n + + + + + + + + + \ No newline at end of file