diff options
author | Robert Jaros <rjaros@finn.pl> | 2019-03-31 00:06:25 +0100 |
---|---|---|
committer | Robert Jaros <rjaros@finn.pl> | 2019-03-31 00:06:25 +0100 |
commit | f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31 (patch) | |
tree | dc083b68749b09d7d8c0ed4bfa90262feac83624 /kvision-modules/kvision-server-ktor | |
parent | 967f6278ded0d540ce3d7613bd0d39338ef14d63 (diff) | |
download | kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.gz kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.tar.bz2 kvision-f1f0423f63cc9f39fa74fc9b68bb680f1d1abd31.zip |
Websockets implementation
Diffstat (limited to 'kvision-modules/kvision-server-ktor')
3 files changed, 129 insertions, 7 deletions
diff --git a/kvision-modules/kvision-server-ktor/build.gradle b/kvision-modules/kvision-server-ktor/build.gradle index 62b5fe83..44be9171 100644 --- a/kvision-modules/kvision-server-ktor/build.gradle +++ b/kvision-modules/kvision-server-ktor/build.gradle @@ -10,6 +10,7 @@ dependencies { compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion" compile "io.ktor:ktor-server-core:$ktorVersion" compile "io.ktor:ktor-jackson:$ktorVersion" + compile "io.ktor:ktor-websockets:$ktorVersion" compile "com.google.inject:guice:$guiceVersion" compile "com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonModuleKotlinVersion" testCompile "org.jetbrains.kotlin:kotlin-test:$kotlinVersion" diff --git a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt index 956e7301..e2edc175 100644 --- a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt +++ b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVModules.kt @@ -31,17 +31,25 @@ import io.ktor.application.ApplicationCallPipeline import io.ktor.application.call import io.ktor.application.install import io.ktor.features.ContentNegotiation +import io.ktor.http.cio.websocket.Frame import io.ktor.http.content.defaultResource import io.ktor.http.content.resources import io.ktor.http.content.static import io.ktor.jackson.jackson import io.ktor.routing.routing import io.ktor.util.AttributeKey +import io.ktor.util.KtorExperimentalAPI +import io.ktor.websocket.WebSocketServerSession +import io.ktor.websocket.WebSockets +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlin.coroutines.CoroutineContext fun Application.kvisionInit(vararg modules: Module) { install(ContentNegotiation) { jackson() } + install(WebSockets) routing { static("/") { resources("assets") @@ -72,3 +80,50 @@ class MainModule(private val application: Application) : AbstractModule() { bind(Application::class.java).toInstance(application) } } + +class WsSessionModule(private val webSocketSession: WebSocketServerSession) : + AbstractModule() { + override fun configure() { + bind(WebSocketServerSession::class.java).toInstance(webSocketSession) + } +} + +class DummyWsSessionModule() : AbstractModule() { + override fun configure() { + bind(WebSocketServerSession::class.java).toInstance(DummyWebSocketServerSession()) + } +} + +class DummyWebSocketServerSession : WebSocketServerSession { + override val call: ApplicationCall + get() = throw UnsupportedOperationException() + override val coroutineContext: CoroutineContext + get() = throw UnsupportedOperationException() + override val incoming: ReceiveChannel<Frame> + get() = throw UnsupportedOperationException() + override var masking: Boolean + get() = throw UnsupportedOperationException() + set(value) { + throw UnsupportedOperationException() + } + override var maxFrameSize: Long + get() = throw UnsupportedOperationException() + set(value) { + throw UnsupportedOperationException() + } + override val outgoing: SendChannel<Frame> + get() = throw UnsupportedOperationException() + + @UseExperimental(KtorExperimentalAPI::class) + override suspend fun close(cause: Throwable?) { + throw UnsupportedOperationException() + } + + override suspend fun flush() { + throw UnsupportedOperationException() + } + + override fun terminate() { + throw UnsupportedOperationException() + } +} diff --git a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt index d4985adf..0839a5c5 100644 --- a/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt +++ b/kvision-modules/kvision-server-ktor/src/main/kotlin/pl/treksoft/kvision/remote/KVServiceManager.kt @@ -24,6 +24,10 @@ package pl.treksoft.kvision.remote import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.application.ApplicationCall import io.ktor.application.call +import io.ktor.http.cio.websocket.CloseReason +import io.ktor.http.cio.websocket.Frame +import io.ktor.http.cio.websocket.close +import io.ktor.http.cio.websocket.readText import io.ktor.request.receive import io.ktor.response.respond import io.ktor.routing.Route @@ -33,7 +37,17 @@ import io.ktor.routing.options import io.ktor.routing.post import io.ktor.routing.put import io.ktor.util.pipeline.PipelineContext +import io.ktor.websocket.WebSocketServerSession +import io.ktor.websocket.webSocket import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.channels.filterNotNull +import kotlinx.coroutines.channels.map +import kotlinx.coroutines.channels.mapNotNull +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch import org.slf4j.Logger import org.slf4j.LoggerFactory import kotlin.reflect.KClass @@ -55,6 +69,8 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: mutableMapOf() val optionsRequests: MutableMap<String, suspend PipelineContext<Unit, ApplicationCall>.(Unit) -> Unit> = mutableMapOf() + val webSocketRequests: MutableMap<String, suspend WebSocketServerSession.() -> Unit> = + mutableMapOf() val mapper = jacksonObjectMapper() var counter: Int = 0 @@ -72,7 +88,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: ) { val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = if (method == HttpMethod.GET) { JsonRpcRequest(call.request.queryParameters["id"]?.toInt() ?: 0, "", listOf()) } else { @@ -113,7 +129,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() if (jsonRpcRequest.params.size == 1) { val param = getParameter<PAR>(jsonRpcRequest.params[0]) @@ -160,7 +176,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() if (jsonRpcRequest.params.size == 2) { val param1 = getParameter<PAR1>(jsonRpcRequest.params[0]) @@ -208,7 +224,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() @Suppress("MagicNumber") if (jsonRpcRequest.params.size == 3) { @@ -258,7 +274,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() @Suppress("MagicNumber") if (jsonRpcRequest.params.size == 4) { @@ -310,7 +326,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: throw UnsupportedOperationException("GET method is only supported for methods without parameters") val routeDef = route ?: "route${this::class.simpleName}${counter++}" addRoute(method, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() @Suppress("MagicNumber") if (jsonRpcRequest.params.size == 5) { @@ -348,6 +364,51 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: } /** + * Binds a given web socket connetion with a function of the receiver. + * @param function a function of the receiver + * @param route a route + */ + protected actual inline fun <reified PAR1 : Any, reified PAR2 : Any> bind( + noinline function: suspend T.(ReceiveChannel<PAR1>, SendChannel<PAR2>) -> Unit, + route: String? + ) { + val routeDef = "route${this::class.simpleName}${counter++}" + webSocketRequests["/kv/$routeDef"] = { + val wsInjector = call.injector.createChildInjector(WsSessionModule(this)) + val service = wsInjector.getInstance(serviceClass.java) + val requestChannel = incoming.mapNotNull { it as? Frame.Text }.map { + val jsonRpcRequest = getParameter<JsonRpcRequest>(it.readText()) + if (jsonRpcRequest.params.size == 1) { + getParameter<PAR1>(jsonRpcRequest.params[0]) + } else { + null + } + }.filterNotNull() + val responseChannel = Channel<PAR2>() + val session = this + coroutineScope { + launch { + for (p in responseChannel) { + val text = mapper.writeValueAsString( + JsonRpcResponse( + id = 0, + result = mapper.writeValueAsString(p) + ) + ) + outgoing.send(Frame.Text(text)) + } + session.close(CloseReason(CloseReason.Codes.NORMAL, "")) + session.close() + } + launch { + function.invoke(service, requestChannel, responseChannel) + if (!responseChannel.isClosedForReceive) responseChannel.close() + } + } + } + } + + /** * Binds a given function of the receiver as a select options source * @param function a function of the receiver */ @@ -357,7 +418,7 @@ actual open class KVServiceManager<T : Any> actual constructor(val serviceClass: ) { val routeDef = "route${this::class.simpleName}${counter++}" addRoute(HttpMethod.POST, "/kv/$routeDef") { - val service = call.injector.getInstance(serviceClass.java) + val service = call.injector.createChildInjector(DummyWsSessionModule()).getInstance(serviceClass.java) val jsonRpcRequest = call.receive<JsonRpcRequest>() if (jsonRpcRequest.params.size == 2) { val param1 = getParameter<String?>(jsonRpcRequest.params[0]) @@ -431,4 +492,9 @@ fun <T : Any> Route.applyRoutes(serviceManager: KVServiceManager<T>) { serviceManager.optionsRequests.forEach { (path, handler) -> options(path, handler) } + serviceManager.webSocketRequests.forEach { (path, handler) -> + this.webSocket(path) { + handler() + } + } } |