diff options
author | therealbush <therealbush@users.noreply.github.com> | 2022-04-23 15:22:18 -0700 |
---|---|---|
committer | therealbush <therealbush@users.noreply.github.com> | 2022-04-23 15:22:18 -0700 |
commit | c12696bd528e891b9da126d3b92f3db089b4b6e6 (patch) | |
tree | 6683284c29f6dd7b9d65fae881527e05b54b3a84 /src | |
parent | 1cb96200f8844e9192f55037cfc11bfb42e1d94f (diff) | |
download | eventbus-kotlin-c12696bd528e891b9da126d3b92f3db089b4b6e6.tar.gz eventbus-kotlin-c12696bd528e891b9da126d3b92f3db089b4b6e6.tar.bz2 eventbus-kotlin-c12696bd528e891b9da126d3b92f3db089b4b6e6.zip |
some changes
Diffstat (limited to 'src')
13 files changed, 410 insertions, 329 deletions
diff --git a/src/main/kotlin/me/bush/eventbuskotlin/CancelledState.kt b/src/main/kotlin/me/bush/eventbuskotlin/CancelledState.kt new file mode 100644 index 0000000..861b9fd --- /dev/null +++ b/src/main/kotlin/me/bush/eventbuskotlin/CancelledState.kt @@ -0,0 +1,70 @@ +package me.bush.eventbuskotlin + +import sun.misc.Unsafe +import java.lang.reflect.Modifier +import java.util.concurrent.ConcurrentHashMap +import kotlin.reflect.KClass +import kotlin.reflect.KMutableProperty +import kotlin.reflect.full.declaredMembers +import kotlin.reflect.full.isSubclassOf +import kotlin.reflect.full.withNullability +import kotlin.reflect.jvm.javaField +import kotlin.reflect.typeOf + +/** + * A simple SAM interface for determining if an event (or any class) is cancellable. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) + * + * @author bush + * @since 1.0.0 + */ +internal fun interface CancelledState { + + /** + * [event] should only ever be of the type that was passed + * to [CancelledState.invoke], **or this will throw.** + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) + * + * @return `true` if [event] is cancelled, `false` otherwise. + */ + fun isCancelled(event: Any): Boolean + + companion object { + private val UNSAFE = runCatching { + Unsafe::class.declaredMembers.single { it.name == "theUnsafe" }.handleCall() as Unsafe + }.onFailure { + LOGGER.error("Could not obtain Unsafe instance. Will not be able to determine external cancel state.") + }.getOrNull() // soy jvm + private val NAMES = arrayOf("canceled", "cancelled") + private val NOT_CANCELLABLE = CancelledState { false } + private val CACHE = ConcurrentHashMap<KClass<*>, CancelledState>() + + /** + * Creates a [CancelledState] object for events of class [type]. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) + */ + operator fun invoke(type: KClass<*>, config: Config): CancelledState = CACHE.getOrPut(type) { + // Default implementation for our event class. + if (type.isSubclassOf(Event::class)) CancelledState { (it as Event).cancelled } + // If compatibility is disabled. + else if (!config.thirdPartyCompatibility) NOT_CANCELLABLE + // Find a field named "cancelled" or "canceled" that is a boolean, and has a backing field. + else type.allMembers.filter { it.name in NAMES && it.returnType.withNullability(false) == typeOf<Boolean>() } + .filterIsInstance<KMutableProperty<*>>().filter { it.javaField != null }.toList().let { + if (it.isEmpty() || UNSAFE == null) NOT_CANCELLABLE else { + if (it.size != 1) config.logger.warn("Multiple possible cancel fields found for event $type") + val offset = it[0].javaField!!.let { field -> + if (Modifier.isStatic(field.modifiers)) + UNSAFE.staticFieldOffset(field) + else UNSAFE.objectFieldOffset(field) + } + // This is the same speed as direct access, plus one JNI call. + CancelledState { event -> UNSAFE.getBoolean(event, offset) } + } + } + } + } +} diff --git a/src/main/kotlin/me/bush/illnamethislater/Config.kt b/src/main/kotlin/me/bush/eventbuskotlin/Config.kt index 31c2b63..f0e1120 100644 --- a/src/main/kotlin/me/bush/illnamethislater/Config.kt +++ b/src/main/kotlin/me/bush/eventbuskotlin/Config.kt @@ -1,10 +1,8 @@ -package me.bush.illnamethislater +package me.bush.eventbuskotlin import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger -import kotlin.coroutines.CoroutineContext /** @@ -18,15 +16,17 @@ import kotlin.coroutines.CoroutineContext data class Config( /** - * The logger this [EventBus] will use to log errors, or log [EventBus.debugInfo] + * The logger this [EventBus] will use to log errors, or [EventBus.debug] + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) */ - val logger: Logger = LogManager.getLogger("Eventbus"), + val logger: Logger = LOGGER, /** - * The [CoroutineContext] to use when posting events to parallel listeners. The default - * value will work just fine, but you can specify a custom context if desired. + * The [CoroutineScope] to use when posting events to parallel listeners. The default + * value will work just fine, but you can specify a custom scope if desired. * - * [What is a Coroutine Context?](https://kotlinlang.org/docs/coroutine-context-and-dispatchers.html) + * [What is a Coroutine?](https://kotlinlang.org/docs/coroutines-overview.html) * * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) */ @@ -41,7 +41,10 @@ data class Config( val thirdPartyCompatibility: Boolean = true, /** - * todo doc + * Whether listeners need to be annotated with [EventListener] to be subscribed to this [EventBus]. + * This has no effect on anything else, and is just to improve code readability. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) */ val annotationRequired: Boolean = false ) diff --git a/src/main/kotlin/me/bush/illnamethislater/Event.kt b/src/main/kotlin/me/bush/eventbuskotlin/Event.kt index 2881c0e..50f1131 100644 --- a/src/main/kotlin/me/bush/illnamethislater/Event.kt +++ b/src/main/kotlin/me/bush/eventbuskotlin/Event.kt @@ -1,4 +1,4 @@ -package me.bush.illnamethislater +package me.bush.eventbuskotlin /** * A base class for events that can be cancelled. @@ -14,6 +14,8 @@ abstract class Event { * Whether this event is cancelled or not. If it is, only future listeners with * [Listener.receiveCancelled] will receive it. However, it can be set back to * `false`, and listeners will be able to receive it again. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) */ var cancelled = false set(value) { @@ -22,11 +24,15 @@ abstract class Event { /** * Determines if this event can be [cancelled]. This does not have to return a constant value. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) */ protected abstract val cancellable: Boolean /** * Sets [cancelled] to true. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) */ fun cancel() { cancelled = true diff --git a/src/main/kotlin/me/bush/eventbuskotlin/EventBus.kt b/src/main/kotlin/me/bush/eventbuskotlin/EventBus.kt new file mode 100644 index 0000000..eeb887d --- /dev/null +++ b/src/main/kotlin/me/bush/eventbuskotlin/EventBus.kt @@ -0,0 +1,111 @@ +package me.bush.eventbuskotlin + +import kotlinx.coroutines.launch +import java.util.concurrent.ConcurrentHashMap +import kotlin.reflect.KClass + +/** + * [A simple, thread safe, and fast event dispatcher for Kotlin/JVM and Java.](https://github.com/therealbush/eventbus-kotlin) + * + * @author bush + * @since 1.0.0 + */ +class EventBus(private val config: Config = Config()) { + private val listeners = ConcurrentHashMap<KClass<*>, ListenerGroup>() + private val subscribers = ConcurrentHashMap.newKeySet<Any>() + private val cache = ConcurrentHashMap<Any, List<Listener>?>() + + /** + * Searches [subscriber] for members that return [Listener] and registers them, if + * [Config.annotationRequired] is false, or they are annotated with [EventListener]. + * + * This will not find top level members, use [register] instead. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) + * + * @return `true` if [subscriber] was successfully subscribed, + * `false` if it was already subscribed, or could not be. + */ + fun subscribe(subscriber: Any): Boolean = subscribers.add(subscriber).also { + if (it) cache.computeIfAbsent(subscriber) { + getListeners(subscriber, config) + }?.forEach(::register) ?: return false + } + + /** + * Unregisters all listeners belonging to [subscriber]. + * + * This will not remove top level listeners, use [unregister] instead. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) + * + * @return `true` if [subscriber] was successfully unsubscribed, `false` if it was not subscribed. + */ + fun unsubscribe(subscriber: Any): Boolean = subscribers.remove(subscriber).also { + if (it) cache[subscriber]?.forEach(::unregister) + } + + /** + * Registers a [Listener] to this [EventBus]. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) + */ + fun register(listener: Listener): Boolean = listeners.computeIfAbsent(listener.type) { + ListenerGroup(it, config) + }.register(listener) + + /** + * Unregisters a [Listener] from this [EventBus]. Returns `true` if [Listener] was registered. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) + */ + fun unregister(listener: Listener): Boolean = listeners[listener.type]?.let { + val contained = it.unregister(listener) + if (it.parallel.isEmpty() && it.sequential.isEmpty()) { + listeners.remove(listener.type) + } + contained + } ?: false + + /** + * Posts an [event] to every listener that accepts its type. + * + * Events are **not** queued: only listeners currently subscribed will be called. + * + * If [event] is a subclass of [Event], or has a field-backed mutable boolean property + * named "cancelled" or "canceled" and [Config.thirdPartyCompatibility] is `true`, + * only future listeners with [Listener.receiveCancelled] will receive [event] + * while that property is `true`. + * + * Sequential listeners are called in the order of [Listener.priority], and parallel listeners + * are called after using [launch]. This method will not wait for parallel listeners to complete. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) + */ + fun post(event: Any): Boolean = listeners[event::class]?.post(event) ?: false + + /** + * Logs the subscriber count, total listener count, and listener count for every event type with at + * least one subscriber to [Config.logger]. Per-event counts are sorted from greatest to least listeners. + * + * **This may cause a [ConcurrentModificationException] if [register] or [subscribe] is called in parallel.** + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) + * ``` + * Subscribers: 5 + * Listeners: 8 sequential, 21 parallel + * BushIsSoCool: 4, 9 + * OtherEvent: 1, 10 + * String: 3, 0 + */ + fun debug() { + config.logger.info("Subscribers: ${subscribers.size}") + val sequential = listeners.values.sumOf { it.sequential.size } + val parallel = listeners.values.sumOf { it.parallel.size } + config.logger.info("Listeners: $sequential sequential, $parallel parallel") + listeners.values.sortedByDescending { it.sequential.size + it.parallel.size }.forEach { + config.logger.info(it.toString()) + } + } +} + diff --git a/src/main/kotlin/me/bush/illnamethislater/Listener.kt b/src/main/kotlin/me/bush/eventbuskotlin/Listener.kt index c9251d2..bb78b75 100644 --- a/src/main/kotlin/me/bush/illnamethislater/Listener.kt +++ b/src/main/kotlin/me/bush/eventbuskotlin/Listener.kt @@ -1,4 +1,4 @@ -package me.bush.illnamethislater +package me.bush.eventbuskotlin import java.util.function.Consumer import kotlin.reflect.KClass @@ -20,8 +20,7 @@ class Listener @PublishedApi internal constructor( internal val receiveCancelled: Boolean = false ) { @Suppress("UNCHECKED_CAST") - // Generics have no benefit here, - // it is easier just to force cast. + // Generics have no benefit here. internal val listener = listener as (Any) -> Unit internal var subscriber: Any? = null } @@ -33,11 +32,11 @@ class Listener @PublishedApi internal constructor( * * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) * - * @param T The **exact** (no inheritance) type of event to listen for. - * @param priority The priority of this listener, high to low. - * @param parallel If a listener should be invoked in parallel with other parallel listeners, or sequentially. + * @param T The **exact** (no inheritance) type of event to listen for. + * @param priority The priority of this listener, high to low. + * @param parallel If a listener should be invoked in parallel with other parallel listeners, or sequentially. * @param receiveCancelled If a listener should receive events that have been cancelled by previous listeners. - * @param listener The body of the listener that will be invoked. + * @param listener The body of the listener that will be invoked. */ inline fun <reified T : Any> listener( priority: Int = 0, @@ -47,7 +46,7 @@ inline fun <reified T : Any> listener( ) = Listener(listener, T::class, priority, parallel, receiveCancelled) /** - * **This function is intended for use in Java code.** + * **This function is intended for use in Java only.** * * Creates a listener that can be held in a variable or returned from a function * or getter belonging to an object to be subscribed with [EventBus.subscribe], @@ -55,11 +54,11 @@ inline fun <reified T : Any> listener( * * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) * - * @param type The **exact** (no inheritance) type of event to listen for. - * @param priority The priority of this listener, high to low. - * @param parallel If a listener should be invoked in parallel with other parallel listeners, or sequentially. + * @param type The **exact** (no inheritance) type of event to listen for. + * @param priority The priority of this listener, high to low. + * @param parallel If a listener should be invoked in parallel with other parallel listeners, or sequentially. * @param receiveCancelled If a listener should receive events that have been cancelled by previous listeners. - * @param listener The body of the listener that will be invoked. + * @param listener The body of the listener that will be invoked. */ @JvmOverloads fun <T : Any> listener( @@ -68,6 +67,6 @@ fun <T : Any> listener( parallel: Boolean = false, receiveCancelled: Boolean = false, // This might introduce some overhead, but its worth - // not manually having to return "Kotlin.UNIT" from every Java listener. + // not manually having to return "Unit.INSTANCE" from every Java listener. listener: Consumer<T> ) = Listener(listener::accept, type.kotlin, priority, parallel, receiveCancelled) diff --git a/src/main/kotlin/me/bush/eventbuskotlin/ListenerGroup.kt b/src/main/kotlin/me/bush/eventbuskotlin/ListenerGroup.kt new file mode 100644 index 0000000..d4e9697 --- /dev/null +++ b/src/main/kotlin/me/bush/eventbuskotlin/ListenerGroup.kt @@ -0,0 +1,89 @@ +package me.bush.eventbuskotlin + +import kotlinx.coroutines.launch +import java.util.concurrent.CopyOnWriteArrayList +import kotlin.reflect.KClass + +/** + * A class for storing and handling listeners of the same [type]. + * + * @author bush + * @since 1.0.0 + */ +internal class ListenerGroup( + private val type: KClass<*>, + private val config: Config +) { + private val cancelledState = CancelledState(type, config) + val sequential = CopyOnWriteArrayList<Listener>() + val parallel = CopyOnWriteArrayList<Listener>() + + /** + * Registers a listener. + * + * @return `false` if [listener] was already registered, `true` otherwise. + */ + fun register(listener: Listener): Boolean = listWith(listener) { + var position = it.size + // Sorted insertion and duplicates check + it.forEachIndexed { index, other -> + if (listener == other) return false + if (listener.priority > other.priority && position == it.size) { + position = index + } + } + it.add(position, listener) + true + } + + /** + * Unregisters a listener. + * + * @return `false` if [listener] was not already registered, `true` otherwise. + */ + fun unregister(listener: Listener): Boolean = listWith(listener) { + it.remove(listener) + } + + /** + * Posts an event to every listener in this group. [event] must be the same type as [type]. + * + * @return `true` if [event] was cancelled by sequential listeners, `false` otherwise. + */ + fun post(event: Any): Boolean { + sequential.forEach { + if (it.receiveCancelled || !cancelledState.isCancelled(event)) { + it.listener(event) + } + } + // We check this once, because parallel listener order is not guaranteed. + val cancelled = cancelledState.isCancelled(event) + if (parallel.isNotEmpty()) { + parallel.forEach { + if (it.receiveCancelled || !cancelled) { + config.parallelScope.launch { + it.listener(event) + } + } + } + } + return cancelled + } + + /** + * Convenience method to perform an action on the list [listener] belongs to within a synchronized block. + * + * We are synchronizing a COWArrayList because multiple concurrent mutations can cause IndexOutOfBoundsException. + * If we were to use a normal ArrayList we would have to synchronize [post], however posting performance + * is much more valuable than [register]/[unregister] performance. + */ + private inline fun <R> listWith(listener: Listener, block: (MutableList<Listener>) -> R): R { + return (if (listener.parallel) parallel else sequential).let { + synchronized(it) { + block(it) + } + } + } + + override fun toString() = "${type.simpleName}: ${sequential.size}, ${parallel.size}" +} diff --git a/src/main/kotlin/me/bush/illnamethislater/ReflectUtil.kt b/src/main/kotlin/me/bush/eventbuskotlin/Util.kt index 7f3618c..edad824 100644 --- a/src/main/kotlin/me/bush/illnamethislater/ReflectUtil.kt +++ b/src/main/kotlin/me/bush/eventbuskotlin/Util.kt @@ -1,16 +1,16 @@ -package me.bush.illnamethislater +package me.bush.eventbuskotlin +import org.apache.logging.log4j.LogManager import kotlin.reflect.KCallable import kotlin.reflect.KClass -import kotlin.reflect.full.allSuperclasses -import kotlin.reflect.full.declaredMembers -import kotlin.reflect.full.valueParameters -import kotlin.reflect.full.withNullability +import kotlin.reflect.full.* import kotlin.reflect.jvm.isAccessible import kotlin.reflect.typeOf // by bush, unchanged since 1.0.0 +internal val LOGGER = LogManager.getLogger("EventBus") + /** * Using [KClass.members] only returns public members, and using [KClass.declaredMembers] * doesn't return inherited members. This returns all members, private and inherited. @@ -30,7 +30,7 @@ internal val <T : Any> KClass<T>.allMembers * or requires certain arguments. (instanceParameter exists, but it will throw if it is an argument) * I thought maybe I was using the wrong methods, but apart from KProperty#get, (which is only for * properties, and only accepts arguments of type `Nothing` when `T` is star projected or covariant) - * I could not find any other way to do this, not even on StackOverFlow. + * I could not find any other way to do this, not even on StackOverflow. * * Funny how this solution is 1/10th the lines and always works. */ @@ -40,7 +40,7 @@ internal fun <R> KCallable<R>.handleCall(receiver: Any? = null): R { } /* -internal val KCallable<*>.isJvmStatic +private val KCallable<*>.isJvmStatic get() = when (this) { is KFunction -> Modifier.isStatic(javaMethod?.modifiers ?: 0) is KProperty -> this.javaGetter == null && Modifier.isStatic(javaField?.modifiers ?: 0) @@ -52,9 +52,26 @@ internal val KCallable<*>.isJvmStatic * Finds all members of return type [Listener]. (properties and methods) */ @Suppress("UNCHECKED_CAST") // This cannot fail -internal inline val KClass<*>.listeners - // Force nullability to false, so this will detect listeners in Java - // with "!" nullability. Also make sure there are no parameters. +private inline val KClass<*>.listeners get() = allMembers.filter { + // Set nullability to false, so this will detect listeners in Java + // with "!" nullability. Also make sure there are no parameters. it.returnType.withNullability(false) == typeOf<Listener>() && it.valueParameters.isEmpty() } as Sequence<KCallable<Listener>> + +/** + * Finds all listeners in [subscriber]. + * + * @return A list of listeners belonging to [subscriber], or null if an exception is caught. + */ +internal fun getListeners(subscriber: Any, config: Config) = runCatching { + subscriber::class.listeners.filter { !config.annotationRequired || it.hasAnnotation<EventListener>() } + .map { member -> member.handleCall(subscriber).also { it.subscriber = subscriber } }.toList() +}.onFailure { config.logger.error("Unable to register listeners for subscriber $subscriber", it) }.getOrNull() + +/** + * An annotation that must be used to identify listeners if [Config.annotationRequired] is `true`. + * + * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) + */ +annotation class EventListener diff --git a/src/main/kotlin/me/bush/illnamethislater/CancelledState.kt b/src/main/kotlin/me/bush/illnamethislater/CancelledState.kt deleted file mode 100644 index d6ff39d..0000000 --- a/src/main/kotlin/me/bush/illnamethislater/CancelledState.kt +++ /dev/null @@ -1,57 +0,0 @@ -package me.bush.illnamethislater - -import sun.misc.Unsafe -import java.lang.reflect.Modifier -import kotlin.reflect.KClass -import kotlin.reflect.KMutableProperty -import kotlin.reflect.full.declaredMembers -import kotlin.reflect.full.isSubclassOf -import kotlin.reflect.jvm.javaField -import kotlin.reflect.typeOf - -/** - * A simple SAM interface for determining if an event (or any class) is cancellable. - * - * @author bush - * @since 1.0.0 - */ -internal fun interface CancelledState { - - /** - * Returns whether [event] is cancelled or not. [event] should only ever be of the type - * that was passed to [CancelledState.of], **or this will crash.** - */ - fun isCancelled(event: Any): Boolean - - companion object { - private val UNSAFE = runCatching { - Unsafe::class.declaredMembers.single { it.name == "theUnsafe" }.handleCall() as Unsafe - }.getOrNull() // soy jvm - private val CANCELLED_NAMES = arrayOf("canceled", "cancelled") - private val NOT_CANCELLABLE = CancelledState { false } - private val OFFSETS = hashMapOf<KClass<*>, Long>() - - /** - * Creates a [CancelledState] object for events of class [type]. - */ - fun of(type: KClass<*>, config: Config): CancelledState { - // Default impl for our event class. - if (type.isSubclassOf(Event::class)) return CancelledState { (it as Event).cancelled } - // If compat is disabled. - if (!config.thirdPartyCompatibility) return NOT_CANCELLABLE - // Find a field named "cancelled" or "canceled" that is a boolean, and has a backing field. - type.allMembers.filter { it.name in CANCELLED_NAMES && it.returnType == typeOf<Boolean>() } - .filterIsInstance<KMutableProperty<*>>().filter { it.javaField != null }.toList().let { - if (it.isEmpty() || UNSAFE == null) return NOT_CANCELLABLE - if (it.size != 1) config.logger.warn("Multiple possible cancel fields found for event type $type") - it[0].javaField!!.let { field -> - if (Modifier.isStatic(field.modifiers)) OFFSETS[type] = UNSAFE.staticFieldOffset(field) - else OFFSETS[type] = UNSAFE.objectFieldOffset(field) - } - // This is the same speed as direct access, plus one JNI call and hashmap access. - // If you are familiar with C, this is essentially the same idea as pointers. - return CancelledState { event -> UNSAFE.getBoolean(event, OFFSETS[type]!!) } - } - } - } -} diff --git a/src/main/kotlin/me/bush/illnamethislater/EventBus.kt b/src/main/kotlin/me/bush/illnamethislater/EventBus.kt deleted file mode 100644 index ddc72ef..0000000 --- a/src/main/kotlin/me/bush/illnamethislater/EventBus.kt +++ /dev/null @@ -1,137 +0,0 @@ -package me.bush.illnamethislater - -import kotlin.reflect.KClass -import kotlin.reflect.full.hasAnnotation - -/** - * [A simple event dispatcher.](https://github.com/therealbush/eventbus-kotlin#tododothething) - * - * @author bush - * @since 1.0.0 - */ -class EventBus(private val config: Config = Config()) { - private val listeners = hashMapOf<KClass<*>, ListenerGroup>() - private val subscribers = hashMapOf<Any, List<Listener>>() - - /** - * Returns the current count of active subscribers. - */ - val subscriberCount get() = subscribers.size - - /** - * Returns the current count of all listeners, regardless of type. - */ - val listenerCount get() = listeners.values.sumOf { it.parallel.size + it.sequential.size } - - /** - * Searches [subscriber] for members that return [Listener] and registers them. - * - * This will not find top level listeners, use [register] instead. - * - * Returns `true` if [subscriber] was successfully subscribed, - * `false` if it was already subscribed, or could not be. - * - * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) - */ - fun subscribe(subscriber: Any): Boolean { - return if (subscriber in subscribers) false - else runCatching { - // Keep a separate list just for this subscriber. - subscribers[subscriber] = subscriber::class.listeners - .filter { !config.annotationRequired || it.hasAnnotation<EventListener>() }.map { member -> - // Register listener to a group. - println("${member.name}, ${member.returnType}") - member.parameters.forEach { println(it) } - register(member.handleCall(subscriber).also { it.subscriber = subscriber }) - }.toList() - true - }.getOrElse { - config.logger.error("Unable to register listeners for subscriber $subscriber", it) - false - } - } - - /** - * Unregisters all listeners belonging to [subscriber]. - * - * This will not remove top level listeners, use [unregister] instead. - * - * Returns `true` if [subscriber] was subscribed. - * - * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) - */ - fun unsubscribe(subscriber: Any): Boolean { - val contained = subscriber in subscribers - subscribers.remove(subscriber)?.forEach { unregister(it) } - return contained - } - - /** - * Registers a [Listener] to this [EventBus]. - * - * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) - */ - fun register(listener: Listener): Listener { - listeners.computeIfAbsent(listener.type) { - ListenerGroup(it, config) - }.register(listener) - return listener - } - - /** - * Unregisters a [Listener] from this [EventBus]. Returns `true` if [Listener] was registered. - * - * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) - */ - fun unregister(listener: Listener): Boolean { - return listeners[listener.type]?.let { - val contained = it.unregister(listener) - if (it.parallel.isEmpty() && it.sequential.isEmpty()) { - listeners.remove(listener.type) - } - contained - } ?: false - } - - /** - * Posts an [event] to every listener that accepts its type. - * - * Events are **not** queued: only listeners subscribed currently will be called. - * - * If [event] is a subclass of [Event], or has a field-backed mutable boolean property - * named "cancelled" or "canceled" and [Config.thirdPartyCompatibility] is `true`, - * it can be cancelled by a listener, and only future listeners with [Listener.receiveCancelled] - * will receive it. - * - * Sequential listeners are called in the order of [Listener.priority], and parallel - * listeners are called before or after, depending on the value of [Config.parallelFirst]. - * - * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) - */ - fun post(event: Any) = listeners[event::class]?.post(event) ?: false - - /** - * Logs the subscriber count, total listener count, and listener count for every event type with at - * least one subscriber to [Config.logger]. Per-event counts are sorted from greatest to least listeners. - * - * **This may cause a [ConcurrentModificationException] if [register] or [subscribe] is called in parallel.** - * - * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething) - * ``` - * Subscribers: 5 - * Listeners: 8 sequential, 21 parallel - * BushIsSoCool: 4, 9 - * OtherEvent: 1, 10 - * String: 3, 0 - */ - fun debugInfo() { - config.logger.info("Subscribers: ${subscribers.keys.size}") - val sequential = listeners.values.sumOf { it.sequential.size } - val parallel = listeners.values.sumOf { it.parallel.size } - config.logger.info("Listeners: $sequential sequential, $parallel parallel") - listeners.values.sortedByDescending { it.sequential.size + it.parallel.size }.forEach { - config.logger.info(it.toString()) - } - } -} - diff --git a/src/main/kotlin/me/bush/illnamethislater/EventListener.kt b/src/main/kotlin/me/bush/illnamethislater/EventListener.kt deleted file mode 100644 index e96b1db..0000000 --- a/src/main/kotlin/me/bush/illnamethislater/EventListener.kt +++ /dev/null @@ -1,6 +0,0 @@ -package me.bush.illnamethislater - -/** - * todo docs - */ -annotation class EventListener diff --git a/src/main/kotlin/me/bush/illnamethislater/ListenerGroup.kt b/src/main/kotlin/me/bush/illnamethislater/ListenerGroup.kt deleted file mode 100644 index 24a1ba3..0000000 --- a/src/main/kotlin/me/bush/illnamethislater/ListenerGroup.kt +++ /dev/null @@ -1,65 +0,0 @@ -package me.bush.illnamethislater - -import kotlinx.coroutines.launch -import java.util.concurrent.CopyOnWriteArrayList -import kotlin.reflect.KClass - -/** - * A class for storing and handling listeners. - * - * @author bush - * @since 1.0.0 - */ -internal class ListenerGroup( - private val type: KClass<*>, - private val config: Config -) { - private val cancelledState = CancelledState.of(type, config) - val sequential = CopyOnWriteArrayList<Listener>() - val parallel = CopyOnWriteArrayList<Listener>() - - /** - * Adds [listener] to this [ListenerGroup], and sorts its list. - */ - fun register(listener: Listener) { - (if (listener.parallel) parallel else sequential).let { - if (it.addIfAbsent(listener)) { - it.sortedByDescending(Listener::priority) - } - } - } - - /** - * Removes [listener] from this [ListenerGroup]. - */ - fun unregister(listener: Listener): Boolean { - return if (listener.parallel) parallel.remove(listener) - else sequential.remove(listener) - } - - /** - * Posts an event to every listener. Returns true of the event was cancelled. - */ - fun post(event: Any): Boolean { - sequential.forEach { - if (it.receiveCancelled || !cancelledState.isCancelled(event)) { - it.listener(event) - } - } - if (parallel.isNotEmpty()) { - // We check this once, because listener order is not guaranteed. - val cancelled = cancelledState.isCancelled(event) - // Credit to KB for the idea - parallel.forEach { - if (it.receiveCancelled || !cancelled) { - config.parallelScope.launch { - it.listener(event) - } - } - } - } - return cancelledState.isCancelled(event) - } - - override fun toString() = "${type.simpleName}: ${sequential.size}, ${parallel.size}" -} diff --git a/src/test/java/JavaTest.java b/src/test/java/JavaTest.java index 8518315..7d6f192 100644 --- a/src/test/java/JavaTest.java +++ b/src/test/java/JavaTest.java @@ -1,5 +1,5 @@ -import me.bush.illnamethislater.EventBus; -import me.bush.illnamethislater.Listener; +import me.bush.eventbuskotlin.Event;import me.bush.eventbuskotlin.EventBus; +import me.bush.eventbuskotlin.Listener; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -10,32 +10,17 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; -import static me.bush.illnamethislater.ListenerKt.listener; +import static me.bush.eventbuskotlin.ListenerKt.listener; /** - * I was getting NCDFE when trying to load this class - * from the other test and I don't care enough to fix it. - * * @author bush * @since 1.0.0 */ @TestInstance(Lifecycle.PER_CLASS) public class JavaTest { - public static Listener someStaticListenerField = listener(SimpleEvent.class, event -> { - event.setCount(event.getCount() + 1); - }); private final Logger logger = LogManager.getLogger(); - public Listener someInstanceListenerField = listener(SimpleEvent.class, event -> { - event.setCount(event.getCount() + 1); - }); private EventBus eventBus; - public static Listener someStaticListenerMethod() { - return listener(SimpleEvent.class, event -> { - event.setCount(event.getCount() + 1); - }); - } - @BeforeAll public void setup() { Configurator.setRootLevel(Level.ALL); @@ -52,9 +37,23 @@ public class JavaTest { Assertions.assertEquals(event.getCount(), 4); } + public Listener someInstanceListenerField = listener(SimpleEvent.class, event -> { + event.setCount(event.getCount() + 1); + }); + + public static Listener someStaticListenerField = listener(SimpleEvent.class, event -> { + event.setCount(event.getCount() + 1); + }); + public Listener someInstanceListenerMethod() { return listener(SimpleEvent.class, event -> { event.setCount(event.getCount() + 1); }); } -} + + public static Listener someStaticListenerMethod() { + return listener(SimpleEvent.class, event -> { + event.setCount(event.getCount() + 1); + }); + } + } diff --git a/src/test/kotlin/KotlinTest.kt b/src/test/kotlin/KotlinTest.kt index 3670703..136e7c8 100644 --- a/src/test/kotlin/KotlinTest.kt +++ b/src/test/kotlin/KotlinTest.kt @@ -1,7 +1,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.runBlocking -import me.bush.illnamethislater.* +import kotlinx.coroutines.launch +import me.bush.eventbuskotlin.* import org.apache.logging.log4j.Level import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.core.config.Configurator @@ -9,6 +9,7 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInstance +import java.util.concurrent.atomic.AtomicInteger import kotlin.random.Random /** @@ -41,7 +42,7 @@ class KotlinTest { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @Test - fun `test listener priority and ability to cancel events or receive cancelled events`() { + fun `test priority and ability to cancel events or receive cancelled events`() { eventBus.subscribe(this) val event = SimpleEvent() eventBus.post(event) @@ -130,25 +131,75 @@ class KotlinTest { @Test fun `test parallel event posting`() { - runBlocking { - sus() + val listeners = mutableListOf<Listener>() + repeat(10) { + // Not sure what to test + listeners += listener<Unit>(parallel = true) { + logger.info("Thread:" + Thread.currentThread().name) + } } - } + listeners.forEach { eventBus.register(it) } + eventBus.post(Unit) + listeners.forEach { eventBus.unregister(it) } - suspend fun sus() { - println() - } + /* I'm not sure what else to test for this, but I'm also not really happy with parallel event posting yet. TODO + + [DefaultDispatcher-worker-5 @coroutine#5] INFO Kotlin Test - DefaultDispatcher-worker-5 @coroutine#5 + [DefaultDispatcher-worker-2 @coroutine#2] INFO Kotlin Test - DefaultDispatcher-worker-2 @coroutine#2 + [DefaultDispatcher-worker-1 @coroutine#1] INFO Kotlin Test - DefaultDispatcher-worker-1 @coroutine#1 + [DefaultDispatcher-worker-4 @coroutine#4] INFO Kotlin Test - DefaultDispatcher-worker-4 @coroutine#4 + [DefaultDispatcher-worker-7 @coroutine#7] INFO Kotlin Test - DefaultDispatcher-worker-7 @coroutine#7 + [DefaultDispatcher-worker-6 @coroutine#6] INFO Kotlin Test - DefaultDispatcher-worker-6 @coroutine#6 + [DefaultDispatcher-worker-8 @coroutine#8] INFO Kotlin Test - DefaultDispatcher-worker-8 @coroutine#8 + [DefaultDispatcher-worker-9 @coroutine#9] INFO Kotlin Test - DefaultDispatcher-worker-9 @coroutine#9 + [DefaultDispatcher-worker-3 @coroutine#3] INFO Kotlin Test - DefaultDispatcher-worker-3 @coroutine#3 + [DefaultDispatcher-worker-10 @coroutine#10] INFO Kotlin Test - DefaultDispatcher-worker-10 @coroutine#10 */ - fun sussy() { - println() } //////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @Test - fun `call every method on multiple threads concurrently to ensure no CME is thrown`() { + fun `test thread safety`() { + val listeners = mutableListOf<Listener>() + repeat(10) { + listeners += listener<Any>(parallel = false) { + doStuff() + } + listeners += listener<Any>(parallel = true) { + doStuff() + } + } + listeners.forEach { eventBus.register(it) } + eventBus.debug() + CoroutineScope(Dispatchers.Default).launch { + repeat(100) { + launch { + doStuff() + eventBus.post(Any()) + } + } + } + Thread.sleep(2000) + Assertions.assertEquals(2100, counter.get()) + listeners.forEach { eventBus.unregister(it) } + eventBus.unregister(dummy) + eventBus.unsubscribe(this) + // Should be empty + eventBus.debug() + } + + private val dummy = listener<Unit> {} + + private var counter = AtomicInteger() + private fun doStuff() { + eventBus.unsubscribe(this) + eventBus.subscribe(this) + eventBus.unregister(dummy) + eventBus.register(dummy) + counter.getAndIncrement() } //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -181,6 +232,7 @@ class KotlinTest { eventBus.subscribe(this) eventBus.post(Unit) Assertions.assertTrue(called) + eventBus.unsubscribe(this) } var called = false |