From f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31 Mon Sep 17 00:00:00 2001 From: Robert Jaros Date: Sun, 31 Mar 2019 00:06:25 +0100 Subject: Websockets implementation --- .../pl/treksoft/kvision/remote/KVServiceManager.kt | 11 ++ .../pl/treksoft/kvision/remote/KVRemoteAgent.kt | 180 +++++++++++++++++++++ .../pl/treksoft/kvision/remote/KVServiceManager.kt | 15 ++ .../kotlin/pl/treksoft/kvision/remote/Socket.kt | 171 ++++++++++++++++++++ .../kotlin/pl/treksoft/kvision/remote/Utils.kt | 12 ++ .../pl/treksoft/kvision/remote/KVServiceManager.kt | 15 ++ kvision-modules/kvision-server-ktor/build.gradle | 1 + .../kotlin/pl/treksoft/kvision/remote/KVModules.kt | 55 +++++++ .../pl/treksoft/kvision/remote/KVServiceManager.kt | 80 ++++++++- .../pl/treksoft/kvision/remote/KVServiceManager.kt | 14 ++ 10 files changed, 547 insertions(+), 7 deletions(-) create mode 100644 kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt (limited to 'kvision-modules') diff --git a/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index 514fb07d..e89744f6 100644 --- a/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -21,6 +21,8 @@ */ package pl.treksoft.kvision.remote +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlin.reflect.KClass enum class HttpMethod { @@ -115,4 +117,13 @@ expect open class KVServiceManager(serviceClass: KClass) { protected fun bind( function: T.(String?, String?) -> List ) + + /** + * Binds a given function of the receiver as a web socket connection + * @param function a function of the receiver + */ + protected inline fun bind( + noinline function: suspend T.(ReceiveChannel, SendChannel) -> Unit, + route: String? = null + ) } diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt index 6054592e..3e538bd1 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt @@ -21,10 +21,18 @@ */ package pl.treksoft.kvision.remote +import kotlinx.coroutines.Job import kotlinx.coroutines.asDeferred +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch import kotlinx.serialization.ImplicitReflectionSerializer import kotlinx.serialization.list import kotlinx.serialization.serializer +import kotlinx.serialization.stringify +import kotlin.js.console import kotlin.js.js import kotlin.reflect.KClass import kotlin.js.JSON as NativeJSON @@ -366,6 +374,178 @@ open class KVRemoteAgent(val serviceManager: KVServiceManager) : Rem }.asDeferred().await() } + /** + * Executes defined web socket connection + */ + suspend inline fun webSocket( + noinline function: suspend T.(ReceiveChannel, SendChannel) -> Unit, + noinline handler: suspend (SendChannel, ReceiveChannel) -> Unit + ) { + val (url, _) = + serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")] + ?: throw IllegalStateException("Function not specified!") + val socket = Socket() + val requestChannel = Channel() + val responseChannel = Channel() + try { + coroutineScope { + socket.connect(getWebSocketUrl(url)) + lateinit var responseJob: Job + lateinit var handlerJob: Job + val requestJob = launch { + for (par1 in requestChannel) { + val param = serialize(par1) + val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param))) + if (!socket.sendOrFalse(str)) break + } + responseJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + responseJob = launch { + while (true) { + val str = socket.receiveOrNull() ?: break + val data = kotlin.js.JSON.parse(str).result + val par2 = try { + @Suppress("UNCHECKED_CAST") + deserialize(data, PAR2::class.js.name) + } catch (t: NotStandardTypeException) { + try { + @Suppress("UNCHECKED_CAST") + tryDeserializeEnum(PAR2::class as KClass, data) as PAR2 + } catch (t: NotEnumTypeException) { + JSON.nonstrict.parse(PAR2::class.serializer(), data) + } + } + responseChannel.send(par2) + } + requestJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + handlerJob = launch { + exceptionHelper { + handler(requestChannel, responseChannel) + } + requestJob.cancel() + responseJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + } + } catch (e: Exception) { + console.log(e) + } + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + socket.close() + } + + /** + * Executes defined web socket connection returning list objects + */ + suspend inline fun webSocket( + noinline function: suspend T.(ReceiveChannel, SendChannel>) -> Unit, + noinline handler: suspend (SendChannel, ReceiveChannel>) -> Unit + ) { + val (url, _) = + serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")] + ?: throw IllegalStateException("Function not specified!") + val socket = Socket() + val requestChannel = Channel() + val responseChannel = Channel>() + try { + coroutineScope { + socket.connect(getWebSocketUrl(url)) + lateinit var responseJob: Job + lateinit var handlerJob: Job + val requestJob = launch { + for (par1 in requestChannel) { + val param = serialize(par1) + val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param))) + if (!socket.sendOrFalse(str)) break + } + responseJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + responseJob = launch { + while (true) { + val str = socket.receiveOrNull() ?: break + val data = kotlin.js.JSON.parse(str).result + val par2 = try { + deserializeList(data, PAR2::class.js.name) + } catch (t: NotStandardTypeException) { + try { + @Suppress("UNCHECKED_CAST") + tryDeserializeEnumList(PAR2::class as KClass, data) as List + } catch (t: NotEnumTypeException) { + JSON.nonstrict.parse(PAR2::class.serializer().list, data) + } + } + responseChannel.send(par2) + } + requestJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + handlerJob = launch { + exceptionHelper { + handler(requestChannel, responseChannel) + } + requestJob.cancel() + responseJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + } + } catch (e: Exception) { + console.log(e) + } + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + socket.close() + } + + /** + * @suppress internal function + */ + suspend fun Socket.receiveOrNull(): String? { + return try { + this.receive() + } catch (e: SocketClosedException) { + console.log("Socket was closed: ${e.reason}") + null + } + } + + /** + * @suppress internal function + */ + fun Socket.sendOrFalse(str: String): Boolean { + return try { + this.send(str) + true + } catch (e: SocketClosedException) { + console.log("Socket was closed: ${e.reason}") + false + } + } + + /** + * @suppress internal function + */ + suspend fun exceptionHelper(block: suspend () -> Unit) { + try { + block() + } catch (e: Exception) { + console.log(e) + } + } /** * @suppress diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index 4ca6a4a5..0cf72f76 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -21,6 +21,8 @@ */ package pl.treksoft.kvision.remote +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlin.reflect.KClass /** @@ -137,6 +139,19 @@ actual open class KVServiceManager actual constructor(serviceClass: KCl calls[function.toString().replace("\\s".toRegex(), "")] = Pair("/kv/$routeDef", HttpMethod.POST) } + /** + * Binds a given web socket connetion with a function of the receiver. + * @param function a function of the receiver + * @param route a web socket route + */ + protected actual inline fun bind( + noinline function: suspend T.(ReceiveChannel, SendChannel) -> Unit, + route: String? + ) { + val routeDef = "route${this::class.simpleName}${counter++}" + calls[function.toString().replace("\\s".toRegex(), "")] = Pair("/kv/$routeDef", HttpMethod.POST) + } + /** * Returns the map of defined paths. */ diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt new file mode 100644 index 00000000..c547ce9b --- /dev/null +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2017-present Robert Jaros + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package pl.treksoft.kvision.remote + +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.w3c.dom.CloseEvent +import org.w3c.dom.ErrorEvent +import org.w3c.dom.MessageEvent +import org.w3c.dom.WebSocket +import org.w3c.dom.WebSocket.Companion.CLOSED +import org.w3c.dom.WebSocket.Companion.CLOSING +import org.w3c.dom.WebSocket.Companion.OPEN +import org.w3c.dom.events.Event +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine + +// +// Code taken from: https://discuss.kotlinlang.org/t/js-coroutines-experiements/8245/2 +// + +/** + * Websocket closed exception class. + */ +class SocketClosedException(val reason: String) : Throwable(reason) + +/** + * A websocket client implementation. + */ +class Socket { + private var eventQueue: Channel = Channel(Channel.UNLIMITED) + private lateinit var ws: WebSocket + val state: Short + get() = ws.readyState + + private fun onWsEvent(event: Event) { + GlobalScope.launch { eventQueue.send(event) } + } + + /** + * Connect to a websocket. + */ + suspend fun connect(url: String, retryDelay: Long = 1000) { + while (true) { + val connected = suspendCoroutine { cont -> + while (eventQueue.poll() != null) {/*drain*/ + } + ws = WebSocket(url) + ws.onopen = { + ws.onclose = ::onWsEvent + ws.onerror = ::onWsEvent + cont.resume(true) + } + ws.onmessage = ::onWsEvent + ws.onerror = { + logError(it) + } + ws.onclose = { + cont.resume(false) + } + } + if (connected) + break + else + delay(retryDelay) + } + } + + /** + * Receive a string from a websocket. + */ + suspend fun receive(): String { + val event = eventQueue.receive() + return when (event) { + is MessageEvent -> { + event.data as String + } + is CloseEvent -> { + val reason = getReason(event.code) + throw SocketClosedException(reason) + } + is ErrorEvent -> { + logError(event) + close() + throw SocketClosedException(event.message) + } + else -> { + val reason = getReason(4001) + console.error(reason) + close() + throw SocketClosedException(reason) + } + } + } + + /** + * Send string to a websocket. + */ + fun send(obj: String) { + when { + isClosed() -> { + console.error(getReason(4002)) + throw SocketClosedException(getReason(4002)) + } + else -> ws.send(obj) + } + } + + /** + * Close a websocket. + */ + fun close(code: Short = 1000) { + when (state) { + OPEN -> ws.close(code, getReason(1000)) + } + } + + /** + * Returns if a websocket is closed. + */ + fun isClosed(): Boolean { + return when (state) { + CLOSED, CLOSING -> true + else -> false + } + } + + private fun logError(event: Event) = console.error("An error %o occurred when connecting to ${ws.url}", event) + + private fun getReason(code: Short): String { + return when (code.toInt()) { // See http://tools.ietf.org/html/rfc6455#section-7.4.1 + 1000 -> "Normal closure" + 1001 -> "An endpoint is \"going away\", such as a server going down or a browser having navigated away from a page." + 1002 -> "An endpoint is terminating the connection due to a protocol error" + 1003 -> "An endpoint is terminating the connection because it has received a type of data it cannot accept (e.g., an endpoint that understands only text data MAY send this if it receives a binary message)." + 1004 -> "Reserved. The specific meaning might be defined in the future." + 1005 -> "No status code was actually present." + 1006 -> "The connection was closed abnormally, e.g., without sending or receiving a Close control frame" + 1007 -> "An endpoint is terminating the connection because it has received data within a message that was not consistent with the type of the message (e.g., non-UTF-8 [http://tools.ietf.org/html/rfc3629] data within a text message)." + 1008 -> "An endpoint is terminating the connection because it has received a message that \"violates its policy\". This reason is given either if there is no other sutible reason, or if there is a need to hide specific details about the policy." + 1009 -> "An endpoint is terminating the connection because it has received a message that is too big for it to process." + 1010 -> "An endpoint (client ) is terminating the connection because it has expected the server to negotiate one or more extension, but the server didn't return them in the response message of the WebSocket handshake.
Specifically, the extensions that are needed are: " + 1011 -> "A server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request." + 1015 -> "The connection was closed due to a failure to perform a TLS handshake (e.g., the server certificate can't be verified)." + 4001 -> "Unexpected event" + 4002 -> "You are trying to use closed socket" + else -> "Unknown reason" + } + } +} diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt index 273c2a65..c4f4ed6d 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt @@ -25,6 +25,7 @@ import kotlinx.serialization.SerializationStrategy import kotlinx.serialization.context.SimpleModule import kotlinx.serialization.json.Json import pl.treksoft.kvision.types.JsonDateSerializer +import kotlin.browser.window import kotlin.js.Date /** @@ -60,3 +61,14 @@ object JSON { return kotlin.js.JSON.parse(plain.stringify(serializer, this)) } } + +/** + * Creates a websocket URL from current window.location and given path. + */ +fun getWebSocketUrl(url: String): String { + val location = window.location + val scheme = if (location.protocol == "https:") "wss" else "ws" + val port = if (location.port == "8088") ":8080" + else if (location.port != "0" && location.port != "") ":${location.port}" else "" + return "$scheme://${location.hostname}$port$url" +} diff --git a/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index d1e64ce4..cfd9d467 100644 --- a/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -25,6 +25,8 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.google.inject.Injector import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.launch import org.jooby.Kooby import org.jooby.Request @@ -310,6 +312,19 @@ actual open class KVServiceManager actual constructor(val serviceClass: } } + /** + * Binds a given web socket connetion with a function of the receiver. + * @param function a function of the receiver + * @param route a route + */ + protected actual inline fun bind( + noinline function: suspend T.(ReceiveChannel, SendChannel) -> Unit, + route: String? + ) { + TODO("Not implemented in Jooby module") + } + + /** * Binds a given function of the receiver as a select options source * @param function a function of the receiver diff --git a/kvision-modules/kvision-server-ktor/build.gradle b/kvision-modules/kvision-server-ktor/build.gradle index 62b5fe83..44be9171 100644 --- a/kvision-modules/kvision-server-ktor/build.gradle +++ b/kvision-modules/kvision-server-ktor/build.gradle @@ -10,6 +10,7 @@ dependencies { compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion" compile "io.ktor:ktor-server-core:$ktorVersion" compile "io.ktor:ktor-jackson:$ktorVersion" + compile "io.ktor:ktor-websockets:$ktorVersion" compile "com.google.inject:guice:$guiceVersion" compile "com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonModuleKotlinVersion" testCompile "org.jetbrains.kotlin:kotlin-test:$kotlinVersion" diff --git a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt index 956e7301..e2edc175 100644 --- a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt +++ b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt @@ -31,17 +31,25 @@ import io.ktor.application.ApplicationCallPipeline import io.ktor.application.call import io.ktor.application.install import io.ktor.features.ContentNegotiation +import io.ktor.http.cio.websocket.Frame import io.ktor.http.content.defaultResource import io.ktor.http.content.resources import io.ktor.http.content.static import io.ktor.jackson.jackson import io.ktor.routing.routing import io.ktor.util.AttributeKey +import io.ktor.util.KtorExperimentalAPI +import io.ktor.websocket.WebSocketServerSession +import io.ktor.websocket.WebSockets +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlin.coroutines.CoroutineContext fun Application.kvisionInit(vararg modules: Module) { install(ContentNegotiation) { jackson() } + install(WebSockets) routing { static("/") { resources("assets") @@ -72,3 +80,50 @@ class MainModule(private val application: Application) : AbstractModule() { bind(Application::class.java).toInstance(application) } } + +class WsSessionModule(private val webSocketSession: WebSocketServerSession) : + AbstractModule() { + override fun configure() { + bind(WebSocketServerSession::class.java).toInstance(webSocketSession) + } +} + +class DummyWsSessionModule() : AbstractModule() { + override fun configure() { + bind(WebSocketServerSession::class.java).toInstance(DummyWebSocketServerSession()) + } +} + +class DummyWebSocketServerSession : WebSocketServerSession { + override val call: ApplicationCall + get() = throw UnsupportedOperationException() + override val coroutineContext: CoroutineContext + get() = throw UnsupportedOperationException() + override val incoming: ReceiveChannel + get() = throw UnsupportedOperationException() + override var masking: Boolean + get() = throw UnsupportedOperationException() + set(value) { + throw UnsupportedOperationException() + } + override var maxFrameSize: Long + get() = throw UnsupportedOperationException() + set(value) { + throw UnsupportedOperationException() + } + override val outgoing: SendChannel + get() = throw UnsupportedOperationException() + + @UseExperimental(KtorExperimentalAPI::class) + override suspend fun close(cause: Throwable?) { + throw UnsupportedOperationException() + } + + override suspend fun flush() { + throw UnsupportedOperationException() + } + + override fun terminate() { + throw UnsupportedOperationException() + } +} diff --git a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index d4985adf..0839a5c5 100644 --- a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -24,6 +24,10 @@ package pl.treksoft.kvision.remote import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.application.ApplicationCall import io.ktor.application.call +import io.ktor.http.cio.websocket.CloseReason +import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.close +import io.ktor.http.cio.websocket.readText import io.ktor.request.receive import io.ktor.response.respond import io.ktor.routing.Route @@ -33,7 +37,17 @@ import io.ktor.routing.options import io.ktor.routing.post import io.ktor.routing.put import io.ktor.util.pipeline.PipelineContext +import io.ktor.websocket.WebSocketServerSession +import io.ktor.websocket.webSocket import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.channels.filterNotNull +import kotlinx.coroutines.channels.map +import kotlinx.coroutines.channels.mapNotNull +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch import org.slf4j.Logger import org.slf4j.LoggerFactory import kotlin.reflect.KClass @@ -55,6 +69,8 @@ actual open class KVServiceManager actual constructor(val serviceClass: mutableMapOf() val optionsRequests: MutableMap.(Unit) -> Unit> = mutableMapOf() + val webSocketRequests: MutableMap Unit> = + mutableMapOf() val mapper = jacksonObjectMapper() var counter: Int = 0 @@ -72,7 +88,7 @@ actual open class KVServiceManager actual constructor(val serviceClass: ) { val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = if (method == HttpMethod.GET) { JsonRpcRequest(call.request.queryParameters["id"]?.toInt() ?: 0, "", listOf()) } else { @@ -113,7 +129,7 @@ actual open class KVServiceManager actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive() if (jsonRpcRequest.params.size == 1) { val param = getParameter(jsonRpcRequest.params[0]) @@ -160,7 +176,7 @@ actual open class KVServiceManager actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive() if (jsonRpcRequest.params.size == 2) { val param1 = getParameter(jsonRpcRequest.params[0]) @@ -208,7 +224,7 @@ actual open class KVServiceManager actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive() @Suppress("MagicNumber") if (jsonRpcRequest.params.size == 3) { @@ -258,7 +274,7 @@ actual open class KVServiceManager actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive() @Suppress("MagicNumber") if (jsonRpcRequest.params.size == 4) { @@ -310,7 +326,7 @@ actual open class KVServiceManager actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive() @Suppress("MagicNumber") if (jsonRpcRequest.params.size == 5) { @@ -347,6 +363,51 @@ actual open class KVServiceManager actual constructor(val serviceClass: } } + /** + * Binds a given web socket connetion with a function of the receiver. + * @param function a function of the receiver + * @param route a route + */ + protected actual inline fun bind( + noinline function: suspend T.(ReceiveChannel, SendChannel) -> Unit, + route: String? + ) { + val routeDef = "route${this::class.simpleName}${counter++}" + webSocketRequests["/kv/$routeDef"] = { + val wsInjector = call.injector.createChildInjector(WsSessionModule(this)) + val service = wsInjector.getInstance(serviceClass.java) + val requestChannel = incoming.mapNotNull { it as? Frame.Text }.map { + val jsonRpcRequest = getParameter(it.readText()) + if (jsonRpcRequest.params.size == 1) { + getParameter(jsonRpcRequest.params[0]) + } else { + null + } + }.filterNotNull() + val responseChannel = Channel() + val session = this + coroutineScope { + launch { + for (p in responseChannel) { + val text = mapper.writeValueAsString( + JsonRpcResponse( + id = 0, + result = mapper.writeValueAsString(p) + ) + ) + outgoing.send(Frame.Text(text)) + } + session.close(CloseReason(CloseReason.Codes.NORMAL, "")) + session.close() + } + launch { + function.invoke(service, requestChannel, responseChannel) + if (!responseChannel.isClosedForReceive) responseChannel.close() + } + } + } + } + /** * Binds a given function of the receiver as a select options source * @param function a function of the receiver @@ -357,7 +418,7 @@ actual open class KVServiceManager actual constructor(val serviceClass: ) { val routeDef = "route${this::class.simpleName}${counter++}" addRoute(HttpMethod.POST, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive() if (jsonRpcRequest.params.size == 2) { val param1 = getParameter(jsonRpcRequest.params[0]) @@ -431,4 +492,9 @@ fun Route.applyRoutes(serviceManager: KVServiceManager) { serviceManager.optionsRequests.forEach { (path, handler) -> options(path, handler) } + serviceManager.webSocketRequests.forEach { (path, handler) -> + this.webSocket(path) { + handler() + } + } } diff --git a/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index 66dc4b99..66429acf 100644 --- a/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -25,6 +25,8 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.launch import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -391,6 +393,18 @@ actual open class KVServiceManager actual constructor(val serviceClass: } } + /** + * Binds a given web socket connetion with a function of the receiver. + * @param function a function of the receiver + * @param route a route + */ + protected actual inline fun bind( + noinline function: suspend T.(ReceiveChannel, SendChannel) -> Unit, + route: String? + ) { + TODO("Not implemented in Spring Boot module") + } + /** * Binds a given function of the receiver as a select options source * @param function a function of the receiver -- cgit