aboutsummaryrefslogtreecommitdiff
path: root/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote
diff options
context:
space:
mode:
Diffstat (limited to 'kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote')
-rw-r--r--kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt3
-rw-r--r--kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt79
2 files changed, 79 insertions, 3 deletions
diff --git a/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt b/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt
index 34b2877f..b3da785d 100644
--- a/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt
+++ b/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt
@@ -24,6 +24,9 @@ package pl.treksoft.kvision.remote
import org.jooby.Kooby
import org.jooby.json.Jackson
+/**
+ * Initialization function for Jooby server.
+ */
fun Kooby.kvisionInit() {
assets("/", "/assets/index.html")
assets("/**", "/assets/{0}").onMissing(0)
diff --git a/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
index cfd9d467..ca08b080 100644
--- a/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
+++ b/kvision-modules/kvision-server-jooby/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
@@ -24,9 +24,15 @@ package pl.treksoft.kvision.remote
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.google.inject.Injector
import kotlinx.coroutines.CoroutineStart
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
+import kotlinx.coroutines.channels.filterNotNull
+import kotlinx.coroutines.channels.map
+import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import org.jooby.Kooby
import org.jooby.Request
@@ -39,6 +45,7 @@ import kotlin.reflect.KClass
* Multiplatform service manager for Jooby.
*/
@Suppress("LargeClass")
+@UseExperimental(ExperimentalCoroutinesApi::class)
actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: KClass<T>) {
companion object {
@@ -313,7 +320,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
}
/**
- * Binds a given web socket connetion with a function of the receiver.
+ * Binds a given web socket connection with a function of the receiver.
* @param function a function of the receiver
* @param route a route
*/
@@ -321,10 +328,67 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit,
route: String?
) {
- TODO("Not implemented in Jooby module")
+ val routeDef = "route${this::class.simpleName}${counter++}"
+ routes.add {
+ ws("/kvws/$routeDef") { ws ->
+ val injector = ws.require(Injector::class.java)
+ val service = injector.getInstance(serviceClass.java)
+ val incoming = Channel<String>()
+ val outgoing = Channel<String>()
+ GlobalScope.launch {
+ coroutineScope {
+ launch(Dispatchers.IO) {
+ for (text in outgoing) {
+ ws.send(text)
+ }
+ ws.close()
+ }
+ launch {
+ val requestChannel = incoming.map {
+ val jsonRpcRequest = getParameter<JsonRpcRequest>(it)
+ if (jsonRpcRequest.params.size == 1) {
+ getParameter<PAR1>(jsonRpcRequest.params[0])
+ } else {
+ null
+ }
+ }.filterNotNull()
+ val responseChannel = Channel<PAR2>()
+ coroutineScope {
+ launch(Dispatchers.IO) {
+ for (p in responseChannel) {
+ val text = mapper.writeValueAsString(
+ JsonRpcResponse(
+ id = 0,
+ result = mapper.writeValueAsString(p)
+ )
+ )
+ outgoing.send(text)
+ }
+ }
+ launch {
+ function.invoke(service, requestChannel, responseChannel)
+ if (!responseChannel.isClosedForReceive) responseChannel.close()
+ }
+ }
+ if (!outgoing.isClosedForReceive) outgoing.close()
+ }
+ }
+ }
+ ws.onClose {
+ GlobalScope.launch {
+ outgoing.close()
+ incoming.close()
+ }
+ }
+ ws.onMessage { msg ->
+ GlobalScope.launch {
+ incoming.send(msg.value())
+ }
+ }
+ }
+ }
}
-
/**
* Binds a given function of the receiver as a select options source
* @param function a function of the receiver
@@ -363,6 +427,9 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
}
}
+ /**
+ * @suppress Internal method
+ */
fun call(
method: HttpMethod,
path: String,
@@ -379,6 +446,9 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
}
}
+ /**
+ * @suppress Internal method
+ */
protected inline fun <reified T> getParameter(str: String?): T {
return str?.let {
if (T::class == String::class) {
@@ -391,6 +461,9 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
}
+/**
+ * A function to generate routes based on definitions from the service manager.
+ */
fun <T : Any> Kooby.applyRoutes(serviceManager: KVServiceManager<T>) {
serviceManager.routes.forEach {
it.invoke(this@applyRoutes)