From f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31 Mon Sep 17 00:00:00 2001 From: Robert Jaros Date: Sun, 31 Mar 2019 00:06:25 +0100 Subject: Websockets implementation --- .../pl/treksoft/kvision/remote/KVRemoteAgent.kt | 180 +++++++++++++++++++++ .../pl/treksoft/kvision/remote/KVServiceManager.kt | 15 ++ .../kotlin/pl/treksoft/kvision/remote/Socket.kt | 171 ++++++++++++++++++++ .../kotlin/pl/treksoft/kvision/remote/Utils.kt | 12 ++ 4 files changed, 378 insertions(+) create mode 100644 kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt (limited to 'kvision-modules/kvision-remote') diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt index 6054592e..3e538bd1 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt @@ -21,10 +21,18 @@ */ package pl.treksoft.kvision.remote +import kotlinx.coroutines.Job import kotlinx.coroutines.asDeferred +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch import kotlinx.serialization.ImplicitReflectionSerializer import kotlinx.serialization.list import kotlinx.serialization.serializer +import kotlinx.serialization.stringify +import kotlin.js.console import kotlin.js.js import kotlin.reflect.KClass import kotlin.js.JSON as NativeJSON @@ -366,6 +374,178 @@ open class KVRemoteAgent(val serviceManager: KVServiceManager) : Rem }.asDeferred().await() } + /** + * Executes defined web socket connection + */ + suspend inline fun webSocket( + noinline function: suspend T.(ReceiveChannel, SendChannel) -> Unit, + noinline handler: suspend (SendChannel, ReceiveChannel) -> Unit + ) { + val (url, _) = + serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")] + ?: throw IllegalStateException("Function not specified!") + val socket = Socket() + val requestChannel = Channel() + val responseChannel = Channel() + try { + coroutineScope { + socket.connect(getWebSocketUrl(url)) + lateinit var responseJob: Job + lateinit var handlerJob: Job + val requestJob = launch { + for (par1 in requestChannel) { + val param = serialize(par1) + val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param))) + if (!socket.sendOrFalse(str)) break + } + responseJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + responseJob = launch { + while (true) { + val str = socket.receiveOrNull() ?: break + val data = kotlin.js.JSON.parse(str).result + val par2 = try { + @Suppress("UNCHECKED_CAST") + deserialize(data, PAR2::class.js.name) + } catch (t: NotStandardTypeException) { + try { + @Suppress("UNCHECKED_CAST") + tryDeserializeEnum(PAR2::class as KClass, data) as PAR2 + } catch (t: NotEnumTypeException) { + JSON.nonstrict.parse(PAR2::class.serializer(), data) + } + } + responseChannel.send(par2) + } + requestJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + handlerJob = launch { + exceptionHelper { + handler(requestChannel, responseChannel) + } + requestJob.cancel() + responseJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + } + } catch (e: Exception) { + console.log(e) + } + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + socket.close() + } + + /** + * Executes defined web socket connection returning list objects + */ + suspend inline fun webSocket( + noinline function: suspend T.(ReceiveChannel, SendChannel>) -> Unit, + noinline handler: suspend (SendChannel, ReceiveChannel>) -> Unit + ) { + val (url, _) = + serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")] + ?: throw IllegalStateException("Function not specified!") + val socket = Socket() + val requestChannel = Channel() + val responseChannel = Channel>() + try { + coroutineScope { + socket.connect(getWebSocketUrl(url)) + lateinit var responseJob: Job + lateinit var handlerJob: Job + val requestJob = launch { + for (par1 in requestChannel) { + val param = serialize(par1) + val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param))) + if (!socket.sendOrFalse(str)) break + } + responseJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + responseJob = launch { + while (true) { + val str = socket.receiveOrNull() ?: break + val data = kotlin.js.JSON.parse(str).result + val par2 = try { + deserializeList(data, PAR2::class.js.name) + } catch (t: NotStandardTypeException) { + try { + @Suppress("UNCHECKED_CAST") + tryDeserializeEnumList(PAR2::class as KClass, data) as List + } catch (t: NotEnumTypeException) { + JSON.nonstrict.parse(PAR2::class.serializer().list, data) + } + } + responseChannel.send(par2) + } + requestJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + handlerJob = launch { + exceptionHelper { + handler(requestChannel, responseChannel) + } + requestJob.cancel() + responseJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + } + } catch (e: Exception) { + console.log(e) + } + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + socket.close() + } + + /** + * @suppress internal function + */ + suspend fun Socket.receiveOrNull(): String? { + return try { + this.receive() + } catch (e: SocketClosedException) { + console.log("Socket was closed: ${e.reason}") + null + } + } + + /** + * @suppress internal function + */ + fun Socket.sendOrFalse(str: String): Boolean { + return try { + this.send(str) + true + } catch (e: SocketClosedException) { + console.log("Socket was closed: ${e.reason}") + false + } + } + + /** + * @suppress internal function + */ + suspend fun exceptionHelper(block: suspend () -> Unit) { + try { + block() + } catch (e: Exception) { + console.log(e) + } + } /** * @suppress diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index 4ca6a4a5..0cf72f76 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -21,6 +21,8 @@ */ package pl.treksoft.kvision.remote +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlin.reflect.KClass /** @@ -137,6 +139,19 @@ actual open class KVServiceManager actual constructor(serviceClass: KCl calls[function.toString().replace("\\s".toRegex(), "")] = Pair("/kv/$routeDef", HttpMethod.POST) } + /** + * Binds a given web socket connetion with a function of the receiver. + * @param function a function of the receiver + * @param route a web socket route + */ + protected actual inline fun bind( + noinline function: suspend T.(ReceiveChannel, SendChannel) -> Unit, + route: String? + ) { + val routeDef = "route${this::class.simpleName}${counter++}" + calls[function.toString().replace("\\s".toRegex(), "")] = Pair("/kv/$routeDef", HttpMethod.POST) + } + /** * Returns the map of defined paths. */ diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt new file mode 100644 index 00000000..c547ce9b --- /dev/null +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2017-present Robert Jaros + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package pl.treksoft.kvision.remote + +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.w3c.dom.CloseEvent +import org.w3c.dom.ErrorEvent +import org.w3c.dom.MessageEvent +import org.w3c.dom.WebSocket +import org.w3c.dom.WebSocket.Companion.CLOSED +import org.w3c.dom.WebSocket.Companion.CLOSING +import org.w3c.dom.WebSocket.Companion.OPEN +import org.w3c.dom.events.Event +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine + +// +// Code taken from: https://discuss.kotlinlang.org/t/js-coroutines-experiements/8245/2 +// + +/** + * Websocket closed exception class. + */ +class SocketClosedException(val reason: String) : Throwable(reason) + +/** + * A websocket client implementation. + */ +class Socket { + private var eventQueue: Channel = Channel(Channel.UNLIMITED) + private lateinit var ws: WebSocket + val state: Short + get() = ws.readyState + + private fun onWsEvent(event: Event) { + GlobalScope.launch { eventQueue.send(event) } + } + + /** + * Connect to a websocket. + */ + suspend fun connect(url: String, retryDelay: Long = 1000) { + while (true) { + val connected = suspendCoroutine { cont -> + while (eventQueue.poll() != null) {/*drain*/ + } + ws = WebSocket(url) + ws.onopen = { + ws.onclose = ::onWsEvent + ws.onerror = ::onWsEvent + cont.resume(true) + } + ws.onmessage = ::onWsEvent + ws.onerror = { + logError(it) + } + ws.onclose = { + cont.resume(false) + } + } + if (connected) + break + else + delay(retryDelay) + } + } + + /** + * Receive a string from a websocket. + */ + suspend fun receive(): String { + val event = eventQueue.receive() + return when (event) { + is MessageEvent -> { + event.data as String + } + is CloseEvent -> { + val reason = getReason(event.code) + throw SocketClosedException(reason) + } + is ErrorEvent -> { + logError(event) + close() + throw SocketClosedException(event.message) + } + else -> { + val reason = getReason(4001) + console.error(reason) + close() + throw SocketClosedException(reason) + } + } + } + + /** + * Send string to a websocket. + */ + fun send(obj: String) { + when { + isClosed() -> { + console.error(getReason(4002)) + throw SocketClosedException(getReason(4002)) + } + else -> ws.send(obj) + } + } + + /** + * Close a websocket. + */ + fun close(code: Short = 1000) { + when (state) { + OPEN -> ws.close(code, getReason(1000)) + } + } + + /** + * Returns if a websocket is closed. + */ + fun isClosed(): Boolean { + return when (state) { + CLOSED, CLOSING -> true + else -> false + } + } + + private fun logError(event: Event) = console.error("An error %o occurred when connecting to ${ws.url}", event) + + private fun getReason(code: Short): String { + return when (code.toInt()) { // See http://tools.ietf.org/html/rfc6455#section-7.4.1 + 1000 -> "Normal closure" + 1001 -> "An endpoint is \"going away\", such as a server going down or a browser having navigated away from a page." + 1002 -> "An endpoint is terminating the connection due to a protocol error" + 1003 -> "An endpoint is terminating the connection because it has received a type of data it cannot accept (e.g., an endpoint that understands only text data MAY send this if it receives a binary message)." + 1004 -> "Reserved. The specific meaning might be defined in the future." + 1005 -> "No status code was actually present." + 1006 -> "The connection was closed abnormally, e.g., without sending or receiving a Close control frame" + 1007 -> "An endpoint is terminating the connection because it has received data within a message that was not consistent with the type of the message (e.g., non-UTF-8 [http://tools.ietf.org/html/rfc3629] data within a text message)." + 1008 -> "An endpoint is terminating the connection because it has received a message that \"violates its policy\". This reason is given either if there is no other sutible reason, or if there is a need to hide specific details about the policy." + 1009 -> "An endpoint is terminating the connection because it has received a message that is too big for it to process." + 1010 -> "An endpoint (client ) is terminating the connection because it has expected the server to negotiate one or more extension, but the server didn't return them in the response message of the WebSocket handshake.
Specifically, the extensions that are needed are: " + 1011 -> "A server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request." + 1015 -> "The connection was closed due to a failure to perform a TLS handshake (e.g., the server certificate can't be verified)." + 4001 -> "Unexpected event" + 4002 -> "You are trying to use closed socket" + else -> "Unknown reason" + } + } +} diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt index 273c2a65..c4f4ed6d 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt @@ -25,6 +25,7 @@ import kotlinx.serialization.SerializationStrategy import kotlinx.serialization.context.SimpleModule import kotlinx.serialization.json.Json import pl.treksoft.kvision.types.JsonDateSerializer +import kotlin.browser.window import kotlin.js.Date /** @@ -60,3 +61,14 @@ object JSON { return kotlin.js.JSON.parse(plain.stringify(serializer, this)) } } + +/** + * Creates a websocket URL from current window.location and given path. + */ +fun getWebSocketUrl(url: String): String { + val location = window.location + val scheme = if (location.protocol == "https:") "wss" else "ws" + val port = if (location.port == "8088") ":8080" + else if (location.port != "0" && location.port != "") ":${location.port}" else "" + return "$scheme://${location.hostname}$port$url" +} -- cgit