aboutsummaryrefslogtreecommitdiff
path: root/kvision-modules
diff options
context:
space:
mode:
authorRobert Jaros <rjaros@finn.pl>2019-03-31 00:06:25 +0100
committerRobert Jaros <rjaros@finn.pl>2019-03-31 00:06:25 +0100
commitf1f0423f63cc9f39fa74fc9b68bb680f1d1abd31 (patch)
treedc083b68749b09d7d8c0ed4bfa90262feac83624 /kvision-modules
parent967f6278ded0d540ce3d7613bd0d39338ef14d63 (diff)
downloadkvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.gz
kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.bz2
kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.zip
Websockets implementation
Diffstat (limited to 'kvision-modules')
-rw-r--r--kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt11
-rw-r--r--kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt180
-rw-r--r--kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt15
-rw-r--r--kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt171
-rw-r--r--kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt12
-rw-r--r--kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt15
-rw-r--r--kvision-modules/kvision-server-ktor/build.gradle1
-rw-r--r--kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt55
-rw-r--r--kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt80
-rw-r--r--kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt14
10 files changed, 547 insertions, 7 deletions
diff --git a/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
index 514fb07d..e89744f6 100644
--- a/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
+++ b/kvision-modules/kvision-common-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
@@ -21,6 +21,8 @@
*/
package pl.treksoft.kvision.remote
+import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.channels.SendChannel
import kotlin.reflect.KClass
enum class HttpMethod {
@@ -115,4 +117,13 @@ expect open class KVServiceManager<T : Any>(serviceClass: KClass<T>) {
protected fun bind(
function: T.(String?, String?) -> List<RemoteSelectOption>
)
+
+ /**
+ * Binds a given function of the receiver as a web socket connection
+ * @param function a function of the receiver
+ */
+ protected inline fun <reified PAR1 : Any, reified PAR2 : Any> bind(
+ noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit,
+ route: String? = null
+ )
}
diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt
index 6054592e..3e538bd1 100644
--- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt
+++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVRemoteAgent.kt
@@ -21,10 +21,18 @@
*/
package pl.treksoft.kvision.remote
+import kotlinx.coroutines.Job
import kotlinx.coroutines.asDeferred
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.channels.SendChannel
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
import kotlinx.serialization.ImplicitReflectionSerializer
import kotlinx.serialization.list
import kotlinx.serialization.serializer
+import kotlinx.serialization.stringify
+import kotlin.js.console
import kotlin.js.js
import kotlin.reflect.KClass
import kotlin.js.JSON as NativeJSON
@@ -366,6 +374,178 @@ open class KVRemoteAgent<T : Any>(val serviceManager: KVServiceManager<T>) : Rem
}.asDeferred().await()
}
+ /**
+ * Executes defined web socket connection
+ */
+ suspend inline fun <reified PAR1 : Any, reified PAR2 : Any> webSocket(
+ noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit,
+ noinline handler: suspend (SendChannel<PAR1>, ReceiveChannel<PAR2>) -> Unit
+ ) {
+ val (url, _) =
+ serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")]
+ ?: throw IllegalStateException("Function not specified!")
+ val socket = Socket()
+ val requestChannel = Channel<PAR1>()
+ val responseChannel = Channel<PAR2>()
+ try {
+ coroutineScope {
+ socket.connect(getWebSocketUrl(url))
+ lateinit var responseJob: Job
+ lateinit var handlerJob: Job
+ val requestJob = launch {
+ for (par1 in requestChannel) {
+ val param = serialize(par1)
+ val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param)))
+ if (!socket.sendOrFalse(str)) break
+ }
+ responseJob.cancel()
+ handlerJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ responseJob = launch {
+ while (true) {
+ val str = socket.receiveOrNull() ?: break
+ val data = kotlin.js.JSON.parse<dynamic>(str).result
+ val par2 = try {
+ @Suppress("UNCHECKED_CAST")
+ deserialize<PAR2>(data, PAR2::class.js.name)
+ } catch (t: NotStandardTypeException) {
+ try {
+ @Suppress("UNCHECKED_CAST")
+ tryDeserializeEnum(PAR2::class as KClass<Any>, data) as PAR2
+ } catch (t: NotEnumTypeException) {
+ JSON.nonstrict.parse(PAR2::class.serializer(), data)
+ }
+ }
+ responseChannel.send(par2)
+ }
+ requestJob.cancel()
+ handlerJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ handlerJob = launch {
+ exceptionHelper {
+ handler(requestChannel, responseChannel)
+ }
+ requestJob.cancel()
+ responseJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ }
+ } catch (e: Exception) {
+ console.log(e)
+ }
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ socket.close()
+ }
+
+ /**
+ * Executes defined web socket connection returning list objects
+ */
+ suspend inline fun <reified PAR1 : Any, reified PAR2 : Any> webSocket(
+ noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<List<PAR2>>) -> Unit,
+ noinline handler: suspend (SendChannel<PAR1>, ReceiveChannel<List<PAR2>>) -> Unit
+ ) {
+ val (url, _) =
+ serviceManager.getCalls()[function.toString().replace("\\s".toRegex(), "")]
+ ?: throw IllegalStateException("Function not specified!")
+ val socket = Socket()
+ val requestChannel = Channel<PAR1>()
+ val responseChannel = Channel<List<PAR2>>()
+ try {
+ coroutineScope {
+ socket.connect(getWebSocketUrl(url))
+ lateinit var responseJob: Job
+ lateinit var handlerJob: Job
+ val requestJob = launch {
+ for (par1 in requestChannel) {
+ val param = serialize(par1)
+ val str = JSON.plain.stringify(JsonRpcRequest(0, url, listOf(param)))
+ if (!socket.sendOrFalse(str)) break
+ }
+ responseJob.cancel()
+ handlerJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ responseJob = launch {
+ while (true) {
+ val str = socket.receiveOrNull() ?: break
+ val data = kotlin.js.JSON.parse<dynamic>(str).result
+ val par2 = try {
+ deserializeList<PAR2>(data, PAR2::class.js.name)
+ } catch (t: NotStandardTypeException) {
+ try {
+ @Suppress("UNCHECKED_CAST")
+ tryDeserializeEnumList(PAR2::class as KClass<Any>, data) as List<PAR2>
+ } catch (t: NotEnumTypeException) {
+ JSON.nonstrict.parse(PAR2::class.serializer().list, data)
+ }
+ }
+ responseChannel.send(par2)
+ }
+ requestJob.cancel()
+ handlerJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ handlerJob = launch {
+ exceptionHelper {
+ handler(requestChannel, responseChannel)
+ }
+ requestJob.cancel()
+ responseJob.cancel()
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ }
+ }
+ } catch (e: Exception) {
+ console.log(e)
+ }
+ if (!requestChannel.isClosedForReceive) requestChannel.close()
+ if (!responseChannel.isClosedForSend) responseChannel.close()
+ socket.close()
+ }
+
+ /**
+ * @suppress internal function
+ */
+ suspend fun Socket.receiveOrNull(): String? {
+ return try {
+ this.receive()
+ } catch (e: SocketClosedException) {
+ console.log("Socket was closed: ${e.reason}")
+ null
+ }
+ }
+
+ /**
+ * @suppress internal function
+ */
+ fun Socket.sendOrFalse(str: String): Boolean {
+ return try {
+ this.send(str)
+ true
+ } catch (e: SocketClosedException) {
+ console.log("Socket was closed: ${e.reason}")
+ false
+ }
+ }
+
+ /**
+ * @suppress internal function
+ */
+ suspend fun exceptionHelper(block: suspend () -> Unit) {
+ try {
+ block()
+ } catch (e: Exception) {
+ console.log(e)
+ }
+ }
/**
* @suppress
diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
index 4ca6a4a5..0cf72f76 100644
--- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
+++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
@@ -21,6 +21,8 @@
*/
package pl.treksoft.kvision.remote
+import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.channels.SendChannel
import kotlin.reflect.KClass
/**
@@ -138,6 +140,19 @@ actual open class KVServiceManager<T : Any> actual constructor(serviceClass: KCl
}
/**
+ * Binds a given web socket connetion with a function of the receiver.
+ * @param function a function of the receiver
+ * @param route a web socket route
+ */
+ protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind(
+ noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit,
+ route: String?
+ ) {
+ val routeDef = "route${this::class.simpleName}${counter++}"
+ calls[function.toString().replace("\\s".toRegex(), "")] = Pair("/kv/$routeDef", HttpMethod.POST)
+ }
+
+ /**
* Returns the map of defined paths.
*/
fun getCalls(): Map<String, Pair<String, HttpMethod>> = calls
diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt
new file mode 100644
index 00000000..c547ce9b
--- /dev/null
+++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Socket.kt
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2017-present Robert Jaros
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package pl.treksoft.kvision.remote
+
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.w3c.dom.CloseEvent
+import org.w3c.dom.ErrorEvent
+import org.w3c.dom.MessageEvent
+import org.w3c.dom.WebSocket
+import org.w3c.dom.WebSocket.Companion.CLOSED
+import org.w3c.dom.WebSocket.Companion.CLOSING
+import org.w3c.dom.WebSocket.Companion.OPEN
+import org.w3c.dom.events.Event
+import kotlin.coroutines.resume
+import kotlin.coroutines.suspendCoroutine
+
+//
+// Code taken from: https://discuss.kotlinlang.org/t/js-coroutines-experiements/8245/2
+//
+
+/**
+ * Websocket closed exception class.
+ */
+class SocketClosedException(val reason: String) : Throwable(reason)
+
+/**
+ * A websocket client implementation.
+ */
+class Socket {
+ private var eventQueue: Channel<Event> = Channel(Channel.UNLIMITED)
+ private lateinit var ws: WebSocket
+ val state: Short
+ get() = ws.readyState
+
+ private fun onWsEvent(event: Event) {
+ GlobalScope.launch { eventQueue.send(event) }
+ }
+
+ /**
+ * Connect to a websocket.
+ */
+ suspend fun connect(url: String, retryDelay: Long = 1000) {
+ while (true) {
+ val connected = suspendCoroutine<Boolean> { cont ->
+ while (eventQueue.poll() != null) {/*drain*/
+ }
+ ws = WebSocket(url)
+ ws.onopen = {
+ ws.onclose = ::onWsEvent
+ ws.onerror = ::onWsEvent
+ cont.resume(true)
+ }
+ ws.onmessage = ::onWsEvent
+ ws.onerror = {
+ logError(it)
+ }
+ ws.onclose = {
+ cont.resume(false)
+ }
+ }
+ if (connected)
+ break
+ else
+ delay(retryDelay)
+ }
+ }
+
+ /**
+ * Receive a string from a websocket.
+ */
+ suspend fun receive(): String {
+ val event = eventQueue.receive()
+ return when (event) {
+ is MessageEvent -> {
+ event.data as String
+ }
+ is CloseEvent -> {
+ val reason = getReason(event.code)
+ throw SocketClosedException(reason)
+ }
+ is ErrorEvent -> {
+ logError(event)
+ close()
+ throw SocketClosedException(event.message)
+ }
+ else -> {
+ val reason = getReason(4001)
+ console.error(reason)
+ close()
+ throw SocketClosedException(reason)
+ }
+ }
+ }
+
+ /**
+ * Send string to a websocket.
+ */
+ fun send(obj: String) {
+ when {
+ isClosed() -> {
+ console.error(getReason(4002))
+ throw SocketClosedException(getReason(4002))
+ }
+ else -> ws.send(obj)
+ }
+ }
+
+ /**
+ * Close a websocket.
+ */
+ fun close(code: Short = 1000) {
+ when (state) {
+ OPEN -> ws.close(code, getReason(1000))
+ }
+ }
+
+ /**
+ * Returns if a websocket is closed.
+ */
+ fun isClosed(): Boolean {
+ return when (state) {
+ CLOSED, CLOSING -> true
+ else -> false
+ }
+ }
+
+ private fun logError(event: Event) = console.error("An error %o occurred when connecting to ${ws.url}", event)
+
+ private fun getReason(code: Short): String {
+ return when (code.toInt()) { // See http://tools.ietf.org/html/rfc6455#section-7.4.1
+ 1000 -> "Normal closure"
+ 1001 -> "An endpoint is \"going away\", such as a server going down or a browser having navigated away from a page."
+ 1002 -> "An endpoint is terminating the connection due to a protocol error"
+ 1003 -> "An endpoint is terminating the connection because it has received a type of data it cannot accept (e.g., an endpoint that understands only text data MAY send this if it receives a binary message)."
+ 1004 -> "Reserved. The specific meaning might be defined in the future."
+ 1005 -> "No status code was actually present."
+ 1006 -> "The connection was closed abnormally, e.g., without sending or receiving a Close control frame"
+ 1007 -> "An endpoint is terminating the connection because it has received data within a message that was not consistent with the type of the message (e.g., non-UTF-8 [http://tools.ietf.org/html/rfc3629] data within a text message)."
+ 1008 -> "An endpoint is terminating the connection because it has received a message that \"violates its policy\". This reason is given either if there is no other sutible reason, or if there is a need to hide specific details about the policy."
+ 1009 -> "An endpoint is terminating the connection because it has received a message that is too big for it to process."
+ 1010 -> "An endpoint (client ) is terminating the connection because it has expected the server to negotiate one or more extension, but the server didn't return them in the response message of the WebSocket handshake. <br /> Specifically, the extensions that are needed are: "
+ 1011 -> "A server is terminating the connection because it encountered an unexpected condition that prevented it from fulfilling the request."
+ 1015 -> "The connection was closed due to a failure to perform a TLS handshake (e.g., the server certificate can't be verified)."
+ 4001 -> "Unexpected event"
+ 4002 -> "You are trying to use closed socket"
+ else -> "Unknown reason"
+ }
+ }
+}
diff --git a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt
index 273c2a65..c4f4ed6d 100644
--- a/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt
+++ b/kvision-modules/kvision-remote/src/main/kotlin/pl/treksoft/kvision/remote/Utils.kt
@@ -25,6 +25,7 @@ import kotlinx.serialization.SerializationStrategy
import kotlinx.serialization.context.SimpleModule
import kotlinx.serialization.json.Json
import pl.treksoft.kvision.types.JsonDateSerializer
+import kotlin.browser.window
import kotlin.js.Date
/**
@@ -60,3 +61,14 @@ object JSON {
return kotlin.js.JSON.parse(plain.stringify(serializer, this))
}
}
+
+/**
+ * Creates a websocket URL from current window.location and given path.
+ */
+fun getWebSocketUrl(url: String): String {
+ val location = window.location
+ val scheme = if (location.protocol == "https:") "wss" else "ws"
+ val port = if (location.port == "8088") ":8080"
+ else if (location.port != "0" && location.port != "") ":${location.port}" else ""
+ return "$scheme://${location.hostname}$port$url"
+}
diff --git a/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
index d1e64ce4..cfd9d467 100644
--- a/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
+++ b/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.google.inject.Injector
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import org.jooby.Kooby
import org.jooby.Request
@@ -311,6 +313,19 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
}
/**
+ * Binds a given web socket connetion with a function of the receiver.
+ * @param function a function of the receiver
+ * @param route a route
+ */
+ protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind(
+ noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit,
+ route: String?
+ ) {
+ TODO("Not implemented in Jooby module")
+ }
+
+
+ /**
* Binds a given function of the receiver as a select options source
* @param function a function of the receiver
*/
diff --git a/kvision-modules/kvision-server-ktor/build.gradle b/kvision-modules/kvision-server-ktor/build.gradle
index 62b5fe83..44be9171 100644
--- a/kvision-modules/kvision-server-ktor/build.gradle
+++ b/kvision-modules/kvision-server-ktor/build.gradle
@@ -10,6 +10,7 @@ dependencies {
compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion"
compile "io.ktor:ktor-server-core:$ktorVersion"
compile "io.ktor:ktor-jackson:$ktorVersion"
+ compile "io.ktor:ktor-websockets:$ktorVersion"
compile "com.google.inject:guice:$guiceVersion"
compile "com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonModuleKotlinVersion"
testCompile "org.jetbrains.kotlin:kotlin-test:$kotlinVersion"
diff --git a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt
index 956e7301..e2edc175 100644
--- a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt
+++ b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt
@@ -31,17 +31,25 @@ import io.ktor.application.ApplicationCallPipeline
import io.ktor.application.call
import io.ktor.application.install
import io.ktor.features.ContentNegotiation
+import io.ktor.http.cio.websocket.Frame
import io.ktor.http.content.defaultResource
import io.ktor.http.content.resources
import io.ktor.http.content.static
import io.ktor.jackson.jackson
import io.ktor.routing.routing
import io.ktor.util.AttributeKey
+import io.ktor.util.KtorExperimentalAPI
+import io.ktor.websocket.WebSocketServerSession
+import io.ktor.websocket.WebSockets
+import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.channels.SendChannel
+import kotlin.coroutines.CoroutineContext
fun Application.kvisionInit(vararg modules: Module) {
install(ContentNegotiation) {
jackson()
}
+ install(WebSockets)
routing {
static("/") {
resources("assets")
@@ -72,3 +80,50 @@ class MainModule(private val application: Application) : AbstractModule() {
bind(Application::class.java).toInstance(application)
}
}
+
+class WsSessionModule(private val webSocketSession: WebSocketServerSession) :
+ AbstractModule() {
+ override fun configure() {
+ bind(WebSocketServerSession::class.java).toInstance(webSocketSession)
+ }
+}
+
+class DummyWsSessionModule() : AbstractModule() {
+ override fun configure() {
+ bind(WebSocketServerSession::class.java).toInstance(DummyWebSocketServerSession())
+ }
+}
+
+class DummyWebSocketServerSession : WebSocketServerSession {
+ override val call: ApplicationCall
+ get() = throw UnsupportedOperationException()
+ override val coroutineContext: CoroutineContext
+ get() = throw UnsupportedOperationException()
+ override val incoming: ReceiveChannel<Frame>
+ get() = throw UnsupportedOperationException()
+ override var masking: Boolean
+ get() = throw UnsupportedOperationException()
+ set(value) {
+ throw UnsupportedOperationException()
+ }
+ override var maxFrameSize: Long
+ get() = throw UnsupportedOperationException()
+ set(value) {
+ throw UnsupportedOperationException()
+ }
+ override val outgoing: SendChannel<Frame>
+ get() = throw UnsupportedOperationException()
+
+ @UseExperimental(KtorExperimentalAPI::class)
+ override suspend fun close(cause: Throwable?) {
+ throw UnsupportedOperationException()
+ }
+
+ override suspend fun flush() {
+ throw UnsupportedOperationException()
+ }
+
+ override fun terminate() {
+ throw UnsupportedOperationException()
+ }
+}
diff --git a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
index d4985adf..0839a5c5 100644
--- a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
+++ b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
@@ -24,6 +24,10 @@ package pl.treksoft.kvision.remote
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.application.ApplicationCall
import io.ktor.application.call
+import io.ktor.http.cio.websocket.CloseReason
+import io.ktor.http.cio.websocket.Frame
+import io.ktor.http.cio.websocket.close
+import io.ktor.http.cio.websocket.readText
import io.ktor.request.receive
import io.ktor.response.respond
import io.ktor.routing.Route
@@ -33,7 +37,17 @@ import io.ktor.routing.options
import io.ktor.routing.post
import io.ktor.routing.put
import io.ktor.util.pipeline.PipelineContext
+import io.ktor.websocket.WebSocketServerSession
+import io.ktor.websocket.webSocket
import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.channels.SendChannel
+import kotlinx.coroutines.channels.filterNotNull
+import kotlinx.coroutines.channels.map
+import kotlinx.coroutines.channels.mapNotNull
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import kotlin.reflect.KClass
@@ -55,6 +69,8 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
mutableMapOf()
val optionsRequests: MutableMap<String, suspend PipelineContext<Unit, ApplicationCall>.(Unit) -> Unit> =
mutableMapOf()
+ val webSocketRequests: MutableMap<String, suspend WebSocketServerSession.() -> Unit> =
+ mutableMapOf()
val mapper = jacksonObjectMapper()
var counter: Int = 0
@@ -72,7 +88,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
) {
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = if (method == HttpMethod.GET) {
JsonRpcRequest(call.request.queryParameters["id"]?.toInt() ?: 0, "", listOf())
} else {
@@ -113,7 +129,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
throw UnsupportedOperationException("GET method is only supported for methods without parameters")
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
if (jsonRpcRequest.params.size == 1) {
val param = getParameter<PAR>(jsonRpcRequest.params[0])
@@ -160,7 +176,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
throw UnsupportedOperationException("GET method is only supported for methods without parameters")
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
if (jsonRpcRequest.params.size == 2) {
val param1 = getParameter<PAR1>(jsonRpcRequest.params[0])
@@ -208,7 +224,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
throw UnsupportedOperationException("GET method is only supported for methods without parameters")
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
@Suppress("MagicNumber")
if (jsonRpcRequest.params.size == 3) {
@@ -258,7 +274,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
throw UnsupportedOperationException("GET method is only supported for methods without parameters")
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
@Suppress("MagicNumber")
if (jsonRpcRequest.params.size == 4) {
@@ -310,7 +326,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
throw UnsupportedOperationException("GET method is only supported for methods without parameters")
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
@Suppress("MagicNumber")
if (jsonRpcRequest.params.size == 5) {
@@ -348,6 +364,51 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
}
/**
+ * Binds a given web socket connetion with a function of the receiver.
+ * @param function a function of the receiver
+ * @param route a route
+ */
+ protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind(
+ noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit,
+ route: String?
+ ) {
+ val routeDef = "route${this::class.simpleName}${counter++}"
+ webSocketRequests["/kv/$routeDef"] = {
+ val wsInjector = call.injector.createChildInjector(WsSessionModule(this))
+ val service = wsInjector.getInstance(serviceClass.java)
+ val requestChannel = incoming.mapNotNull { it as? Frame.Text }.map {
+ val jsonRpcRequest = getParameter<JsonRpcRequest>(it.readText())
+ if (jsonRpcRequest.params.size == 1) {
+ getParameter<PAR1>(jsonRpcRequest.params[0])
+ } else {
+ null
+ }
+ }.filterNotNull()
+ val responseChannel = Channel<PAR2>()
+ val session = this
+ coroutineScope {
+ launch {
+ for (p in responseChannel) {
+ val text = mapper.writeValueAsString(
+ JsonRpcResponse(
+ id = 0,
+ result = mapper.writeValueAsString(p)
+ )
+ )
+ outgoing.send(Frame.Text(text))
+ }
+ session.close(CloseReason(CloseReason.Codes.NORMAL, ""))
+ session.close()
+ }
+ launch {
+ function.invoke(service, requestChannel, responseChannel)
+ if (!responseChannel.isClosedForReceive) responseChannel.close()
+ }
+ }
+ }
+ }
+
+ /**
* Binds a given function of the receiver as a select options source
* @param function a function of the receiver
*/
@@ -357,7 +418,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
) {
val routeDef = "route${this::class.simpleName}${counter++}"
addRoute(HttpMethod.POST, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
if (jsonRpcRequest.params.size == 2) {
val param1 = getParameter<String?>(jsonRpcRequest.params[0])
@@ -431,4 +492,9 @@ fun <T : Any> Route.applyRoutes(serviceManager: KVServiceManager<T>) {
serviceManager.optionsRequests.forEach { (path, handler) ->
options(path, handler)
}
+ serviceManager.webSocketRequests.forEach { (path, handler) ->
+ this.webSocket(path) {
+ handler()
+ }
+ }
}
diff --git a/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
index 66dc4b99..66429acf 100644
--- a/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
+++ b/kvision-modules/kvision-server-spring-boot/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -392,6 +394,18 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
}
/**
+ * Binds a given web socket connetion with a function of the receiver.
+ * @param function a function of the receiver
+ * @param route a route
+ */
+ protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind(
+ noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit,
+ route: String?
+ ) {
+ TODO("Not implemented in Spring Boot module")
+ }
+
+ /**
* Binds a given function of the receiver as a select options source
* @param function a function of the receiver
*/