diff options
author | Robert Jaros <rjaros@finn.pl> | 2019-03-31 00:06:25 +0100 |
---|---|---|
committer | Robert Jaros <rjaros@finn.pl> | 2019-03-31 00:06:25 +0100 |
commit | f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31 (patch) | |
tree | dc083b68749b09d7d8c0ed4bfa90262feac83624 | |
parent | 967f6278ded0d540ce3d7613bd0d39338ef14d63 (diff) | |
download | kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.gz kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.bz2 kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.zip |
Websockets implementation
10 files changed, 547 insertions, 7 deletions
diff --git a/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index 514fb07d..e89744f6 100644 --- a/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -21,6 +21,8 @@ */ package pl.treksoft.kvision.remote +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlin.reflect.KClass enum class HttpMethod { @@ -115,4 +117,13 @@ expect open class KVServiceManager<T : Any>(serviceClass: KClass<T>) { protected fun bind( function: T.(String?, String?) -> List<RemoteSelectOption> ) + + /** + * Binds a given function of the receiver as a web socket connection + * @param function a function of the receiver + */ + protected inline fun <reified PAR1 : Any, reified PAR2 : Any> bind( + noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit, + route: String? = null + ) } diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt index 6054592e..3e538bd1 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt @@ -21,10 +21,18 @@ */ package pl.treksoft.kvision.remote +import kotlinx.coroutines.Job import kotlinx.coroutines.asDeferred +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch import kotlinx.serialization.ImplicitReflectionSerializer import kotlinx.serialization.list import kotlinx.serialization.serializer +import kotlinx.serialization.stringify +import kotlin.js.console import kotlin.js.js import kotlin.reflect.KClass import kotlin.js.JSON as NativeJSON @@ -366,6 +374,178 @@ open class KVRemoteAgent<T : Any>(val serviceManager: KVServiceManager<T>) : Rem }.asDeferred().await() } + /** + * Executes defined web socket connection + */ + suspend inline fun <reified PAR1 : Any, reified PAR2 : Any> webSocket( + noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit, + noinline handler: suspend (SendChannel<PAR1>, ReceiveChannel<PAR2>) -> Unit + ) { + val (url, _) = + serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")] + ?: throw IllegalStateException("Function not specified!") + val socket = Socket() + val requestChannel = Channel<PAR1>() + val responseChannel = Channel<PAR2>() + try { + coroutineScope { + socket.connect(getWebSocketUrl(url)) + lateinit var responseJob: Job + lateinit var handlerJob: Job + val requestJob = launch { + for (par1 in requestChannel) { + val param = serialize(par1) + val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param))) + if (!socket.sendOrFalse(str)) break + } + responseJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + responseJob = launch { + while (true) { + val str = socket.receiveOrNull() ?: break + val data = kotlin.js.JSON.parse<dynamic>(str).result + val par2 = try { + @Suppress("UNCHECKED_CAST") + deserialize<PAR2>(data, PAR2::class.js.name) + } catch (t: NotStandardTypeException) { + try { + @Suppress("UNCHECKED_CAST") + tryDeserializeEnum(PAR2::class as KClass<Any>, data) as PAR2 + } catch (t: NotEnumTypeException) { + JSON.nonstrict.parse(PAR2::class.serializer(), data) + } + } + responseChannel.send(par2) + } + requestJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + handlerJob = launch { + exceptionHelper { + handler(requestChannel, responseChannel) + } + requestJob.cancel() + responseJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + } + } catch (e: Exception) { + console.log(e) + } + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + socket.close() + } + + /** + * Executes defined web socket connection returning list objects + */ + suspend inline fun <reified PAR1 : Any, reified PAR2 : Any> webSocket( + noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<List<PAR2>>) -> Unit, + noinline handler: suspend (SendChannel<PAR1>, ReceiveChannel<List<PAR2>>) -> Unit + ) { + val (url, _) = + serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")] + ?: throw IllegalStateException("Function not specified!") + val socket = Socket() + val requestChannel = Channel<PAR1>() + val responseChannel = Channel<List<PAR2>>() + try { + coroutineScope { + socket.connect(getWebSocketUrl(url)) + lateinit var responseJob: Job + lateinit var handlerJob: Job + val requestJob = launch { + for (par1 in requestChannel) { + val param = serialize(par1) + val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param))) + if (!socket.sendOrFalse(str)) break + } + responseJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + responseJob = launch { + while (true) { + val str = socket.receiveOrNull() ?: break + val data = kotlin.js.JSON.parse<dynamic>(str).result + val par2 = try { + deserializeList<PAR2>(data, PAR2::class.js.name) + } catch (t: NotStandardTypeException) { + try { + @Suppress("UNCHECKED_CAST") + tryDeserializeEnumList(PAR2::class as KClass<Any>, data) as List<PAR2> + } catch (t: NotEnumTypeException) { + JSON.nonstrict.parse(PAR2::class.serializer().list, data) + } + } + responseChannel.send(par2) + } + requestJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + handlerJob = launch { + exceptionHelper { + handler(requestChannel, responseChannel) + } + requestJob.cancel() + responseJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + } + } catch (e: Exception) { + console.log(e) + } + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + socket.close() + } + + /** + * @suppress internal function + */ + suspend fun Socket.receiveOrNull(): String? { + return try { + this.receive() + } catch (e: SocketClosedException) { + console.log("Socket was closed: ${e.reason}") + null + } + } + + /** + * @suppress internal function + */ + fun Socket.sendOrFalse(str: String): Boolean { + return try { + this.send(str) + true + } catch (e: SocketClosedException) { + console.log("Socket was closed: ${e.reason}") + false + } + } + + /** + * @suppress internal function + */ + suspend fun exceptionHelper(block: suspend () -> Unit) { + try { + block() + } catch (e: Exception) { + console.log(e) + } + } /** * @suppress diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index 4ca6a4a5..0cf72f76 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -21,6 +21,8 @@ */ package pl.treksoft.kvision.remote +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlin.reflect.KClass /** @@ -138,6 +140,19 @@ actual open class KVServiceManager<T : Any> actual constructor(serviceClass: KCl } /** + * Binds a given web socket connetion with a function of the receiver. + * @param function a function of the receiver + * @param route a web socket route + */ + protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind( + noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit, + route: String? + ) { + val routeDef = "route${this::class.simpleName}${counter++}" + calls[function.toString().replace("\\s".toRegex(), "")] = Pair("/kv/$routeDef", HttpMethod.POST) + } + + /** * Returns the map of defined paths. */ fun getCalls(): Map<String, Pair<String, HttpMethod>> = calls diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt new file mode 100644 index 00000000..c547ce9b --- /dev/null +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2017-present Robert Jaros + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package pl.treksoft.kvision.remote + +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.w3c.dom.CloseEvent +import org.w3c.dom.ErrorEvent +import org.w3c.dom.MessageEvent +import org.w3c.dom.WebSocket +import org.w3c.dom.WebSocket.Companion.CLOSED +import org.w3c.dom.WebSocket.Companion.CLOSING +import org.w3c.dom.WebSocket.Companion.OPEN +import org.w3c.dom.events.Event +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine + +// +// Code taken from: https://discuss.kotlinlang.org/t/js-coroutines-experiements/8245/2 +// + +/** + * Websocket closed exception class. + */ +class SocketClosedException(val reason: String) : Throwable(reason) + +/** + * A websocket client implementation. + */ +class Socket { + private var eventQueue: Channel<Event> = Channel(Channel.UNLIMITED) + private lateinit var ws: WebSocket + val state: Short + get() = ws.readyState + + private fun onWsEvent(event: Event) { + GlobalScope.launch { eventQueue.send(event) } + } + + /** + * Connect to a websocket. + */ + suspend fun connect(url: String, retryDelay: Long = 1000) { + while (true) { + val connected = suspendCoroutine<Boolean> { cont -> + while (eventQueue.poll() != null) {/*drain*/ + } + ws = WebSocket(url) + ws.onopen = { + ws.onclose = ::onWsEvent + ws.onerror = ::onWsEvent + cont.resume(true) + } + ws.onmessage = ::onWsEvent + ws.onerror = { + logError(it) + } + ws.onclose = { + cont.resume(false) + } + } + if (connected) + break + else + delay(retryDelay) + } + } + + /** + * Receive a string from a websocket. + */ + suspend fun receive(): String { + val event = eventQueue.receive() + return when (event) { + is MessageEvent -> { + event.data as String + } + is CloseEvent -> { + val reason = getReason(event.code) + throw SocketClosedException(reason) + } + is ErrorEvent -> { + logError(event) + close() + throw SocketClosedException(event.message) + } + else -> { + val reason = getReason(4001) + console.error(reason) + close() + throw SocketClosedException(reason) + } + } + } + + /** + * Send string to a websocket. + */ + fun send(obj: String) { + when { + isClosed() -> { + console.error(getReason(4002)) + throw SocketClosedException(getReason(4002)) + } + else -> ws.send(obj) + } + } + + /** + * Close a websocket. + */ + fun close(code: Short = 1000) { + when (state) { + OPEN -> ws.close(code, getReason(1000)) + } + } + + /** + * Returns if a websocket is closed. + */ + fun isClosed(): Boolean { + return when (state) { + CLOSED, CLOSING -> true + else -> false + } + } + + private fun logError(event: Event) = console.error("An error %o occurred when connecting to ${ws.url}", event) + + private fun getReason(code: Short): String { + return when (code.toInt()) { // See http://tools.ietf.org/html/rfc6455#section-7.4.1 + 1000 -> "Normal closure" + 1001 -> "An endpoint is \"going away\", such as a server going down or a browser having navigated away from a page." + 1002 -> "An endpoint is terminating the connection due to a protocol error" + 1003 -> "An endpoint is terminating the connection because it has received a type of data it cannot accept (e.g., an endpoint that understands only text data MAY send this if it receives a binary message)." + 1004 -> "Reserved. The specific meaning might be defined in the future." + 1005 -> "No status code was actually present." + 1006 -> "The connection was closed abnormally, e.g., without sending or receiving a Close control frame" + 1007 -> "An endpoint is terminating the connection because it has received data within a message that was not consistent with the type of the message (e.g., non-UTF-8 [http://tools.ietf.org/html/rfc3629] data within a text message)." + 1008 -> "An endpoint is terminating the connection because it has received a message that \"violates its policy\". This reason is given either if there is no other sutible reason, or if there is a need to hide specific details about the policy." + 1009 -> "An endpoint is terminating the connection because it has received a message that is too big for it to process." + 1010 -> "An endpoint (client ) is terminating the connection because it has expected the server to negotiate one or more extension, but the server didn't return them in the response message of the WebSocket handshake. <br /> Specifically, the extensions that are needed are: " + 1011 -> "A server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request." + 1015 -> "The connection was closed due to a failure to perform a TLS handshake (e.g., the server certificate can't be verified)." + 4001 -> "Unexpected event" + 4002 -> "You are trying to use closed socket" + else -> "Unknown reason" + } + } +} diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt index 273c2a65..c4f4ed6d 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt @@ -25,6 +25,7 @@ import kotlinx.serialization.SerializationStrategy import kotlinx.serialization.context.SimpleModule import kotlinx.serialization.json.Json import pl.treksoft.kvision.types.JsonDateSerializer +import kotlin.browser.window import kotlin.js.Date /** @@ -60,3 +61,14 @@ object JSON { return kotlin.js.JSON.parse(plain.stringify(serializer, this)) } } + +/** + * Creates a websocket URL from current window.location and given path. + */ +fun getWebSocketUrl(url: String): String { + val location = window.location + val scheme = if (location.protocol == "https:") "wss" else "ws" + val port = if (location.port == "8088") ":8080" + else if (location.port != "0" && location.port != "") ":${location.port}" else "" + return "$scheme://${location.hostname}$port$url" +} diff --git a/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index d1e64ce4..cfd9d467 100644 --- a/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -25,6 +25,8 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.google.inject.Injector import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.launch import org.jooby.Kooby import org.jooby.Request @@ -311,6 +313,19 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: } /** + * Binds a given web socket connetion with a function of the receiver. + * @param function a function of the receiver + * @param route a route + */ + protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind( + noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit, + route: String? + ) { + TODO("Not implemented in Jooby module") + } + + + /** * Binds a given function of the receiver as a select options source * @param function a function of the receiver */ diff --git a/kvision-modules/kvision-server-ktor/build.gradle b/kvision-modules/kvision-server-ktor/build.gradle index 62b5fe83..44be9171 100644 --- a/kvision-modules/kvision-server-ktor/build.gradle +++ b/kvision-modules/kvision-server-ktor/build.gradle @@ -10,6 +10,7 @@ dependencies { compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion" compile "io.ktor:ktor-server-core:$ktorVersion" compile "io.ktor:ktor-jackson:$ktorVersion" + compile "io.ktor:ktor-websockets:$ktorVersion" compile "com.google.inject:guice:$guiceVersion" compile "com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonModuleKotlinVersion" testCompile "org.jetbrains.kotlin:kotlin-test:$kotlinVersion" diff --git a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt index 956e7301..e2edc175 100644 --- a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt +++ b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt @@ -31,17 +31,25 @@ import io.ktor.application.ApplicationCallPipeline import io.ktor.application.call import io.ktor.application.install import io.ktor.features.ContentNegotiation +import io.ktor.http.cio.websocket.Frame import io.ktor.http.content.defaultResource import io.ktor.http.content.resources import io.ktor.http.content.static import io.ktor.jackson.jackson import io.ktor.routing.routing import io.ktor.util.AttributeKey +import io.ktor.util.KtorExperimentalAPI +import io.ktor.websocket.WebSocketServerSession +import io.ktor.websocket.WebSockets +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlin.coroutines.CoroutineContext fun Application.kvisionInit(vararg modules: Module) { install(ContentNegotiation) { jackson() } + install(WebSockets) routing { static("/") { resources("assets") @@ -72,3 +80,50 @@ class MainModule(private val application: Application) : AbstractModule() { bind(Application::class.java).toInstance(application) } } + +class WsSessionModule(private val webSocketSession: WebSocketServerSession) : + AbstractModule() { + override fun configure() { + bind(WebSocketServerSession::class.java).toInstance(webSocketSession) + } +} + +class DummyWsSessionModule() : AbstractModule() { + override fun configure() { + bind(WebSocketServerSession::class.java).toInstance(DummyWebSocketServerSession()) + } +} + +class DummyWebSocketServerSession : WebSocketServerSession { + override val call: ApplicationCall + get() = throw UnsupportedOperationException() + override val coroutineContext: CoroutineContext + get() = throw UnsupportedOperationException() + override val incoming: ReceiveChannel<Frame> + get() = throw UnsupportedOperationException() + override var masking: Boolean + get() = throw UnsupportedOperationException() + set(value) { + throw UnsupportedOperationException() + } + override var maxFrameSize: Long + get() = throw UnsupportedOperationException() + set(value) { + throw UnsupportedOperationException() + } + override val outgoing: SendChannel<Frame> + get() = throw UnsupportedOperationException() + + @UseExperimental(KtorExperimentalAPI::class) + override suspend fun close(cause: Throwable?) { + throw UnsupportedOperationException() + } + + override suspend fun flush() { + throw UnsupportedOperationException() + } + + override fun terminate() { + throw UnsupportedOperationException() + } +} diff --git a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index d4985adf..0839a5c5 100644 --- a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -24,6 +24,10 @@ package pl.treksoft.kvision.remote import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.application.ApplicationCall import io.ktor.application.call +import io.ktor.http.cio.websocket.CloseReason +import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.close +import io.ktor.http.cio.websocket.readText import io.ktor.request.receive import io.ktor.response.respond import io.ktor.routing.Route @@ -33,7 +37,17 @@ import io.ktor.routing.options import io.ktor.routing.post import io.ktor.routing.put import io.ktor.util.pipeline.PipelineContext +import io.ktor.websocket.WebSocketServerSession +import io.ktor.websocket.webSocket import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.channels.filterNotNull +import kotlinx.coroutines.channels.map +import kotlinx.coroutines.channels.mapNotNull +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch import org.slf4j.Logger import org.slf4j.LoggerFactory import kotlin.reflect.KClass @@ -55,6 +69,8 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: mutableMapOf() val optionsRequests: MutableMap<String, suspend PipelineContext<Unit, ApplicationCall>.(Unit) -> Unit> = mutableMapOf() + val webSocketRequests: MutableMap<String, suspend WebSocketServerSession.() -> Unit> = + mutableMapOf() val mapper = jacksonObjectMapper() var counter: Int = 0 @@ -72,7 +88,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: ) { val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = if (method == HttpMethod.GET) { JsonRpcRequest(call.request.queryParameters["id"]?.toInt() ?: 0, "", listOf()) } else { @@ -113,7 +129,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() if (jsonRpcRequest.params.size == 1) { val param = getParameter<PAR>(jsonRpcRequest.params[0]) @@ -160,7 +176,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() if (jsonRpcRequest.params.size == 2) { val param1 = getParameter<PAR1>(jsonRpcRequest.params[0]) @@ -208,7 +224,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() @Suppress("MagicNumber") if (jsonRpcRequest.params.size == 3) { @@ -258,7 +274,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() @Suppress("MagicNumber") if (jsonRpcRequest.params.size == 4) { @@ -310,7 +326,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() @Suppress("MagicNumber") if (jsonRpcRequest.params.size == 5) { @@ -348,6 +364,51 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: } /** + * Binds a given web socket connetion with a function of the receiver. + * @param function a function of the receiver + * @param route a route + */ + protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind( + noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit, + route: String? + ) { + val routeDef = "route${this::class.simpleName}${counter++}" + webSocketRequests["/kv/$routeDef"] = { + val wsInjector = call.injector.createChildInjector(WsSessionModule(this)) + val service = wsInjector.getInstance(serviceClass.java) + val requestChannel = incoming.mapNotNull { it as? Frame.Text }.map { + val jsonRpcRequest = getParameter<JsonRpcRequest>(it.readText()) + if (jsonRpcRequest.params.size == 1) { + getParameter<PAR1>(jsonRpcRequest.params[0]) + } else { + null + } + }.filterNotNull() + val responseChannel = Channel<PAR2>() + val session = this + coroutineScope { + launch { + for (p in responseChannel) { + val text = mapper.writeValueAsString( + JsonRpcResponse( + id = 0, + result = mapper.writeValueAsString(p) + ) + ) + outgoing.send(Frame.Text(text)) + } + session.close(CloseReason(CloseReason.Codes.NORMAL, "")) + session.close() + } + launch { + function.invoke(service, requestChannel, responseChannel) + if (!responseChannel.isClosedForReceive) responseChannel.close() + } + } + } + } + + /** * Binds a given function of the receiver as a select options source * @param function a function of the receiver */ @@ -357,7 +418,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: ) { val routeDef = "route${this::class.simpleName}${counter++}" addRoute(HttpMethod.POST, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() if (jsonRpcRequest.params.size == 2) { val param1 = getParameter<String?>(jsonRpcRequest.params[0]) @@ -431,4 +492,9 @@ fun <T : Any> Route.applyRoutes(serviceManager: KVServiceManager<T>) { serviceManager.optionsRequests.forEach { (path, handler) -> options(path, handler) } + serviceManager.webSocketRequests.forEach { (path, handler) -> + this.webSocket(path) { + handler() + } + } } diff --git a/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index 66dc4b99..66429acf 100644 --- a/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -25,6 +25,8 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.launch import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -392,6 +394,18 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: } /** + * Binds a given web socket connetion with a function of the receiver. + * @param function a function of the receiver + * @param route a route + */ + protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind( + noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit, + route: String? + ) { + TODO("Not implemented in Spring Boot module") + } + + /** * Binds a given function of the receiver as a select options source * @param function a function of the receiver */ |