diff options
author | Robert Jaros <rjaros@finn.pl> | 2019-03-31 00:06:25 +0100 |
---|---|---|
committer | Robert Jaros <rjaros@finn.pl> | 2019-03-31 00:06:25 +0100 |
commit | f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31 (patch) | |
tree | dc083b68749b09d7d8c0ed4bfa90262feac83624 /kvision-modules/kvision-remote | |
parent | 967f6278ded0d540ce3d7613bd0d39338ef14d63 (diff) | |
download | kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.gz kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.bz2 kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.zip |
Websockets implementation
Diffstat (limited to 'kvision-modules/kvision-remote')
4 files changed, 378 insertions, 0 deletions
diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt index 6054592e..3e538bd1 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt @@ -21,10 +21,18 @@ */ package pl.treksoft.kvision.remote +import kotlinx.coroutines.Job import kotlinx.coroutines.asDeferred +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch import kotlinx.serialization.ImplicitReflectionSerializer import kotlinx.serialization.list import kotlinx.serialization.serializer +import kotlinx.serialization.stringify +import kotlin.js.console import kotlin.js.js import kotlin.reflect.KClass import kotlin.js.JSON as NativeJSON @@ -366,6 +374,178 @@ open class KVRemoteAgent<T : Any>(val serviceManager: KVServiceManager<T>) : Rem }.asDeferred().await() } + /** + * Executes defined web socket connection + */ + suspend inline fun <reified PAR1 : Any, reified PAR2 : Any> webSocket( + noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit, + noinline handler: suspend (SendChannel<PAR1>, ReceiveChannel<PAR2>) -> Unit + ) { + val (url, _) = + serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")] + ?: throw IllegalStateException("Function not specified!") + val socket = Socket() + val requestChannel = Channel<PAR1>() + val responseChannel = Channel<PAR2>() + try { + coroutineScope { + socket.connect(getWebSocketUrl(url)) + lateinit var responseJob: Job + lateinit var handlerJob: Job + val requestJob = launch { + for (par1 in requestChannel) { + val param = serialize(par1) + val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param))) + if (!socket.sendOrFalse(str)) break + } + responseJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + responseJob = launch { + while (true) { + val str = socket.receiveOrNull() ?: break + val data = kotlin.js.JSON.parse<dynamic>(str).result + val par2 = try { + @Suppress("UNCHECKED_CAST") + deserialize<PAR2>(data, PAR2::class.js.name) + } catch (t: NotStandardTypeException) { + try { + @Suppress("UNCHECKED_CAST") + tryDeserializeEnum(PAR2::class as KClass<Any>, data) as PAR2 + } catch (t: NotEnumTypeException) { + JSON.nonstrict.parse(PAR2::class.serializer(), data) + } + } + responseChannel.send(par2) + } + requestJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + handlerJob = launch { + exceptionHelper { + handler(requestChannel, responseChannel) + } + requestJob.cancel() + responseJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + } + } catch (e: Exception) { + console.log(e) + } + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + socket.close() + } + + /** + * Executes defined web socket connection returning list objects + */ + suspend inline fun <reified PAR1 : Any, reified PAR2 : Any> webSocket( + noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<List<PAR2>>) -> Unit, + noinline handler: suspend (SendChannel<PAR1>, ReceiveChannel<List<PAR2>>) -> Unit + ) { + val (url, _) = + serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")] + ?: throw IllegalStateException("Function not specified!") + val socket = Socket() + val requestChannel = Channel<PAR1>() + val responseChannel = Channel<List<PAR2>>() + try { + coroutineScope { + socket.connect(getWebSocketUrl(url)) + lateinit var responseJob: Job + lateinit var handlerJob: Job + val requestJob = launch { + for (par1 in requestChannel) { + val param = serialize(par1) + val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param))) + if (!socket.sendOrFalse(str)) break + } + responseJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + responseJob = launch { + while (true) { + val str = socket.receiveOrNull() ?: break + val data = kotlin.js.JSON.parse<dynamic>(str).result + val par2 = try { + deserializeList<PAR2>(data, PAR2::class.js.name) + } catch (t: NotStandardTypeException) { + try { + @Suppress("UNCHECKED_CAST") + tryDeserializeEnumList(PAR2::class as KClass<Any>, data) as List<PAR2> + } catch (t: NotEnumTypeException) { + JSON.nonstrict.parse(PAR2::class.serializer().list, data) + } + } + responseChannel.send(par2) + } + requestJob.cancel() + handlerJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + handlerJob = launch { + exceptionHelper { + handler(requestChannel, responseChannel) + } + requestJob.cancel() + responseJob.cancel() + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + } + } + } catch (e: Exception) { + console.log(e) + } + if (!requestChannel.isClosedForReceive) requestChannel.close() + if (!responseChannel.isClosedForSend) responseChannel.close() + socket.close() + } + + /** + * @suppress internal function + */ + suspend fun Socket.receiveOrNull(): String? { + return try { + this.receive() + } catch (e: SocketClosedException) { + console.log("Socket was closed: ${e.reason}") + null + } + } + + /** + * @suppress internal function + */ + fun Socket.sendOrFalse(str: String): Boolean { + return try { + this.send(str) + true + } catch (e: SocketClosedException) { + console.log("Socket was closed: ${e.reason}") + false + } + } + + /** + * @suppress internal function + */ + suspend fun exceptionHelper(block: suspend () -> Unit) { + try { + block() + } catch (e: Exception) { + console.log(e) + } + } /** * @suppress diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index 4ca6a4a5..0cf72f76 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -21,6 +21,8 @@ */ package pl.treksoft.kvision.remote +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlin.reflect.KClass /** @@ -138,6 +140,19 @@ actual open class KVServiceManager<T : Any> actual constructor(serviceClass: KCl } /** + * Binds a given web socket connetion with a function of the receiver. + * @param function a function of the receiver + * @param route a web socket route + */ + protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind( + noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit, + route: String? + ) { + val routeDef = "route${this::class.simpleName}${counter++}" + calls[function.toString().replace("\\s".toRegex(), "")] = Pair("/kv/$routeDef", HttpMethod.POST) + } + + /** * Returns the map of defined paths. */ fun getCalls(): Map<String, Pair<String, HttpMethod>> = calls diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt new file mode 100644 index 00000000..c547ce9b --- /dev/null +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2017-present Robert Jaros + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package pl.treksoft.kvision.remote + +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import org.w3c.dom.CloseEvent +import org.w3c.dom.ErrorEvent +import org.w3c.dom.MessageEvent +import org.w3c.dom.WebSocket +import org.w3c.dom.WebSocket.Companion.CLOSED +import org.w3c.dom.WebSocket.Companion.CLOSING +import org.w3c.dom.WebSocket.Companion.OPEN +import org.w3c.dom.events.Event +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine + +// +// Code taken from: https://discuss.kotlinlang.org/t/js-coroutines-experiements/8245/2 +// + +/** + * Websocket closed exception class. + */ +class SocketClosedException(val reason: String) : Throwable(reason) + +/** + * A websocket client implementation. + */ +class Socket { + private var eventQueue: Channel<Event> = Channel(Channel.UNLIMITED) + private lateinit var ws: WebSocket + val state: Short + get() = ws.readyState + + private fun onWsEvent(event: Event) { + GlobalScope.launch { eventQueue.send(event) } + } + + /** + * Connect to a websocket. + */ + suspend fun connect(url: String, retryDelay: Long = 1000) { + while (true) { + val connected = suspendCoroutine<Boolean> { cont -> + while (eventQueue.poll() != null) {/*drain*/ + } + ws = WebSocket(url) + ws.onopen = { + ws.onclose = ::onWsEvent + ws.onerror = ::onWsEvent + cont.resume(true) + } + ws.onmessage = ::onWsEvent + ws.onerror = { + logError(it) + } + ws.onclose = { + cont.resume(false) + } + } + if (connected) + break + else + delay(retryDelay) + } + } + + /** + * Receive a string from a websocket. + */ + suspend fun receive(): String { + val event = eventQueue.receive() + return when (event) { + is MessageEvent -> { + event.data as String + } + is CloseEvent -> { + val reason = getReason(event.code) + throw SocketClosedException(reason) + } + is ErrorEvent -> { + logError(event) + close() + throw SocketClosedException(event.message) + } + else -> { + val reason = getReason(4001) + console.error(reason) + close() + throw SocketClosedException(reason) + } + } + } + + /** + * Send string to a websocket. + */ + fun send(obj: String) { + when { + isClosed() -> { + console.error(getReason(4002)) + throw SocketClosedException(getReason(4002)) + } + else -> ws.send(obj) + } + } + + /** + * Close a websocket. + */ + fun close(code: Short = 1000) { + when (state) { + OPEN -> ws.close(code, getReason(1000)) + } + } + + /** + * Returns if a websocket is closed. + */ + fun isClosed(): Boolean { + return when (state) { + CLOSED, CLOSING -> true + else -> false + } + } + + private fun logError(event: Event) = console.error("An error %o occurred when connecting to ${ws.url}", event) + + private fun getReason(code: Short): String { + return when (code.toInt()) { // See http://tools.ietf.org/html/rfc6455#section-7.4.1 + 1000 -> "Normal closure" + 1001 -> "An endpoint is \"going away\", such as a server going down or a browser having navigated away from a page." + 1002 -> "An endpoint is terminating the connection due to a protocol error" + 1003 -> "An endpoint is terminating the connection because it has received a type of data it cannot accept (e.g., an endpoint that understands only text data MAY send this if it receives a binary message)." + 1004 -> "Reserved. The specific meaning might be defined in the future." + 1005 -> "No status code was actually present." + 1006 -> "The connection was closed abnormally, e.g., without sending or receiving a Close control frame" + 1007 -> "An endpoint is terminating the connection because it has received data within a message that was not consistent with the type of the message (e.g., non-UTF-8 [http://tools.ietf.org/html/rfc3629] data within a text message)." + 1008 -> "An endpoint is terminating the connection because it has received a message that \"violates its policy\". This reason is given either if there is no other sutible reason, or if there is a need to hide specific details about the policy." + 1009 -> "An endpoint is terminating the connection because it has received a message that is too big for it to process." + 1010 -> "An endpoint (client ) is terminating the connection because it has expected the server to negotiate one or more extension, but the server didn't return them in the response message of the WebSocket handshake. <br /> Specifically, the extensions that are needed are: " + 1011 -> "A server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request." + 1015 -> "The connection was closed due to a failure to perform a TLS handshake (e.g., the server certificate can't be verified)." + 4001 -> "Unexpected event" + 4002 -> "You are trying to use closed socket" + else -> "Unknown reason" + } + } +} diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt index 273c2a65..c4f4ed6d 100644 --- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt +++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt @@ -25,6 +25,7 @@ import kotlinx.serialization.SerializationStrategy import kotlinx.serialization.context.SimpleModule import kotlinx.serialization.json.Json import pl.treksoft.kvision.types.JsonDateSerializer +import kotlin.browser.window import kotlin.js.Date /** @@ -60,3 +61,14 @@ object JSON { return kotlin.js.JSON.parse(plain.stringify(serializer, this)) } } + +/** + * Creates a websocket URL from current window.location and given path. + */ +fun getWebSocketUrl(url: String): String { + val location = window.location + val scheme = if (location.protocol == "https:") "wss" else "ws" + val port = if (location.port == "8088") ":8080" + else if (location.port != "0" && location.port != "") ":${location.port}" else "" + return "$scheme://${location.hostname}$port$url" +} |