aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authortherealbush <therealbush@users.noreply.github.com>2022-04-23 15:22:18 -0700
committertherealbush <therealbush@users.noreply.github.com>2022-04-23 15:22:18 -0700
commitc12696bd528e891b9da126d3b92f3db089b4b6e6 (patch)
tree6683284c29f6dd7b9d65fae881527e05b54b3a84 /src
parent1cb96200f8844e9192f55037cfc11bfb42e1d94f (diff)
downloadeventbus-kotlin-c12696bd528e891b9da126d3b92f3db089b4b6e6.tar.gz
eventbus-kotlin-c12696bd528e891b9da126d3b92f3db089b4b6e6.tar.bz2
eventbus-kotlin-c12696bd528e891b9da126d3b92f3db089b4b6e6.zip
some changes
Diffstat (limited to 'src')
-rw-r--r--src/main/kotlin/me/bush/eventbuskotlin/CancelledState.kt70
-rw-r--r--src/main/kotlin/me/bush/eventbuskotlin/Config.kt (renamed from src/main/kotlin/me/bush/illnamethislater/Config.kt)21
-rw-r--r--src/main/kotlin/me/bush/eventbuskotlin/Event.kt (renamed from src/main/kotlin/me/bush/illnamethislater/Event.kt)8
-rw-r--r--src/main/kotlin/me/bush/eventbuskotlin/EventBus.kt111
-rw-r--r--src/main/kotlin/me/bush/eventbuskotlin/Listener.kt (renamed from src/main/kotlin/me/bush/illnamethislater/Listener.kt)25
-rw-r--r--src/main/kotlin/me/bush/eventbuskotlin/ListenerGroup.kt89
-rw-r--r--src/main/kotlin/me/bush/eventbuskotlin/Util.kt (renamed from src/main/kotlin/me/bush/illnamethislater/ReflectUtil.kt)37
-rw-r--r--src/main/kotlin/me/bush/illnamethislater/CancelledState.kt57
-rw-r--r--src/main/kotlin/me/bush/illnamethislater/EventBus.kt137
-rw-r--r--src/main/kotlin/me/bush/illnamethislater/EventListener.kt6
-rw-r--r--src/main/kotlin/me/bush/illnamethislater/ListenerGroup.kt65
-rw-r--r--src/test/java/JavaTest.java37
-rw-r--r--src/test/kotlin/KotlinTest.kt76
13 files changed, 410 insertions, 329 deletions
diff --git a/src/main/kotlin/me/bush/eventbuskotlin/CancelledState.kt b/src/main/kotlin/me/bush/eventbuskotlin/CancelledState.kt
new file mode 100644
index 0000000..861b9fd
--- /dev/null
+++ b/src/main/kotlin/me/bush/eventbuskotlin/CancelledState.kt
@@ -0,0 +1,70 @@
+package me.bush.eventbuskotlin
+
+import sun.misc.Unsafe
+import java.lang.reflect.Modifier
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.reflect.KClass
+import kotlin.reflect.KMutableProperty
+import kotlin.reflect.full.declaredMembers
+import kotlin.reflect.full.isSubclassOf
+import kotlin.reflect.full.withNullability
+import kotlin.reflect.jvm.javaField
+import kotlin.reflect.typeOf
+
+/**
+ * A simple SAM interface for determining if an event (or any class) is cancellable.
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
+ *
+ * @author bush
+ * @since 1.0.0
+ */
+internal fun interface CancelledState {
+
+ /**
+ * [event] should only ever be of the type that was passed
+ * to [CancelledState.invoke], **or this will throw.**
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
+ *
+ * @return `true` if [event] is cancelled, `false` otherwise.
+ */
+ fun isCancelled(event: Any): Boolean
+
+ companion object {
+ private val UNSAFE = runCatching {
+ Unsafe::class.declaredMembers.single { it.name == "theUnsafe" }.handleCall() as Unsafe
+ }.onFailure {
+ LOGGER.error("Could not obtain Unsafe instance. Will not be able to determine external cancel state.")
+ }.getOrNull() // soy jvm
+ private val NAMES = arrayOf("canceled", "cancelled")
+ private val NOT_CANCELLABLE = CancelledState { false }
+ private val CACHE = ConcurrentHashMap<KClass<*>, CancelledState>()
+
+ /**
+ * Creates a [CancelledState] object for events of class [type].
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
+ */
+ operator fun invoke(type: KClass<*>, config: Config): CancelledState = CACHE.getOrPut(type) {
+ // Default implementation for our event class.
+ if (type.isSubclassOf(Event::class)) CancelledState { (it as Event).cancelled }
+ // If compatibility is disabled.
+ else if (!config.thirdPartyCompatibility) NOT_CANCELLABLE
+ // Find a field named "cancelled" or "canceled" that is a boolean, and has a backing field.
+ else type.allMembers.filter { it.name in NAMES && it.returnType.withNullability(false) == typeOf<Boolean>() }
+ .filterIsInstance<KMutableProperty<*>>().filter { it.javaField != null }.toList().let {
+ if (it.isEmpty() || UNSAFE == null) NOT_CANCELLABLE else {
+ if (it.size != 1) config.logger.warn("Multiple possible cancel fields found for event $type")
+ val offset = it[0].javaField!!.let { field ->
+ if (Modifier.isStatic(field.modifiers))
+ UNSAFE.staticFieldOffset(field)
+ else UNSAFE.objectFieldOffset(field)
+ }
+ // This is the same speed as direct access, plus one JNI call.
+ CancelledState { event -> UNSAFE.getBoolean(event, offset) }
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/kotlin/me/bush/illnamethislater/Config.kt b/src/main/kotlin/me/bush/eventbuskotlin/Config.kt
index 31c2b63..f0e1120 100644
--- a/src/main/kotlin/me/bush/illnamethislater/Config.kt
+++ b/src/main/kotlin/me/bush/eventbuskotlin/Config.kt
@@ -1,10 +1,8 @@
-package me.bush.illnamethislater
+package me.bush.eventbuskotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
-import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
-import kotlin.coroutines.CoroutineContext
/**
@@ -18,15 +16,17 @@ import kotlin.coroutines.CoroutineContext
data class Config(
/**
- * The logger this [EventBus] will use to log errors, or log [EventBus.debugInfo]
+ * The logger this [EventBus] will use to log errors, or [EventBus.debug]
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
*/
- val logger: Logger = LogManager.getLogger("Eventbus"),
+ val logger: Logger = LOGGER,
/**
- * The [CoroutineContext] to use when posting events to parallel listeners. The default
- * value will work just fine, but you can specify a custom context if desired.
+ * The [CoroutineScope] to use when posting events to parallel listeners. The default
+ * value will work just fine, but you can specify a custom scope if desired.
*
- * [What is a Coroutine Context?](https://kotlinlang.org/docs/coroutine-context-and-dispatchers.html)
+ * [What is a Coroutine?](https://kotlinlang.org/docs/coroutines-overview.html)
*
* [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
*/
@@ -41,7 +41,10 @@ data class Config(
val thirdPartyCompatibility: Boolean = true,
/**
- * todo doc
+ * Whether listeners need to be annotated with [EventListener] to be subscribed to this [EventBus].
+ * This has no effect on anything else, and is just to improve code readability.
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
*/
val annotationRequired: Boolean = false
)
diff --git a/src/main/kotlin/me/bush/illnamethislater/Event.kt b/src/main/kotlin/me/bush/eventbuskotlin/Event.kt
index 2881c0e..50f1131 100644
--- a/src/main/kotlin/me/bush/illnamethislater/Event.kt
+++ b/src/main/kotlin/me/bush/eventbuskotlin/Event.kt
@@ -1,4 +1,4 @@
-package me.bush.illnamethislater
+package me.bush.eventbuskotlin
/**
* A base class for events that can be cancelled.
@@ -14,6 +14,8 @@ abstract class Event {
* Whether this event is cancelled or not. If it is, only future listeners with
* [Listener.receiveCancelled] will receive it. However, it can be set back to
* `false`, and listeners will be able to receive it again.
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
*/
var cancelled = false
set(value) {
@@ -22,11 +24,15 @@ abstract class Event {
/**
* Determines if this event can be [cancelled]. This does not have to return a constant value.
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
*/
protected abstract val cancellable: Boolean
/**
* Sets [cancelled] to true.
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
*/
fun cancel() {
cancelled = true
diff --git a/src/main/kotlin/me/bush/eventbuskotlin/EventBus.kt b/src/main/kotlin/me/bush/eventbuskotlin/EventBus.kt
new file mode 100644
index 0000000..eeb887d
--- /dev/null
+++ b/src/main/kotlin/me/bush/eventbuskotlin/EventBus.kt
@@ -0,0 +1,111 @@
+package me.bush.eventbuskotlin
+
+import kotlinx.coroutines.launch
+import java.util.concurrent.ConcurrentHashMap
+import kotlin.reflect.KClass
+
+/**
+ * [A simple, thread safe, and fast event dispatcher for Kotlin/JVM and Java.](https://github.com/therealbush/eventbus-kotlin)
+ *
+ * @author bush
+ * @since 1.0.0
+ */
+class EventBus(private val config: Config = Config()) {
+ private val listeners = ConcurrentHashMap<KClass<*>, ListenerGroup>()
+ private val subscribers = ConcurrentHashMap.newKeySet<Any>()
+ private val cache = ConcurrentHashMap<Any, List<Listener>?>()
+
+ /**
+ * Searches [subscriber] for members that return [Listener] and registers them, if
+ * [Config.annotationRequired] is false, or they are annotated with [EventListener].
+ *
+ * This will not find top level members, use [register] instead.
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
+ *
+ * @return `true` if [subscriber] was successfully subscribed,
+ * `false` if it was already subscribed, or could not be.
+ */
+ fun subscribe(subscriber: Any): Boolean = subscribers.add(subscriber).also {
+ if (it) cache.computeIfAbsent(subscriber) {
+ getListeners(subscriber, config)
+ }?.forEach(::register) ?: return false
+ }
+
+ /**
+ * Unregisters all listeners belonging to [subscriber].
+ *
+ * This will not remove top level listeners, use [unregister] instead.
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
+ *
+ * @return `true` if [subscriber] was successfully unsubscribed, `false` if it was not subscribed.
+ */
+ fun unsubscribe(subscriber: Any): Boolean = subscribers.remove(subscriber).also {
+ if (it) cache[subscriber]?.forEach(::unregister)
+ }
+
+ /**
+ * Registers a [Listener] to this [EventBus].
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
+ */
+ fun register(listener: Listener): Boolean = listeners.computeIfAbsent(listener.type) {
+ ListenerGroup(it, config)
+ }.register(listener)
+
+ /**
+ * Unregisters a [Listener] from this [EventBus]. Returns `true` if [Listener] was registered.
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
+ */
+ fun unregister(listener: Listener): Boolean = listeners[listener.type]?.let {
+ val contained = it.unregister(listener)
+ if (it.parallel.isEmpty() && it.sequential.isEmpty()) {
+ listeners.remove(listener.type)
+ }
+ contained
+ } ?: false
+
+ /**
+ * Posts an [event] to every listener that accepts its type.
+ *
+ * Events are **not** queued: only listeners currently subscribed will be called.
+ *
+ * If [event] is a subclass of [Event], or has a field-backed mutable boolean property
+ * named "cancelled" or "canceled" and [Config.thirdPartyCompatibility] is `true`,
+ * only future listeners with [Listener.receiveCancelled] will receive [event]
+ * while that property is `true`.
+ *
+ * Sequential listeners are called in the order of [Listener.priority], and parallel listeners
+ * are called after using [launch]. This method will not wait for parallel listeners to complete.
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
+ */
+ fun post(event: Any): Boolean = listeners[event::class]?.post(event) ?: false
+
+ /**
+ * Logs the subscriber count, total listener count, and listener count for every event type with at
+ * least one subscriber to [Config.logger]. Per-event counts are sorted from greatest to least listeners.
+ *
+ * **This may cause a [ConcurrentModificationException] if [register] or [subscribe] is called in parallel.**
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
+ * ```
+ * Subscribers: 5
+ * Listeners: 8 sequential, 21 parallel
+ * BushIsSoCool: 4, 9
+ * OtherEvent: 1, 10
+ * String: 3, 0
+ */
+ fun debug() {
+ config.logger.info("Subscribers: ${subscribers.size}")
+ val sequential = listeners.values.sumOf { it.sequential.size }
+ val parallel = listeners.values.sumOf { it.parallel.size }
+ config.logger.info("Listeners: $sequential sequential, $parallel parallel")
+ listeners.values.sortedByDescending { it.sequential.size + it.parallel.size }.forEach {
+ config.logger.info(it.toString())
+ }
+ }
+}
+
diff --git a/src/main/kotlin/me/bush/illnamethislater/Listener.kt b/src/main/kotlin/me/bush/eventbuskotlin/Listener.kt
index c9251d2..bb78b75 100644
--- a/src/main/kotlin/me/bush/illnamethislater/Listener.kt
+++ b/src/main/kotlin/me/bush/eventbuskotlin/Listener.kt
@@ -1,4 +1,4 @@
-package me.bush.illnamethislater
+package me.bush.eventbuskotlin
import java.util.function.Consumer
import kotlin.reflect.KClass
@@ -20,8 +20,7 @@ class Listener @PublishedApi internal constructor(
internal val receiveCancelled: Boolean = false
) {
@Suppress("UNCHECKED_CAST")
- // Generics have no benefit here,
- // it is easier just to force cast.
+ // Generics have no benefit here.
internal val listener = listener as (Any) -> Unit
internal var subscriber: Any? = null
}
@@ -33,11 +32,11 @@ class Listener @PublishedApi internal constructor(
*
* [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
*
- * @param T The **exact** (no inheritance) type of event to listen for.
- * @param priority The priority of this listener, high to low.
- * @param parallel If a listener should be invoked in parallel with other parallel listeners, or sequentially.
+ * @param T The **exact** (no inheritance) type of event to listen for.
+ * @param priority The priority of this listener, high to low.
+ * @param parallel If a listener should be invoked in parallel with other parallel listeners, or sequentially.
* @param receiveCancelled If a listener should receive events that have been cancelled by previous listeners.
- * @param listener The body of the listener that will be invoked.
+ * @param listener The body of the listener that will be invoked.
*/
inline fun <reified T : Any> listener(
priority: Int = 0,
@@ -47,7 +46,7 @@ inline fun <reified T : Any> listener(
) = Listener(listener, T::class, priority, parallel, receiveCancelled)
/**
- * **This function is intended for use in Java code.**
+ * **This function is intended for use in Java only.**
*
* Creates a listener that can be held in a variable or returned from a function
* or getter belonging to an object to be subscribed with [EventBus.subscribe],
@@ -55,11 +54,11 @@ inline fun <reified T : Any> listener(
*
* [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
*
- * @param type The **exact** (no inheritance) type of event to listen for.
- * @param priority The priority of this listener, high to low.
- * @param parallel If a listener should be invoked in parallel with other parallel listeners, or sequentially.
+ * @param type The **exact** (no inheritance) type of event to listen for.
+ * @param priority The priority of this listener, high to low.
+ * @param parallel If a listener should be invoked in parallel with other parallel listeners, or sequentially.
* @param receiveCancelled If a listener should receive events that have been cancelled by previous listeners.
- * @param listener The body of the listener that will be invoked.
+ * @param listener The body of the listener that will be invoked.
*/
@JvmOverloads
fun <T : Any> listener(
@@ -68,6 +67,6 @@ fun <T : Any> listener(
parallel: Boolean = false,
receiveCancelled: Boolean = false,
// This might introduce some overhead, but its worth
- // not manually having to return "Kotlin.UNIT" from every Java listener.
+ // not manually having to return "Unit.INSTANCE" from every Java listener.
listener: Consumer<T>
) = Listener(listener::accept, type.kotlin, priority, parallel, receiveCancelled)
diff --git a/src/main/kotlin/me/bush/eventbuskotlin/ListenerGroup.kt b/src/main/kotlin/me/bush/eventbuskotlin/ListenerGroup.kt
new file mode 100644
index 0000000..d4e9697
--- /dev/null
+++ b/src/main/kotlin/me/bush/eventbuskotlin/ListenerGroup.kt
@@ -0,0 +1,89 @@
+package me.bush.eventbuskotlin
+
+import kotlinx.coroutines.launch
+import java.util.concurrent.CopyOnWriteArrayList
+import kotlin.reflect.KClass
+
+/**
+ * A class for storing and handling listeners of the same [type].
+ *
+ * @author bush
+ * @since 1.0.0
+ */
+internal class ListenerGroup(
+ private val type: KClass<*>,
+ private val config: Config
+) {
+ private val cancelledState = CancelledState(type, config)
+ val sequential = CopyOnWriteArrayList<Listener>()
+ val parallel = CopyOnWriteArrayList<Listener>()
+
+ /**
+ * Registers a listener.
+ *
+ * @return `false` if [listener] was already registered, `true` otherwise.
+ */
+ fun register(listener: Listener): Boolean = listWith(listener) {
+ var position = it.size
+ // Sorted insertion and duplicates check
+ it.forEachIndexed { index, other ->
+ if (listener == other) return false
+ if (listener.priority > other.priority && position == it.size) {
+ position = index
+ }
+ }
+ it.add(position, listener)
+ true
+ }
+
+ /**
+ * Unregisters a listener.
+ *
+ * @return `false` if [listener] was not already registered, `true` otherwise.
+ */
+ fun unregister(listener: Listener): Boolean = listWith(listener) {
+ it.remove(listener)
+ }
+
+ /**
+ * Posts an event to every listener in this group. [event] must be the same type as [type].
+ *
+ * @return `true` if [event] was cancelled by sequential listeners, `false` otherwise.
+ */
+ fun post(event: Any): Boolean {
+ sequential.forEach {
+ if (it.receiveCancelled || !cancelledState.isCancelled(event)) {
+ it.listener(event)
+ }
+ }
+ // We check this once, because parallel listener order is not guaranteed.
+ val cancelled = cancelledState.isCancelled(event)
+ if (parallel.isNotEmpty()) {
+ parallel.forEach {
+ if (it.receiveCancelled || !cancelled) {
+ config.parallelScope.launch {
+ it.listener(event)
+ }
+ }
+ }
+ }
+ return cancelled
+ }
+
+ /**
+ * Convenience method to perform an action on the list [listener] belongs to within a synchronized block.
+ *
+ * We are synchronizing a COWArrayList because multiple concurrent mutations can cause IndexOutOfBoundsException.
+ * If we were to use a normal ArrayList we would have to synchronize [post], however posting performance
+ * is much more valuable than [register]/[unregister] performance.
+ */
+ private inline fun <R> listWith(listener: Listener, block: (MutableList<Listener>) -> R): R {
+ return (if (listener.parallel) parallel else sequential).let {
+ synchronized(it) {
+ block(it)
+ }
+ }
+ }
+
+ override fun toString() = "${type.simpleName}: ${sequential.size}, ${parallel.size}"
+}
diff --git a/src/main/kotlin/me/bush/illnamethislater/ReflectUtil.kt b/src/main/kotlin/me/bush/eventbuskotlin/Util.kt
index 7f3618c..edad824 100644
--- a/src/main/kotlin/me/bush/illnamethislater/ReflectUtil.kt
+++ b/src/main/kotlin/me/bush/eventbuskotlin/Util.kt
@@ -1,16 +1,16 @@
-package me.bush.illnamethislater
+package me.bush.eventbuskotlin
+import org.apache.logging.log4j.LogManager
import kotlin.reflect.KCallable
import kotlin.reflect.KClass
-import kotlin.reflect.full.allSuperclasses
-import kotlin.reflect.full.declaredMembers
-import kotlin.reflect.full.valueParameters
-import kotlin.reflect.full.withNullability
+import kotlin.reflect.full.*
import kotlin.reflect.jvm.isAccessible
import kotlin.reflect.typeOf
// by bush, unchanged since 1.0.0
+internal val LOGGER = LogManager.getLogger("EventBus")
+
/**
* Using [KClass.members] only returns public members, and using [KClass.declaredMembers]
* doesn't return inherited members. This returns all members, private and inherited.
@@ -30,7 +30,7 @@ internal val <T : Any> KClass<T>.allMembers
* or requires certain arguments. (instanceParameter exists, but it will throw if it is an argument)
* I thought maybe I was using the wrong methods, but apart from KProperty#get, (which is only for
* properties, and only accepts arguments of type `Nothing` when `T` is star projected or covariant)
- * I could not find any other way to do this, not even on StackOverFlow.
+ * I could not find any other way to do this, not even on StackOverflow.
*
* Funny how this solution is 1/10th the lines and always works.
*/
@@ -40,7 +40,7 @@ internal fun <R> KCallable<R>.handleCall(receiver: Any? = null): R {
}
/*
-internal val KCallable<*>.isJvmStatic
+private val KCallable<*>.isJvmStatic
get() = when (this) {
is KFunction -> Modifier.isStatic(javaMethod?.modifiers ?: 0)
is KProperty -> this.javaGetter == null && Modifier.isStatic(javaField?.modifiers ?: 0)
@@ -52,9 +52,26 @@ internal val KCallable<*>.isJvmStatic
* Finds all members of return type [Listener]. (properties and methods)
*/
@Suppress("UNCHECKED_CAST") // This cannot fail
-internal inline val KClass<*>.listeners
- // Force nullability to false, so this will detect listeners in Java
- // with "!" nullability. Also make sure there are no parameters.
+private inline val KClass<*>.listeners
get() = allMembers.filter {
+ // Set nullability to false, so this will detect listeners in Java
+ // with "!" nullability. Also make sure there are no parameters.
it.returnType.withNullability(false) == typeOf<Listener>() && it.valueParameters.isEmpty()
} as Sequence<KCallable<Listener>>
+
+/**
+ * Finds all listeners in [subscriber].
+ *
+ * @return A list of listeners belonging to [subscriber], or null if an exception is caught.
+ */
+internal fun getListeners(subscriber: Any, config: Config) = runCatching {
+ subscriber::class.listeners.filter { !config.annotationRequired || it.hasAnnotation<EventListener>() }
+ .map { member -> member.handleCall(subscriber).also { it.subscriber = subscriber } }.toList()
+}.onFailure { config.logger.error("Unable to register listeners for subscriber $subscriber", it) }.getOrNull()
+
+/**
+ * An annotation that must be used to identify listeners if [Config.annotationRequired] is `true`.
+ *
+ * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
+ */
+annotation class EventListener
diff --git a/src/main/kotlin/me/bush/illnamethislater/CancelledState.kt b/src/main/kotlin/me/bush/illnamethislater/CancelledState.kt
deleted file mode 100644
index d6ff39d..0000000
--- a/src/main/kotlin/me/bush/illnamethislater/CancelledState.kt
+++ /dev/null
@@ -1,57 +0,0 @@
-package me.bush.illnamethislater
-
-import sun.misc.Unsafe
-import java.lang.reflect.Modifier
-import kotlin.reflect.KClass
-import kotlin.reflect.KMutableProperty
-import kotlin.reflect.full.declaredMembers
-import kotlin.reflect.full.isSubclassOf
-import kotlin.reflect.jvm.javaField
-import kotlin.reflect.typeOf
-
-/**
- * A simple SAM interface for determining if an event (or any class) is cancellable.
- *
- * @author bush
- * @since 1.0.0
- */
-internal fun interface CancelledState {
-
- /**
- * Returns whether [event] is cancelled or not. [event] should only ever be of the type
- * that was passed to [CancelledState.of], **or this will crash.**
- */
- fun isCancelled(event: Any): Boolean
-
- companion object {
- private val UNSAFE = runCatching {
- Unsafe::class.declaredMembers.single { it.name == "theUnsafe" }.handleCall() as Unsafe
- }.getOrNull() // soy jvm
- private val CANCELLED_NAMES = arrayOf("canceled", "cancelled")
- private val NOT_CANCELLABLE = CancelledState { false }
- private val OFFSETS = hashMapOf<KClass<*>, Long>()
-
- /**
- * Creates a [CancelledState] object for events of class [type].
- */
- fun of(type: KClass<*>, config: Config): CancelledState {
- // Default impl for our event class.
- if (type.isSubclassOf(Event::class)) return CancelledState { (it as Event).cancelled }
- // If compat is disabled.
- if (!config.thirdPartyCompatibility) return NOT_CANCELLABLE
- // Find a field named "cancelled" or "canceled" that is a boolean, and has a backing field.
- type.allMembers.filter { it.name in CANCELLED_NAMES && it.returnType == typeOf<Boolean>() }
- .filterIsInstance<KMutableProperty<*>>().filter { it.javaField != null }.toList().let {
- if (it.isEmpty() || UNSAFE == null) return NOT_CANCELLABLE
- if (it.size != 1) config.logger.warn("Multiple possible cancel fields found for event type $type")
- it[0].javaField!!.let { field ->
- if (Modifier.isStatic(field.modifiers)) OFFSETS[type] = UNSAFE.staticFieldOffset(field)
- else OFFSETS[type] = UNSAFE.objectFieldOffset(field)
- }
- // This is the same speed as direct access, plus one JNI call and hashmap access.
- // If you are familiar with C, this is essentially the same idea as pointers.
- return CancelledState { event -> UNSAFE.getBoolean(event, OFFSETS[type]!!) }
- }
- }
- }
-}
diff --git a/src/main/kotlin/me/bush/illnamethislater/EventBus.kt b/src/main/kotlin/me/bush/illnamethislater/EventBus.kt
deleted file mode 100644
index ddc72ef..0000000
--- a/src/main/kotlin/me/bush/illnamethislater/EventBus.kt
+++ /dev/null
@@ -1,137 +0,0 @@
-package me.bush.illnamethislater
-
-import kotlin.reflect.KClass
-import kotlin.reflect.full.hasAnnotation
-
-/**
- * [A simple event dispatcher.](https://github.com/therealbush/eventbus-kotlin#tododothething)
- *
- * @author bush
- * @since 1.0.0
- */
-class EventBus(private val config: Config = Config()) {
- private val listeners = hashMapOf<KClass<*>, ListenerGroup>()
- private val subscribers = hashMapOf<Any, List<Listener>>()
-
- /**
- * Returns the current count of active subscribers.
- */
- val subscriberCount get() = subscribers.size
-
- /**
- * Returns the current count of all listeners, regardless of type.
- */
- val listenerCount get() = listeners.values.sumOf { it.parallel.size + it.sequential.size }
-
- /**
- * Searches [subscriber] for members that return [Listener] and registers them.
- *
- * This will not find top level listeners, use [register] instead.
- *
- * Returns `true` if [subscriber] was successfully subscribed,
- * `false` if it was already subscribed, or could not be.
- *
- * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
- */
- fun subscribe(subscriber: Any): Boolean {
- return if (subscriber in subscribers) false
- else runCatching {
- // Keep a separate list just for this subscriber.
- subscribers[subscriber] = subscriber::class.listeners
- .filter { !config.annotationRequired || it.hasAnnotation<EventListener>() }.map { member ->
- // Register listener to a group.
- println("${member.name}, ${member.returnType}")
- member.parameters.forEach { println(it) }
- register(member.handleCall(subscriber).also { it.subscriber = subscriber })
- }.toList()
- true
- }.getOrElse {
- config.logger.error("Unable to register listeners for subscriber $subscriber", it)
- false
- }
- }
-
- /**
- * Unregisters all listeners belonging to [subscriber].
- *
- * This will not remove top level listeners, use [unregister] instead.
- *
- * Returns `true` if [subscriber] was subscribed.
- *
- * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
- */
- fun unsubscribe(subscriber: Any): Boolean {
- val contained = subscriber in subscribers
- subscribers.remove(subscriber)?.forEach { unregister(it) }
- return contained
- }
-
- /**
- * Registers a [Listener] to this [EventBus].
- *
- * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
- */
- fun register(listener: Listener): Listener {
- listeners.computeIfAbsent(listener.type) {
- ListenerGroup(it, config)
- }.register(listener)
- return listener
- }
-
- /**
- * Unregisters a [Listener] from this [EventBus]. Returns `true` if [Listener] was registered.
- *
- * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
- */
- fun unregister(listener: Listener): Boolean {
- return listeners[listener.type]?.let {
- val contained = it.unregister(listener)
- if (it.parallel.isEmpty() && it.sequential.isEmpty()) {
- listeners.remove(listener.type)
- }
- contained
- } ?: false
- }
-
- /**
- * Posts an [event] to every listener that accepts its type.
- *
- * Events are **not** queued: only listeners subscribed currently will be called.
- *
- * If [event] is a subclass of [Event], or has a field-backed mutable boolean property
- * named "cancelled" or "canceled" and [Config.thirdPartyCompatibility] is `true`,
- * it can be cancelled by a listener, and only future listeners with [Listener.receiveCancelled]
- * will receive it.
- *
- * Sequential listeners are called in the order of [Listener.priority], and parallel
- * listeners are called before or after, depending on the value of [Config.parallelFirst].
- *
- * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
- */
- fun post(event: Any) = listeners[event::class]?.post(event) ?: false
-
- /**
- * Logs the subscriber count, total listener count, and listener count for every event type with at
- * least one subscriber to [Config.logger]. Per-event counts are sorted from greatest to least listeners.
- *
- * **This may cause a [ConcurrentModificationException] if [register] or [subscribe] is called in parallel.**
- *
- * [Information and examples](https://github.com/therealbush/eventbus-kotlin#tododothething)
- * ```
- * Subscribers: 5
- * Listeners: 8 sequential, 21 parallel
- * BushIsSoCool: 4, 9
- * OtherEvent: 1, 10
- * String: 3, 0
- */
- fun debugInfo() {
- config.logger.info("Subscribers: ${subscribers.keys.size}")
- val sequential = listeners.values.sumOf { it.sequential.size }
- val parallel = listeners.values.sumOf { it.parallel.size }
- config.logger.info("Listeners: $sequential sequential, $parallel parallel")
- listeners.values.sortedByDescending { it.sequential.size + it.parallel.size }.forEach {
- config.logger.info(it.toString())
- }
- }
-}
-
diff --git a/src/main/kotlin/me/bush/illnamethislater/EventListener.kt b/src/main/kotlin/me/bush/illnamethislater/EventListener.kt
deleted file mode 100644
index e96b1db..0000000
--- a/src/main/kotlin/me/bush/illnamethislater/EventListener.kt
+++ /dev/null
@@ -1,6 +0,0 @@
-package me.bush.illnamethislater
-
-/**
- * todo docs
- */
-annotation class EventListener
diff --git a/src/main/kotlin/me/bush/illnamethislater/ListenerGroup.kt b/src/main/kotlin/me/bush/illnamethislater/ListenerGroup.kt
deleted file mode 100644
index 24a1ba3..0000000
--- a/src/main/kotlin/me/bush/illnamethislater/ListenerGroup.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-package me.bush.illnamethislater
-
-import kotlinx.coroutines.launch
-import java.util.concurrent.CopyOnWriteArrayList
-import kotlin.reflect.KClass
-
-/**
- * A class for storing and handling listeners.
- *
- * @author bush
- * @since 1.0.0
- */
-internal class ListenerGroup(
- private val type: KClass<*>,
- private val config: Config
-) {
- private val cancelledState = CancelledState.of(type, config)
- val sequential = CopyOnWriteArrayList<Listener>()
- val parallel = CopyOnWriteArrayList<Listener>()
-
- /**
- * Adds [listener] to this [ListenerGroup], and sorts its list.
- */
- fun register(listener: Listener) {
- (if (listener.parallel) parallel else sequential).let {
- if (it.addIfAbsent(listener)) {
- it.sortedByDescending(Listener::priority)
- }
- }
- }
-
- /**
- * Removes [listener] from this [ListenerGroup].
- */
- fun unregister(listener: Listener): Boolean {
- return if (listener.parallel) parallel.remove(listener)
- else sequential.remove(listener)
- }
-
- /**
- * Posts an event to every listener. Returns true of the event was cancelled.
- */
- fun post(event: Any): Boolean {
- sequential.forEach {
- if (it.receiveCancelled || !cancelledState.isCancelled(event)) {
- it.listener(event)
- }
- }
- if (parallel.isNotEmpty()) {
- // We check this once, because listener order is not guaranteed.
- val cancelled = cancelledState.isCancelled(event)
- // Credit to KB for the idea
- parallel.forEach {
- if (it.receiveCancelled || !cancelled) {
- config.parallelScope.launch {
- it.listener(event)
- }
- }
- }
- }
- return cancelledState.isCancelled(event)
- }
-
- override fun toString() = "${type.simpleName}: ${sequential.size}, ${parallel.size}"
-}
diff --git a/src/test/java/JavaTest.java b/src/test/java/JavaTest.java
index 8518315..7d6f192 100644
--- a/src/test/java/JavaTest.java
+++ b/src/test/java/JavaTest.java
@@ -1,5 +1,5 @@
-import me.bush.illnamethislater.EventBus;
-import me.bush.illnamethislater.Listener;
+import me.bush.eventbuskotlin.Event;import me.bush.eventbuskotlin.EventBus;
+import me.bush.eventbuskotlin.Listener;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -10,32 +10,17 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
-import static me.bush.illnamethislater.ListenerKt.listener;
+import static me.bush.eventbuskotlin.ListenerKt.listener;
/**
- * I was getting NCDFE when trying to load this class
- * from the other test and I don't care enough to fix it.
- *
* @author bush
* @since 1.0.0
*/
@TestInstance(Lifecycle.PER_CLASS)
public class JavaTest {
- public static Listener someStaticListenerField = listener(SimpleEvent.class, event -> {
- event.setCount(event.getCount() + 1);
- });
private final Logger logger = LogManager.getLogger();
- public Listener someInstanceListenerField = listener(SimpleEvent.class, event -> {
- event.setCount(event.getCount() + 1);
- });
private EventBus eventBus;
- public static Listener someStaticListenerMethod() {
- return listener(SimpleEvent.class, event -> {
- event.setCount(event.getCount() + 1);
- });
- }
-
@BeforeAll
public void setup() {
Configurator.setRootLevel(Level.ALL);
@@ -52,9 +37,23 @@ public class JavaTest {
Assertions.assertEquals(event.getCount(), 4);
}
+ public Listener someInstanceListenerField = listener(SimpleEvent.class, event -> {
+ event.setCount(event.getCount() + 1);
+ });
+
+ public static Listener someStaticListenerField = listener(SimpleEvent.class, event -> {
+ event.setCount(event.getCount() + 1);
+ });
+
public Listener someInstanceListenerMethod() {
return listener(SimpleEvent.class, event -> {
event.setCount(event.getCount() + 1);
});
}
-}
+
+ public static Listener someStaticListenerMethod() {
+ return listener(SimpleEvent.class, event -> {
+ event.setCount(event.getCount() + 1);
+ });
+ }
+ }
diff --git a/src/test/kotlin/KotlinTest.kt b/src/test/kotlin/KotlinTest.kt
index 3670703..136e7c8 100644
--- a/src/test/kotlin/KotlinTest.kt
+++ b/src/test/kotlin/KotlinTest.kt
@@ -1,7 +1,7 @@
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.runBlocking
-import me.bush.illnamethislater.*
+import kotlinx.coroutines.launch
+import me.bush.eventbuskotlin.*
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.config.Configurator
@@ -9,6 +9,7 @@ import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
+import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random
/**
@@ -41,7 +42,7 @@ class KotlinTest {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@Test
- fun `test listener priority and ability to cancel events or receive cancelled events`() {
+ fun `test priority and ability to cancel events or receive cancelled events`() {
eventBus.subscribe(this)
val event = SimpleEvent()
eventBus.post(event)
@@ -130,25 +131,75 @@ class KotlinTest {
@Test
fun `test parallel event posting`() {
- runBlocking {
- sus()
+ val listeners = mutableListOf<Listener>()
+ repeat(10) {
+ // Not sure what to test
+ listeners += listener<Unit>(parallel = true) {
+ logger.info("Thread:" + Thread.currentThread().name)
+ }
}
- }
+ listeners.forEach { eventBus.register(it) }
+ eventBus.post(Unit)
+ listeners.forEach { eventBus.unregister(it) }
- suspend fun sus() {
- println()
- }
+ /* I'm not sure what else to test for this, but I'm also not really happy with parallel event posting yet. TODO
+
+ [DefaultDispatcher-worker-5 @coroutine#5] INFO Kotlin Test - DefaultDispatcher-worker-5 @coroutine#5
+ [DefaultDispatcher-worker-2 @coroutine#2] INFO Kotlin Test - DefaultDispatcher-worker-2 @coroutine#2
+ [DefaultDispatcher-worker-1 @coroutine#1] INFO Kotlin Test - DefaultDispatcher-worker-1 @coroutine#1
+ [DefaultDispatcher-worker-4 @coroutine#4] INFO Kotlin Test - DefaultDispatcher-worker-4 @coroutine#4
+ [DefaultDispatcher-worker-7 @coroutine#7] INFO Kotlin Test - DefaultDispatcher-worker-7 @coroutine#7
+ [DefaultDispatcher-worker-6 @coroutine#6] INFO Kotlin Test - DefaultDispatcher-worker-6 @coroutine#6
+ [DefaultDispatcher-worker-8 @coroutine#8] INFO Kotlin Test - DefaultDispatcher-worker-8 @coroutine#8
+ [DefaultDispatcher-worker-9 @coroutine#9] INFO Kotlin Test - DefaultDispatcher-worker-9 @coroutine#9
+ [DefaultDispatcher-worker-3 @coroutine#3] INFO Kotlin Test - DefaultDispatcher-worker-3 @coroutine#3
+ [DefaultDispatcher-worker-10 @coroutine#10] INFO Kotlin Test - DefaultDispatcher-worker-10 @coroutine#10 */
- fun sussy() {
- println()
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@Test
- fun `call every method on multiple threads concurrently to ensure no CME is thrown`() {
+ fun `test thread safety`() {
+ val listeners = mutableListOf<Listener>()
+ repeat(10) {
+ listeners += listener<Any>(parallel = false) {
+ doStuff()
+ }
+ listeners += listener<Any>(parallel = true) {
+ doStuff()
+ }
+ }
+ listeners.forEach { eventBus.register(it) }
+ eventBus.debug()
+ CoroutineScope(Dispatchers.Default).launch {
+ repeat(100) {
+ launch {
+ doStuff()
+ eventBus.post(Any())
+ }
+ }
+ }
+ Thread.sleep(2000)
+ Assertions.assertEquals(2100, counter.get())
+ listeners.forEach { eventBus.unregister(it) }
+ eventBus.unregister(dummy)
+ eventBus.unsubscribe(this)
+ // Should be empty
+ eventBus.debug()
+ }
+
+ private val dummy = listener<Unit> {}
+
+ private var counter = AtomicInteger()
+ private fun doStuff() {
+ eventBus.unsubscribe(this)
+ eventBus.subscribe(this)
+ eventBus.unregister(dummy)
+ eventBus.register(dummy)
+ counter.getAndIncrement()
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -181,6 +232,7 @@ class KotlinTest {
eventBus.subscribe(this)
eventBus.post(Unit)
Assertions.assertTrue(called)
+ eventBus.unsubscribe(this)
}
var called = false