aboutsummaryrefslogtreecommitdiff
path: root/shared-internals/src/main/java
diff options
context:
space:
mode:
authorshedaniel <daniel@shedaniel.me>2022-11-07 21:51:17 +0800
committershedaniel <daniel@shedaniel.me>2022-11-07 21:55:46 +0800
commit85a0ae5badcdb94e8ea092f3feecfa631df47f3c (patch)
tree5249bdbbd843fdf77a001464239a46d0cd4f0daf /shared-internals/src/main/java
parent7f85089abba4c9500365b694abda364446ab9b3c (diff)
parent41180dd40ac5214da245cfa7956dc662c4d95bea (diff)
downloadRoughlyEnoughItems-85a0ae5badcdb94e8ea092f3feecfa631df47f3c.tar.gz
RoughlyEnoughItems-85a0ae5badcdb94e8ea092f3feecfa631df47f3c.tar.bz2
RoughlyEnoughItems-85a0ae5badcdb94e8ea092f3feecfa631df47f3c.zip
Merge commit '41180dd40ac5214da245cfa7956dc662c4d95bea' into modularity
Diffstat (limited to 'shared-internals/src/main/java')
-rw-r--r--shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/AsyncSearchManager.java190
-rw-r--r--shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/SearchManager.java7
-rw-r--r--shared-internals/src/main/java/me/shedaniel/rei/impl/client/util/ThreadCreator.java64
3 files changed, 193 insertions, 68 deletions
diff --git a/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/AsyncSearchManager.java b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/AsyncSearchManager.java
index 0fb426a34..442fda91c 100644
--- a/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/AsyncSearchManager.java
+++ b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/AsyncSearchManager.java
@@ -24,31 +24,33 @@
package me.shedaniel.rei.impl.client.search;
import com.google.common.collect.Lists;
+import dev.architectury.platform.Platform;
import me.shedaniel.rei.api.client.config.ConfigObject;
import me.shedaniel.rei.api.client.search.SearchFilter;
import me.shedaniel.rei.api.client.search.SearchProvider;
import me.shedaniel.rei.api.common.entry.EntryStack;
import me.shedaniel.rei.api.common.util.CollectionUtils;
+import me.shedaniel.rei.impl.client.util.ThreadCreator;
import org.jetbrains.annotations.Nullable;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.*;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
public class AsyncSearchManager implements SearchManager {
+ private static final ExecutorService EXECUTOR_SERVICE = new ThreadCreator("REI-AsyncSearchManager").asService();
private final Supplier<List<EntryStack<?>>> stacksProvider;
private final Supplier<Predicate<EntryStack<?>>> additionalPredicateSupplier;
private final UnaryOperator<EntryStack<?>> transformer;
- private Predicate<EntryStack<?>> additionalPredicate;
+ private ExecutorTuple executor;
private SearchFilter filter;
- private boolean dirty = false;
- private boolean filterDirty = false;
- private CompletableFuture<List<EntryStack<?>>> future;
- private List<EntryStack<?>> last;
+ private Map.Entry<List<EntryStack<?>>, SearchFilter> last;
public AsyncSearchManager(Supplier<List<EntryStack<?>>> stacksProvider, Supplier<Predicate<EntryStack<?>>> additionalPredicateSupplier, UnaryOperator<EntryStack<?>> transformer) {
this.stacksProvider = stacksProvider;
@@ -58,95 +60,157 @@ public class AsyncSearchManager implements SearchManager {
@Override
public void markDirty() {
- this.dirty = true;
+ synchronized (AsyncSearchManager.this) {
+ this.last = null;
+ }
}
- @Override
- public void markFilterDirty() {
- this.filterDirty = true;
+ private record ExecutorTuple(SearchFilter filter, CompletableFuture<Map.Entry<List<EntryStack<?>>, SearchFilter>> future) {
}
@Override
public void updateFilter(String filter) {
if (this.filter == null || !this.filter.getFilter().equals(filter)) {
+ if (this.executor != null) {
+ this.executor.future().cancel(Platform.isFabric());
+ }
+ this.executor = null;
this.filter = SearchProvider.getInstance().createFilter(filter);
- markDirty();
- markFilterDirty();
}
}
@Override
public boolean isDirty() {
- return last == null || dirty;
+ synchronized (AsyncSearchManager.this) {
+ return this.last == null || this.last.getValue() != this.filter;
+ }
}
@Override
- public boolean isFilterDirty() {
- return filterDirty;
+ public Future<?> getAsync(BiConsumer<List<EntryStack<?>>, SearchFilter> consumer) {
+ if (this.executor == null || this.executor.filter() != filter) {
+ if (this.executor != null) {
+ this.executor.future().cancel(Platform.isFabric());
+ }
+ this.executor = new ExecutorTuple(filter, get(EXECUTOR_SERVICE));
+ }
+ SearchFilter savedFilter = filter;
+ return (this.executor = new ExecutorTuple(this.executor.filter(), this.executor.future().thenApplyAsync(result -> {
+ if (savedFilter == filter) {
+ consumer.accept(result.getKey(), result.getValue());
+ }
+
+ return result;
+ }, EXECUTOR_SERVICE))).future();
}
@Override
- public Future<Void> getAsync(Consumer<List<EntryStack<?>>> consumer) {
- if (future == null || future.isCancelled() || future.isDone() || future.isCompletedExceptionally()) {
- if (future != null) future.cancel(true);
- future = CompletableFuture.supplyAsync(this)
- .exceptionally(throwable -> {
- throwable.printStackTrace();
- return null;
- });
+ public List<EntryStack<?>> get() {
+ return getNow();
+ }
+
+ public List<EntryStack<?>> getNow() {
+ try {
+ return get(Runnable::run).get().getKey();
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException | CancellationException e) {
+ return Lists.newArrayList();
}
- return future.thenAccept(consumer);
}
- @Override
- public List<EntryStack<?>> get() {
+ public CompletableFuture<Map.Entry<List<EntryStack<?>>, SearchFilter>> get(Executor executor) {
if (isDirty()) {
- this.additionalPredicate = additionalPredicateSupplier.get();
- int searchPartitionSize = ConfigObject.getInstance().getAsyncSearchPartitionSize();
- List<EntryStack<?>> stacks = stacksProvider.get();
- last = new ArrayList<>();
+ Map.Entry<List<EntryStack<?>>, SearchFilter> last;
+ synchronized (AsyncSearchManager.this) {
+ last = this.last;
+ }
+ return get(this.filter, this.additionalPredicateSupplier.get(), this.transformer,
+ this.stacksProvider.get(), last, this, executor)
+ .thenApply(entry -> {
+ synchronized (AsyncSearchManager.this) {
+ this.last = entry;
+ }
+ return entry;
+ });
+ }
+
+ return CompletableFuture.completedFuture(last);
+ }
+
+ public static CompletableFuture<Map.Entry<List<EntryStack<?>>, SearchFilter>> get(SearchFilter filter, Predicate<EntryStack<?>> additionalPredicate,
+ UnaryOperator<EntryStack<?>> transformer, List<EntryStack<?>> stacks, Map.Entry<List<EntryStack<?>>, SearchFilter> last,
+ AsyncSearchManager manager, Executor executor) {
+ int searchPartitionSize = ConfigObject.getInstance().getAsyncSearchPartitionSize();
+ boolean shouldAsync = ConfigObject.getInstance().shouldAsyncSearch() && stacks.size() > searchPartitionSize * 4;
+
+ if (!stacks.isEmpty()) {
+ CompletableFuture<Void> preparationFuture = CompletableFuture.completedFuture(null);
- if (!stacks.isEmpty()) {
- if (filterDirty) {
- filter.prepareFilter(stacks);
- filterDirty = false;
- }
-
- if (ConfigObject.getInstance().shouldAsyncSearch() && stacks.size() > searchPartitionSize * 4) {
- List<CompletableFuture<List<EntryStack<?>>>> futures = Lists.newArrayList();
- for (Iterable<EntryStack<?>> partitionStacks : CollectionUtils.partition(stacks, searchPartitionSize)) {
- futures.add(CompletableFuture.supplyAsync(() -> {
- List<EntryStack<?>> filtered = Lists.newArrayList();
- for (EntryStack<?> stack : partitionStacks) {
- if (stack != null && matches(stack) && additionalPredicate.test(stack)) {
- filtered.add(transformer.apply(stack));
- }
- }
- return filtered;
- }));
- }
- try {
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(30, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- e.printStackTrace();
- }
- for (CompletableFuture<List<EntryStack<?>>> future : futures) {
- List<EntryStack<?>> now = future.getNow(null);
- if (now != null) last.addAll(now);
+ if (last == null || last.getValue() != filter) {
+ Runnable prepare = () -> {
+ if (manager.filter == filter) {
+ filter.prepareFilter(stacks);
+ } else {
+ throw new CancellationException();
}
+ };
+ if (shouldAsync) {
+ preparationFuture = CompletableFuture.runAsync(prepare, executor);
} else {
- for (EntryStack<?> stack : stacks) {
- if (matches(stack) && additionalPredicate.test(stack)) {
- last.add(transformer.apply(stack));
+ prepare.run();
+ preparationFuture = CompletableFuture.completedFuture(null);
+ }
+ }
+
+ if (shouldAsync) {
+ List<CompletableFuture<List<EntryStack<?>>>> futures = Lists.newArrayList();
+ for (Iterable<EntryStack<?>> partitionStacks : CollectionUtils.partition(stacks, Math.max(searchPartitionSize, stacks.size() * 3 / Runtime.getRuntime().availableProcessors()))) {
+ futures.add(CompletableFuture.supplyAsync(() -> {
+ List<EntryStack<?>> filtered = Lists.newArrayList();
+ for (EntryStack<?> stack : partitionStacks) {
+ if (stack != null && filter.test(stack) && additionalPredicate.test(stack)) {
+ filtered.add(transformer.apply(stack));
+ }
+ if (manager.filter != filter) throw new CancellationException();
}
+ return filtered;
+ }, executor));
+ }
+ return preparationFuture.thenCompose($ -> CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+ .orTimeout(30, TimeUnit.SECONDS))
+ .thenApplyAsync($ -> {
+ List<EntryStack<?>> list = new ArrayList<>();
+
+ if (manager.filter == filter) {
+ for (CompletableFuture<List<EntryStack<?>>> future : futures) {
+ List<EntryStack<?>> now = future.getNow(null);
+ if (now != null) list.addAll(now);
+ }
+ } else {
+ throw new CancellationException();
+ }
+
+ return list;
+ }, executor)
+ .thenApply(result -> {
+ return new AbstractMap.SimpleImmutableEntry<>(result, filter);
+ });
+ } else {
+ List<EntryStack<?>> list = new ArrayList<>();
+
+ for (EntryStack<?> stack : stacks) {
+ if (filter.test(stack) && additionalPredicate.test(stack)) {
+ list.add(transformer.apply(stack));
}
+ if (manager.filter != filter) throw new CancellationException();
}
+
+ return CompletableFuture.completedFuture(new AbstractMap.SimpleImmutableEntry<>(list, filter));
}
-
- dirty = false;
}
- return last;
+ return CompletableFuture.completedFuture(new AbstractMap.SimpleImmutableEntry<>(Lists.newArrayList(), filter));
}
@Override
diff --git a/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/SearchManager.java b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/SearchManager.java
index cdbc75925..e57d36f4c 100644
--- a/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/SearchManager.java
+++ b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/SearchManager.java
@@ -29,21 +29,18 @@ import org.jetbrains.annotations.Nullable;
import java.util.List;
import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
public interface SearchManager extends Supplier<List<EntryStack<?>>> {
void markDirty();
- void markFilterDirty();
-
void updateFilter(String filter);
boolean isDirty();
- boolean isFilterDirty();
-
- Future<Void> getAsync(Consumer<List<EntryStack<?>>> consumer);
+ Future<?> getAsync(BiConsumer<List<EntryStack<?>>, SearchFilter> consumer);
boolean matches(EntryStack<?> stack);
diff --git a/shared-internals/src/main/java/me/shedaniel/rei/impl/client/util/ThreadCreator.java b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/util/ThreadCreator.java
new file mode 100644
index 000000000..6815d7c56
--- /dev/null
+++ b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/util/ThreadCreator.java
@@ -0,0 +1,64 @@
+/*
+ * This file is licensed under the MIT License, part of Roughly Enough Items.
+ * Copyright (c) 2018, 2019, 2020, 2021, 2022 shedaniel
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package me.shedaniel.rei.impl.client.util;
+
+import me.shedaniel.rei.impl.common.InternalLogger;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class ThreadCreator {
+ private final ThreadGroup group;
+ private final AtomicInteger threadId = new AtomicInteger(0);
+
+ public ThreadCreator(String groupName) {
+ this.group = new ThreadGroup(groupName);
+ }
+
+ public ThreadGroup group() {
+ return group;
+ }
+
+ public AtomicInteger threadId() {
+ return threadId;
+ }
+
+ public Thread create(Runnable task) {
+ Thread thread = new Thread(this.group(), task, this.group().getName() + "-" + this.threadId().getAndIncrement());
+ thread.setDaemon(true);
+ thread.setUncaughtExceptionHandler(($, exception) -> {
+ if (!(exception instanceof InterruptedException) && !(exception instanceof CancellationException) && !(exception instanceof ThreadDeath)) {
+ InternalLogger.getInstance().throwException(exception);
+ }
+ });
+ return thread;
+ }
+
+ public ExecutorService asService() {
+ return new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors() * 4,
+ 0L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ this::create);
+ }
+}