diff options
| author | shedaniel <daniel@shedaniel.me> | 2022-11-07 21:51:17 +0800 |
|---|---|---|
| committer | shedaniel <daniel@shedaniel.me> | 2022-11-07 21:55:46 +0800 |
| commit | 85a0ae5badcdb94e8ea092f3feecfa631df47f3c (patch) | |
| tree | 5249bdbbd843fdf77a001464239a46d0cd4f0daf /shared-internals/src/main/java | |
| parent | 7f85089abba4c9500365b694abda364446ab9b3c (diff) | |
| parent | 41180dd40ac5214da245cfa7956dc662c4d95bea (diff) | |
| download | RoughlyEnoughItems-85a0ae5badcdb94e8ea092f3feecfa631df47f3c.tar.gz RoughlyEnoughItems-85a0ae5badcdb94e8ea092f3feecfa631df47f3c.tar.bz2 RoughlyEnoughItems-85a0ae5badcdb94e8ea092f3feecfa631df47f3c.zip | |
Merge commit '41180dd40ac5214da245cfa7956dc662c4d95bea' into modularity
Diffstat (limited to 'shared-internals/src/main/java')
3 files changed, 193 insertions, 68 deletions
diff --git a/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/AsyncSearchManager.java b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/AsyncSearchManager.java index 0fb426a34..442fda91c 100644 --- a/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/AsyncSearchManager.java +++ b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/AsyncSearchManager.java @@ -24,31 +24,33 @@ package me.shedaniel.rei.impl.client.search; import com.google.common.collect.Lists; +import dev.architectury.platform.Platform; import me.shedaniel.rei.api.client.config.ConfigObject; import me.shedaniel.rei.api.client.search.SearchFilter; import me.shedaniel.rei.api.client.search.SearchProvider; import me.shedaniel.rei.api.common.entry.EntryStack; import me.shedaniel.rei.api.common.util.CollectionUtils; +import me.shedaniel.rei.impl.client.util.ThreadCreator; import org.jetbrains.annotations.Nullable; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.*; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.function.UnaryOperator; public class AsyncSearchManager implements SearchManager { + private static final ExecutorService EXECUTOR_SERVICE = new ThreadCreator("REI-AsyncSearchManager").asService(); private final Supplier<List<EntryStack<?>>> stacksProvider; private final Supplier<Predicate<EntryStack<?>>> additionalPredicateSupplier; private final UnaryOperator<EntryStack<?>> transformer; - private Predicate<EntryStack<?>> additionalPredicate; + private ExecutorTuple executor; private SearchFilter filter; - private boolean dirty = false; - private boolean filterDirty = false; - private CompletableFuture<List<EntryStack<?>>> future; - private List<EntryStack<?>> last; + private Map.Entry<List<EntryStack<?>>, SearchFilter> last; public AsyncSearchManager(Supplier<List<EntryStack<?>>> stacksProvider, Supplier<Predicate<EntryStack<?>>> additionalPredicateSupplier, UnaryOperator<EntryStack<?>> transformer) { this.stacksProvider = stacksProvider; @@ -58,95 +60,157 @@ public class AsyncSearchManager implements SearchManager { @Override public void markDirty() { - this.dirty = true; + synchronized (AsyncSearchManager.this) { + this.last = null; + } } - @Override - public void markFilterDirty() { - this.filterDirty = true; + private record ExecutorTuple(SearchFilter filter, CompletableFuture<Map.Entry<List<EntryStack<?>>, SearchFilter>> future) { } @Override public void updateFilter(String filter) { if (this.filter == null || !this.filter.getFilter().equals(filter)) { + if (this.executor != null) { + this.executor.future().cancel(Platform.isFabric()); + } + this.executor = null; this.filter = SearchProvider.getInstance().createFilter(filter); - markDirty(); - markFilterDirty(); } } @Override public boolean isDirty() { - return last == null || dirty; + synchronized (AsyncSearchManager.this) { + return this.last == null || this.last.getValue() != this.filter; + } } @Override - public boolean isFilterDirty() { - return filterDirty; + public Future<?> getAsync(BiConsumer<List<EntryStack<?>>, SearchFilter> consumer) { + if (this.executor == null || this.executor.filter() != filter) { + if (this.executor != null) { + this.executor.future().cancel(Platform.isFabric()); + } + this.executor = new ExecutorTuple(filter, get(EXECUTOR_SERVICE)); + } + SearchFilter savedFilter = filter; + return (this.executor = new ExecutorTuple(this.executor.filter(), this.executor.future().thenApplyAsync(result -> { + if (savedFilter == filter) { + consumer.accept(result.getKey(), result.getValue()); + } + + return result; + }, EXECUTOR_SERVICE))).future(); } @Override - public Future<Void> getAsync(Consumer<List<EntryStack<?>>> consumer) { - if (future == null || future.isCancelled() || future.isDone() || future.isCompletedExceptionally()) { - if (future != null) future.cancel(true); - future = CompletableFuture.supplyAsync(this) - .exceptionally(throwable -> { - throwable.printStackTrace(); - return null; - }); + public List<EntryStack<?>> get() { + return getNow(); + } + + public List<EntryStack<?>> getNow() { + try { + return get(Runnable::run).get().getKey(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (InterruptedException | CancellationException e) { + return Lists.newArrayList(); } - return future.thenAccept(consumer); } - @Override - public List<EntryStack<?>> get() { + public CompletableFuture<Map.Entry<List<EntryStack<?>>, SearchFilter>> get(Executor executor) { if (isDirty()) { - this.additionalPredicate = additionalPredicateSupplier.get(); - int searchPartitionSize = ConfigObject.getInstance().getAsyncSearchPartitionSize(); - List<EntryStack<?>> stacks = stacksProvider.get(); - last = new ArrayList<>(); + Map.Entry<List<EntryStack<?>>, SearchFilter> last; + synchronized (AsyncSearchManager.this) { + last = this.last; + } + return get(this.filter, this.additionalPredicateSupplier.get(), this.transformer, + this.stacksProvider.get(), last, this, executor) + .thenApply(entry -> { + synchronized (AsyncSearchManager.this) { + this.last = entry; + } + return entry; + }); + } + + return CompletableFuture.completedFuture(last); + } + + public static CompletableFuture<Map.Entry<List<EntryStack<?>>, SearchFilter>> get(SearchFilter filter, Predicate<EntryStack<?>> additionalPredicate, + UnaryOperator<EntryStack<?>> transformer, List<EntryStack<?>> stacks, Map.Entry<List<EntryStack<?>>, SearchFilter> last, + AsyncSearchManager manager, Executor executor) { + int searchPartitionSize = ConfigObject.getInstance().getAsyncSearchPartitionSize(); + boolean shouldAsync = ConfigObject.getInstance().shouldAsyncSearch() && stacks.size() > searchPartitionSize * 4; + + if (!stacks.isEmpty()) { + CompletableFuture<Void> preparationFuture = CompletableFuture.completedFuture(null); - if (!stacks.isEmpty()) { - if (filterDirty) { - filter.prepareFilter(stacks); - filterDirty = false; - } - - if (ConfigObject.getInstance().shouldAsyncSearch() && stacks.size() > searchPartitionSize * 4) { - List<CompletableFuture<List<EntryStack<?>>>> futures = Lists.newArrayList(); - for (Iterable<EntryStack<?>> partitionStacks : CollectionUtils.partition(stacks, searchPartitionSize)) { - futures.add(CompletableFuture.supplyAsync(() -> { - List<EntryStack<?>> filtered = Lists.newArrayList(); - for (EntryStack<?> stack : partitionStacks) { - if (stack != null && matches(stack) && additionalPredicate.test(stack)) { - filtered.add(transformer.apply(stack)); - } - } - return filtered; - })); - } - try { - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(30, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - e.printStackTrace(); - } - for (CompletableFuture<List<EntryStack<?>>> future : futures) { - List<EntryStack<?>> now = future.getNow(null); - if (now != null) last.addAll(now); + if (last == null || last.getValue() != filter) { + Runnable prepare = () -> { + if (manager.filter == filter) { + filter.prepareFilter(stacks); + } else { + throw new CancellationException(); } + }; + if (shouldAsync) { + preparationFuture = CompletableFuture.runAsync(prepare, executor); } else { - for (EntryStack<?> stack : stacks) { - if (matches(stack) && additionalPredicate.test(stack)) { - last.add(transformer.apply(stack)); + prepare.run(); + preparationFuture = CompletableFuture.completedFuture(null); + } + } + + if (shouldAsync) { + List<CompletableFuture<List<EntryStack<?>>>> futures = Lists.newArrayList(); + for (Iterable<EntryStack<?>> partitionStacks : CollectionUtils.partition(stacks, Math.max(searchPartitionSize, stacks.size() * 3 / Runtime.getRuntime().availableProcessors()))) { + futures.add(CompletableFuture.supplyAsync(() -> { + List<EntryStack<?>> filtered = Lists.newArrayList(); + for (EntryStack<?> stack : partitionStacks) { + if (stack != null && filter.test(stack) && additionalPredicate.test(stack)) { + filtered.add(transformer.apply(stack)); + } + if (manager.filter != filter) throw new CancellationException(); } + return filtered; + }, executor)); + } + return preparationFuture.thenCompose($ -> CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .orTimeout(30, TimeUnit.SECONDS)) + .thenApplyAsync($ -> { + List<EntryStack<?>> list = new ArrayList<>(); + + if (manager.filter == filter) { + for (CompletableFuture<List<EntryStack<?>>> future : futures) { + List<EntryStack<?>> now = future.getNow(null); + if (now != null) list.addAll(now); + } + } else { + throw new CancellationException(); + } + + return list; + }, executor) + .thenApply(result -> { + return new AbstractMap.SimpleImmutableEntry<>(result, filter); + }); + } else { + List<EntryStack<?>> list = new ArrayList<>(); + + for (EntryStack<?> stack : stacks) { + if (filter.test(stack) && additionalPredicate.test(stack)) { + list.add(transformer.apply(stack)); } + if (manager.filter != filter) throw new CancellationException(); } + + return CompletableFuture.completedFuture(new AbstractMap.SimpleImmutableEntry<>(list, filter)); } - - dirty = false; } - return last; + return CompletableFuture.completedFuture(new AbstractMap.SimpleImmutableEntry<>(Lists.newArrayList(), filter)); } @Override diff --git a/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/SearchManager.java b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/SearchManager.java index cdbc75925..e57d36f4c 100644 --- a/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/SearchManager.java +++ b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/search/SearchManager.java @@ -29,21 +29,18 @@ import org.jetbrains.annotations.Nullable; import java.util.List; import java.util.concurrent.Future; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; public interface SearchManager extends Supplier<List<EntryStack<?>>> { void markDirty(); - void markFilterDirty(); - void updateFilter(String filter); boolean isDirty(); - boolean isFilterDirty(); - - Future<Void> getAsync(Consumer<List<EntryStack<?>>> consumer); + Future<?> getAsync(BiConsumer<List<EntryStack<?>>, SearchFilter> consumer); boolean matches(EntryStack<?> stack); diff --git a/shared-internals/src/main/java/me/shedaniel/rei/impl/client/util/ThreadCreator.java b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/util/ThreadCreator.java new file mode 100644 index 000000000..6815d7c56 --- /dev/null +++ b/shared-internals/src/main/java/me/shedaniel/rei/impl/client/util/ThreadCreator.java @@ -0,0 +1,64 @@ +/* + * This file is licensed under the MIT License, part of Roughly Enough Items. + * Copyright (c) 2018, 2019, 2020, 2021, 2022 shedaniel + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package me.shedaniel.rei.impl.client.util; + +import me.shedaniel.rei.impl.common.InternalLogger; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +public final class ThreadCreator { + private final ThreadGroup group; + private final AtomicInteger threadId = new AtomicInteger(0); + + public ThreadCreator(String groupName) { + this.group = new ThreadGroup(groupName); + } + + public ThreadGroup group() { + return group; + } + + public AtomicInteger threadId() { + return threadId; + } + + public Thread create(Runnable task) { + Thread thread = new Thread(this.group(), task, this.group().getName() + "-" + this.threadId().getAndIncrement()); + thread.setDaemon(true); + thread.setUncaughtExceptionHandler(($, exception) -> { + if (!(exception instanceof InterruptedException) && !(exception instanceof CancellationException) && !(exception instanceof ThreadDeath)) { + InternalLogger.getInstance().throwException(exception); + } + }); + return thread; + } + + public ExecutorService asService() { + return new ThreadPoolExecutor(0, Runtime.getRuntime().availableProcessors() * 4, + 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + this::create); + } +} |
