aboutsummaryrefslogtreecommitdiff
path: root/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft
diff options
context:
space:
mode:
authorRobert Jaros <rjaros@finn.pl>2019-03-31 00:06:25 +0100
committerRobert Jaros <rjaros@finn.pl>2019-03-31 00:06:25 +0100
commitf1f0423f63cc9f39fa74fc9b68bb680f1d1abd31 (patch)
treedc083b68749b09d7d8c0ed4bfa90262feac83624 /kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft
parent967f6278ded0d540ce3d7613bd0d39338ef14d63 (diff)
downloadkvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.gz
kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.bz2
kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.zip
Websockets implementation
Diffstat (limited to 'kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft')
-rw-r--r--kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt180
-rw-r--r--kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt15
-rw-r--r--kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt171
-rw-r--r--kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt12
4 files changed, 378 insertions, 0 deletions
diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt
index 6054592e..3e538bd1 100644
--- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt
+++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt
@@ -21,10 +21,18 @@
*/
package pl.treksoft.kvision.remote
+import kotlinx.coroutines.Job
import kotlinx.coroutines.asDeferred
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.channels.SendChannel
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
import kotlinx.serialization.ImplicitReflectionSerializer
import kotlinx.serialization.list
import kotlinx.serialization.serializer
+import kotlinx.serialization.stringify
+import kotlin.js.console
import kotlin.js.js
import kotlin.reflect.KClass
import kotlin.js.JSON as NativeJSON
@@ -366,6 +374,178 @@ open class KVRemoteAgent<T : Any>(val serviceManager: KVServiceManager<T>) : Rem
}.asDeferred().await()
}
+ /**
+ * Executes defined web socket connection
+ */
+ suspend inline fun <reified PAR1 : Any, reified PAR2 : Any> webSocket(
+ noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit,
+ noinline handler: suspend (SendChannel<PAR1>, ReceiveChannel<PAR2>) -> Unit
+ ) {
+ val (url, _) =
+ serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")]
+ ?: throw IllegalStateException("Function not specified!")
+ val socket = Socket()
+ val requestChannel = Channel<PAR1>()
+ val responseChannel = Channel<PAR2>()
+ try {
+ coroutineScope {
+ socket.connect(getWebSocketUrl(url))
+ lateinit var responseJob: Job
+ lateinit var handlerJob: Job
+ val requestJob = launch {
+ for (par1 in requestChannel) {
+ val param = serialize(par1)
+ val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param)))
+ if (!socket.sendOrFalse(str)) break
+ }
+ responseJob.cancel()
+ handlerJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ responseJob = launch {
+ while (true) {
+ val str = socket.receiveOrNull() ?: break
+ val data = kotlin.js.JSON.parse<dynamic>(str).result
+ val par2 = try {
+ @Suppress("UNCHECKED_CAST")
+ deserialize<PAR2>(data, PAR2::class.js.name)
+ } catch (t: NotStandardTypeException) {
+ try {
+ @Suppress("UNCHECKED_CAST")
+ tryDeserializeEnum(PAR2::class as KClass<Any>, data) as PAR2
+ } catch (t: NotEnumTypeException) {
+ JSON.nonstrict.parse(PAR2::class.serializer(), data)
+ }
+ }
+ responseChannel.send(par2)
+ }
+ requestJob.cancel()
+ handlerJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ handlerJob = launch {
+ exceptionHelper {
+ handler(requestChannel, responseChannel)
+ }
+ requestJob.cancel()
+ responseJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ }
+ } catch (e: Exception) {
+ console.log(e)
+ }
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ socket.close()
+ }
+
+ /**
+ * Executes defined web socket connection returning list objects
+ */
+ suspend inline fun <reified PAR1 : Any, reified PAR2 : Any> webSocket(
+ noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<List<PAR2>>) -> Unit,
+ noinline handler: suspend (SendChannel<PAR1>, ReceiveChannel<List<PAR2>>) -> Unit
+ ) {
+ val (url, _) =
+ serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")]
+ ?: throw IllegalStateException("Function not specified!")
+ val socket = Socket()
+ val requestChannel = Channel<PAR1>()
+ val responseChannel = Channel<List<PAR2>>()
+ try {
+ coroutineScope {
+ socket.connect(getWebSocketUrl(url))
+ lateinit var responseJob: Job
+ lateinit var handlerJob: Job
+ val requestJob = launch {
+ for (par1 in requestChannel) {
+ val param = serialize(par1)
+ val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param)))
+ if (!socket.sendOrFalse(str)) break
+ }
+ responseJob.cancel()
+ handlerJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ responseJob = launch {
+ while (true) {
+ val str = socket.receiveOrNull() ?: break
+ val data = kotlin.js.JSON.parse<dynamic>(str).result
+ val par2 = try {
+ deserializeList<PAR2>(data, PAR2::class.js.name)
+ } catch (t: NotStandardTypeException) {
+ try {
+ @Suppress("UNCHECKED_CAST")
+ tryDeserializeEnumList(PAR2::class as KClass<Any>, data) as List<PAR2>
+ } catch (t: NotEnumTypeException) {
+ JSON.nonstrict.parse(PAR2::class.serializer().list, data)
+ }
+ }
+ responseChannel.send(par2)
+ }
+ requestJob.cancel()
+ handlerJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ handlerJob = launch {
+ exceptionHelper {
+ handler(requestChannel, responseChannel)
+ }
+ requestJob.cancel()
+ responseJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ }
+ } catch (e: Exception) {
+ console.log(e)
+ }
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ socket.close()
+ }
+
+ /**
+ * @suppress internal function
+ */
+ suspend fun Socket.receiveOrNull(): String? {
+ return try {
+ this.receive()
+ } catch (e: SocketClosedException) {
+ console.log("Socket was closed: ${e.reason}")
+ null
+ }
+ }
+
+ /**
+ * @suppress internal function
+ */
+ fun Socket.sendOrFalse(str: String): Boolean {
+ return try {
+ this.send(str)
+ true
+ } catch (e: SocketClosedException) {
+ console.log("Socket was closed: ${e.reason}")
+ false
+ }
+ }
+
+ /**
+ * @suppress internal function
+ */
+ suspend fun exceptionHelper(block: suspend () -> Unit) {
+ try {
+ block()
+ } catch (e: Exception) {
+ console.log(e)
+ }
+ }
/**
* @suppress
diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
index 4ca6a4a5..0cf72f76 100644
--- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
+++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
@@ -21,6 +21,8 @@
*/
package pl.treksoft.kvision.remote
+import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.channels.SendChannel
import kotlin.reflect.KClass
/**
@@ -138,6 +140,19 @@ actual open class KVServiceManager<T : Any> actual constructor(serviceClass: KCl
}
/**
+ * Binds a given web socket connetion with a function of the receiver.
+ * @param function a function of the receiver
+ * @param route a web socket route
+ */
+ protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind(
+ noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit,
+ route: String?
+ ) {
+ val routeDef = "route${this::class.simpleName}${counter++}"
+ calls[function.toString().replace("\\s".toRegex(), "")] = Pair("/kv/$routeDef", HttpMethod.POST)
+ }
+
+ /**
* Returns the map of defined paths.
*/
fun getCalls(): Map<String, Pair<String, HttpMethod>> = calls
diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt
new file mode 100644
index 00000000..c547ce9b
--- /dev/null
+++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2017-present Robert Jaros
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package pl.treksoft.kvision.remote
+
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.w3c.dom.CloseEvent
+import org.w3c.dom.ErrorEvent
+import org.w3c.dom.MessageEvent
+import org.w3c.dom.WebSocket
+import org.w3c.dom.WebSocket.Companion.CLOSED
+import org.w3c.dom.WebSocket.Companion.CLOSING
+import org.w3c.dom.WebSocket.Companion.OPEN
+import org.w3c.dom.events.Event
+import kotlin.coroutines.resume
+import kotlin.coroutines.suspendCoroutine
+
+//
+// Code taken from: https://discuss.kotlinlang.org/t/js-coroutines-experiements/8245/2
+//
+
+/**
+ * Websocket closed exception class.
+ */
+class SocketClosedException(val reason: String) : Throwable(reason)
+
+/**
+ * A websocket client implementation.
+ */
+class Socket {
+ private var eventQueue: Channel<Event> = Channel(Channel.UNLIMITED)
+ private lateinit var ws: WebSocket
+ val state: Short
+ get() = ws.readyState
+
+ private fun onWsEvent(event: Event) {
+ GlobalScope.launch { eventQueue.send(event) }
+ }
+
+ /**
+ * Connect to a websocket.
+ */
+ suspend fun connect(url: String, retryDelay: Long = 1000) {
+ while (true) {
+ val connected = suspendCoroutine<Boolean> { cont ->
+ while (eventQueue.poll() != null) {/*drain*/
+ }
+ ws = WebSocket(url)
+ ws.onopen = {
+ ws.onclose = ::onWsEvent
+ ws.onerror = ::onWsEvent
+ cont.resume(true)
+ }
+ ws.onmessage = ::onWsEvent
+ ws.onerror = {
+ logError(it)
+ }
+ ws.onclose = {
+ cont.resume(false)
+ }
+ }
+ if (connected)
+ break
+ else
+ delay(retryDelay)
+ }
+ }
+
+ /**
+ * Receive a string from a websocket.
+ */
+ suspend fun receive(): String {
+ val event = eventQueue.receive()
+ return when (event) {
+ is MessageEvent -> {
+ event.data as String
+ }
+ is CloseEvent -> {
+ val reason = getReason(event.code)
+ throw SocketClosedException(reason)
+ }
+ is ErrorEvent -> {
+ logError(event)
+ close()
+ throw SocketClosedException(event.message)
+ }
+ else -> {
+ val reason = getReason(4001)
+ console.error(reason)
+ close()
+ throw SocketClosedException(reason)
+ }
+ }
+ }
+
+ /**
+ * Send string to a websocket.
+ */
+ fun send(obj: String) {
+ when {
+ isClosed() -> {
+ console.error(getReason(4002))
+ throw SocketClosedException(getReason(4002))
+ }
+ else -> ws.send(obj)
+ }
+ }
+
+ /**
+ * Close a websocket.
+ */
+ fun close(code: Short = 1000) {
+ when (state) {
+ OPEN -> ws.close(code, getReason(1000))
+ }
+ }
+
+ /**
+ * Returns if a websocket is closed.
+ */
+ fun isClosed(): Boolean {
+ return when (state) {
+ CLOSED, CLOSING -> true
+ else -> false
+ }
+ }
+
+ private fun logError(event: Event) = console.error("An error %o occurred when connecting to ${ws.url}", event)
+
+ private fun getReason(code: Short): String {
+ return when (code.toInt()) { // See http://tools.ietf.org/html/rfc6455#section-7.4.1
+ 1000 -> "Normal closure"
+ 1001 -> "An endpoint is \"going away\", such as a server going down or a browser having navigated away from a page."
+ 1002 -> "An endpoint is terminating the connection due to a protocol error"
+ 1003 -> "An endpoint is terminating the connection because it has received a type of data it cannot accept (e.g., an endpoint that understands only text data MAY send this if it receives a binary message)."
+ 1004 -> "Reserved. The specific meaning might be defined in the future."
+ 1005 -> "No status code was actually present."
+ 1006 -> "The connection was closed abnormally, e.g., without sending or receiving a Close control frame"
+ 1007 -> "An endpoint is terminating the connection because it has received data within a message that was not consistent with the type of the message (e.g., non-UTF-8 [http://tools.ietf.org/html/rfc3629] data within a text message)."
+ 1008 -> "An endpoint is terminating the connection because it has received a message that \"violates its policy\". This reason is given either if there is no other sutible reason, or if there is a need to hide specific details about the policy."
+ 1009 -> "An endpoint is terminating the connection because it has received a message that is too big for it to process."
+ 1010 -> "An endpoint (client ) is terminating the connection because it has expected the server to negotiate one or more extension, but the server didn't return them in the response message of the WebSocket handshake. <br /> Specifically, the extensions that are needed are: "
+ 1011 -> "A server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request."
+ 1015 -> "The connection was closed due to a failure to perform a TLS handshake (e.g., the server certificate can't be verified)."
+ 4001 -> "Unexpected event"
+ 4002 -> "You are trying to use closed socket"
+ else -> "Unknown reason"
+ }
+ }
+}
diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt
index 273c2a65..c4f4ed6d 100644
--- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt
+++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt
@@ -25,6 +25,7 @@ import kotlinx.serialization.SerializationStrategy
import kotlinx.serialization.context.SimpleModule
import kotlinx.serialization.json.Json
import pl.treksoft.kvision.types.JsonDateSerializer
+import kotlin.browser.window
import kotlin.js.Date
/**
@@ -60,3 +61,14 @@ object JSON {
return kotlin.js.JSON.parse(plain.stringify(serializer, this))
}
}
+
+/**
+ * Creates a websocket URL from current window.location and given path.
+ */
+fun getWebSocketUrl(url: String): String {
+ val location = window.location
+ val scheme = if (location.protocol == "https:") "wss" else "ws"
+ val port = if (location.port == "8088") ":8080"
+ else if (location.port != "0" && location.port != "") ":${location.port}" else ""
+ return "$scheme://${location.hostname}$port$url"
+}