aboutsummaryrefslogtreecommitdiff
path: root/kvision-modules/kvision-server-ktor/src/main/kotlin/pl
diff options
context:
space:
mode:
authorRobert Jaros <rjaros@finn.pl>2019-03-31 00:06:25 +0100
committerRobert Jaros <rjaros@finn.pl>2019-03-31 00:06:25 +0100
commitf1f0423f63cc9f39fa74fc9b68bb680f1d1abd31 (patch)
treedc083b68749b09d7d8c0ed4bfa90262feac83624 /kvision-modules/kvision-server-ktor/src/main/kotlin/pl
parent967f6278ded0d540ce3d7613bd0d39338ef14d63 (diff)
downloadkvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.gz
kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.bz2
kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.zip
Websockets implementation
Diffstat (limited to 'kvision-modules/kvision-server-ktor/src/main/kotlin/pl')
-rw-r--r--kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt55
-rw-r--r--kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt80
2 files changed, 128 insertions, 7 deletions
diff --git a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt
index 956e7301..e2edc175 100644
--- a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt
+++ b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt
@@ -31,17 +31,25 @@ import io.ktor.application.ApplicationCallPipeline
import io.ktor.application.call
import io.ktor.application.install
import io.ktor.features.ContentNegotiation
+import io.ktor.http.cio.websocket.Frame
import io.ktor.http.content.defaultResource
import io.ktor.http.content.resources
import io.ktor.http.content.static
import io.ktor.jackson.jackson
import io.ktor.routing.routing
import io.ktor.util.AttributeKey
+import io.ktor.util.KtorExperimentalAPI
+import io.ktor.websocket.WebSocketServerSession
+import io.ktor.websocket.WebSockets
+import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.channels.SendChannel
+import kotlin.coroutines.CoroutineContext
fun Application.kvisionInit(vararg modules: Module) {
install(ContentNegotiation) {
jackson()
}
+ install(WebSockets)
routing {
static("/") {
resources("assets")
@@ -72,3 +80,50 @@ class MainModule(private val application: Application) : AbstractModule() {
bind(Application::class.java).toInstance(application)
}
}
+
+class WsSessionModule(private val webSocketSession: WebSocketServerSession) :
+ AbstractModule() {
+ override fun configure() {
+ bind(WebSocketServerSession::class.java).toInstance(webSocketSession)
+ }
+}
+
+class DummyWsSessionModule() : AbstractModule() {
+ override fun configure() {
+ bind(WebSocketServerSession::class.java).toInstance(DummyWebSocketServerSession())
+ }
+}
+
+class DummyWebSocketServerSession : WebSocketServerSession {
+ override val call: ApplicationCall
+ get() = throw UnsupportedOperationException()
+ override val coroutineContext: CoroutineContext
+ get() = throw UnsupportedOperationException()
+ override val incoming: ReceiveChannel<Frame>
+ get() = throw UnsupportedOperationException()
+ override var masking: Boolean
+ get() = throw UnsupportedOperationException()
+ set(value) {
+ throw UnsupportedOperationException()
+ }
+ override var maxFrameSize: Long
+ get() = throw UnsupportedOperationException()
+ set(value) {
+ throw UnsupportedOperationException()
+ }
+ override val outgoing: SendChannel<Frame>
+ get() = throw UnsupportedOperationException()
+
+ @UseExperimental(KtorExperimentalAPI::class)
+ override suspend fun close(cause: Throwable?) {
+ throw UnsupportedOperationException()
+ }
+
+ override suspend fun flush() {
+ throw UnsupportedOperationException()
+ }
+
+ override fun terminate() {
+ throw UnsupportedOperationException()
+ }
+}
diff --git a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
index d4985adf..0839a5c5 100644
--- a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
+++ b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt
@@ -24,6 +24,10 @@ package pl.treksoft.kvision.remote
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.application.ApplicationCall
import io.ktor.application.call
+import io.ktor.http.cio.websocket.CloseReason
+import io.ktor.http.cio.websocket.Frame
+import io.ktor.http.cio.websocket.close
+import io.ktor.http.cio.websocket.readText
import io.ktor.request.receive
import io.ktor.response.respond
import io.ktor.routing.Route
@@ -33,7 +37,17 @@ import io.ktor.routing.options
import io.ktor.routing.post
import io.ktor.routing.put
import io.ktor.util.pipeline.PipelineContext
+import io.ktor.websocket.WebSocketServerSession
+import io.ktor.websocket.webSocket
import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.channels.SendChannel
+import kotlinx.coroutines.channels.filterNotNull
+import kotlinx.coroutines.channels.map
+import kotlinx.coroutines.channels.mapNotNull
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import kotlin.reflect.KClass
@@ -55,6 +69,8 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
mutableMapOf()
val optionsRequests: MutableMap<String, suspend PipelineContext<Unit, ApplicationCall>.(Unit) -> Unit> =
mutableMapOf()
+ val webSocketRequests: MutableMap<String, suspend WebSocketServerSession.() -> Unit> =
+ mutableMapOf()
val mapper = jacksonObjectMapper()
var counter: Int = 0
@@ -72,7 +88,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
) {
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = if (method == HttpMethod.GET) {
JsonRpcRequest(call.request.queryParameters["id"]?.toInt() ?: 0, "", listOf())
} else {
@@ -113,7 +129,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
throw UnsupportedOperationException("GET method is only supported for methods without parameters")
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
if (jsonRpcRequest.params.size == 1) {
val param = getParameter<PAR>(jsonRpcRequest.params[0])
@@ -160,7 +176,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
throw UnsupportedOperationException("GET method is only supported for methods without parameters")
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
if (jsonRpcRequest.params.size == 2) {
val param1 = getParameter<PAR1>(jsonRpcRequest.params[0])
@@ -208,7 +224,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
throw UnsupportedOperationException("GET method is only supported for methods without parameters")
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
@Suppress("MagicNumber")
if (jsonRpcRequest.params.size == 3) {
@@ -258,7 +274,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
throw UnsupportedOperationException("GET method is only supported for methods without parameters")
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
@Suppress("MagicNumber")
if (jsonRpcRequest.params.size == 4) {
@@ -310,7 +326,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
throw UnsupportedOperationException("GET method is only supported for methods without parameters")
val routeDef = route ?: "route${this::class.simpleName}${counter++}"
addRoute(method, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
@Suppress("MagicNumber")
if (jsonRpcRequest.params.size == 5) {
@@ -348,6 +364,51 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
}
/**
+ * Binds a given web socket connetion with a function of the receiver.
+ * @param function a function of the receiver
+ * @param route a route
+ */
+ protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind(
+ noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit,
+ route: String?
+ ) {
+ val routeDef = "route${this::class.simpleName}${counter++}"
+ webSocketRequests["/kv/$routeDef"] = {
+ val wsInjector = call.injector.createChildInjector(WsSessionModule(this))
+ val service = wsInjector.getInstance(serviceClass.java)
+ val requestChannel = incoming.mapNotNull { it as? Frame.Text }.map {
+ val jsonRpcRequest = getParameter<JsonRpcRequest>(it.readText())
+ if (jsonRpcRequest.params.size == 1) {
+ getParameter<PAR1>(jsonRpcRequest.params[0])
+ } else {
+ null
+ }
+ }.filterNotNull()
+ val responseChannel = Channel<PAR2>()
+ val session = this
+ coroutineScope {
+ launch {
+ for (p in responseChannel) {
+ val text = mapper.writeValueAsString(
+ JsonRpcResponse(
+ id = 0,
+ result = mapper.writeValueAsString(p)
+ )
+ )
+ outgoing.send(Frame.Text(text))
+ }
+ session.close(CloseReason(CloseReason.Codes.NORMAL, ""))
+ session.close()
+ }
+ launch {
+ function.invoke(service, requestChannel, responseChannel)
+ if (!responseChannel.isClosedForReceive) responseChannel.close()
+ }
+ }
+ }
+ }
+
+ /**
* Binds a given function of the receiver as a select options source
* @param function a function of the receiver
*/
@@ -357,7 +418,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass:
) {
val routeDef = "route${this::class.simpleName}${counter++}"
addRoute(HttpMethod.POST, "/kv/$routeDef") {
- val service = call.injector.getInstance(serviceClass.java)
+ val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java)
val jsonRpcRequest = call.receive<JsonRpcRequest>()
if (jsonRpcRequest.params.size == 2) {
val param1 = getParameter<String?>(jsonRpcRequest.params[0])
@@ -431,4 +492,9 @@ fun <T : Any> Route.applyRoutes(serviceManager: KVServiceManager<T>) {
serviceManager.optionsRequests.forEach { (path, handler) ->
options(path, handler)
}
+ serviceManager.webSocketRequests.forEach { (path, handler) ->
+ this.webSocket(path) {
+ handler()
+ }
+ }
}