diff options
author | Luck <git@lucko.me> | 2021-09-26 10:05:13 +0100 |
---|---|---|
committer | Luck <git@lucko.me> | 2021-09-26 10:05:13 +0100 |
commit | 89bd36f6cd9859ee8d7f6d9d51620326d678b457 (patch) | |
tree | a4fc6c57b87b539fd6921ae8d9b3df62de7e26ad /spark-common/src | |
parent | abc2eaa49f928a064ab3f197f3d9d8e3d16f2eb8 (diff) | |
download | spark-89bd36f6cd9859ee8d7f6d9d51620326d678b457.tar.gz spark-89bd36f6cd9859ee8d7f6d9d51620326d678b457.tar.bz2 spark-89bd36f6cd9859ee8d7f6d9d51620326d678b457.zip |
Support --timeout flag for async-profiler sampler mode
Diffstat (limited to 'spark-common/src')
6 files changed, 116 insertions, 72 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java index c0a295a..4147de3 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java @@ -231,7 +231,7 @@ public class SamplerModule implements CommandModule { resp.broadcastPrefixed(text("The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds.")); } - CompletableFuture<? extends Sampler> future = this.activeSampler.getFuture(); + CompletableFuture<Sampler> future = this.activeSampler.getFuture(); // send message if profiling fails future.whenCompleteAsync((s, throwable) -> { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java new file mode 100644 index 0000000..bae93b1 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java @@ -0,0 +1,68 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler; + +import java.util.concurrent.CompletableFuture; + +/** + * Base implementation class for {@link Sampler}s. + */ +public abstract class AbstractSampler implements Sampler { + + /** The interval to wait between sampling, in microseconds */ + protected final int interval; + + /** The instance used to generate thread information for use in sampling */ + protected final ThreadDumper threadDumper; + + /** The time when sampling first began */ + protected long startTime = -1; + + /** The unix timestamp (in millis) when this sampler should automatically complete. */ + protected final long endTime; // -1 for nothing + + /** A future to encapsulate the completion of this sampler instance */ + protected final CompletableFuture<Sampler> future = new CompletableFuture<>(); + + protected AbstractSampler(int interval, ThreadDumper threadDumper, long endTime) { + this.interval = interval; + this.threadDumper = threadDumper; + this.endTime = endTime; + } + + @Override + public long getStartTime() { + if (this.startTime == -1) { + throw new IllegalStateException("Not yet started"); + } + return this.startTime; + } + + @Override + public long getEndTime() { + return this.endTime; + } + + @Override + public CompletableFuture<Sampler> getFuture() { + return this.future; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index b512bc1..b71aaee 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -65,7 +65,7 @@ public interface Sampler { * * @return a future */ - CompletableFuture<? extends Sampler> getFuture(); + CompletableFuture<Sampler> getFuture(); // Methods used to export the sampler data to the web viewer. SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 7dff29e..935329d 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -95,8 +95,8 @@ public class SamplerBuilder { int intervalMicros = (int) (this.samplingInterval * 1000d); if (this.ticksOver == -1 || this.tickHook == null) { - if (this.useAsyncProfiler && this.timeout == -1 && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) { - sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper); + if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) { + sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout); } else { sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index ca30df0..ed09e46 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -20,9 +20,11 @@ package me.lucko.spark.common.sampler.async; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.platform.PlatformInfo; -import me.lucko.spark.common.sampler.Sampler; +import me.lucko.spark.common.sampler.AbstractSampler; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.async.jfr.JfrReader; @@ -41,53 +43,33 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; /** * A sampler implementation using async-profiler. */ -public class AsyncSampler implements Sampler { +public class AsyncSampler extends AbstractSampler { private final AsyncProfiler profiler; - /** The instance used to generate thread information for use in sampling */ - private final ThreadDumper threadDumper; /** Responsible for aggregating and then outputting collected sampling data */ private final AsyncDataAggregator dataAggregator; + /** Flag to mark if the output has been completed */ private boolean outputComplete = false; + /** The temporary output file */ private Path outputFile; - /** The interval to wait between sampling, in microseconds */ - private final int interval; - /** The time when sampling first began */ - private long startTime = -1; + /** The executor used for timeouts */ + private ScheduledExecutorService timeoutExecutor; - public AsyncSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper) { + public AsyncSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) { + super(interval, threadDumper, endTime); this.profiler = AsyncProfilerAccess.INSTANCE.getProfiler(); - this.threadDumper = threadDumper; this.dataAggregator = new AsyncDataAggregator(threadGrouper); - this.interval = interval; - } - - @Override - public long getStartTime() { - if (this.startTime == -1) { - throw new IllegalStateException("Not yet started"); - } - return this.startTime; - } - - @Override - public long getEndTime() { - return -1; - } - - @Override - public CompletableFuture<? extends Sampler> getFuture() { - return new CompletableFuture<>(); } /** @@ -134,6 +116,28 @@ public class AsyncSampler implements Sampler { this.profiler.addThread(thread); } } + + scheduleTimeout(); + } + + private void scheduleTimeout() { + if (this.endTime == -1) { + return; + } + + long delay = this.endTime - System.currentTimeMillis(); + if (delay <= 0) { + return; + } + + this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-timeout-thread").build() + ); + + this.timeoutExecutor.schedule(() -> { + stop(); + this.future.complete(this); + }, delay, TimeUnit.MILLISECONDS); } /** @@ -142,6 +146,11 @@ public class AsyncSampler implements Sampler { @Override public void stop() { this.profiler.stop(); + + if (this.timeoutExecutor != null) { + this.timeoutExecutor.shutdown(); + this.timeoutExecutor = null; + } } @Override diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index a7204b3..02d5f01 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -25,7 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.platform.PlatformInfo; -import me.lucko.spark.common.sampler.Sampler; +import me.lucko.spark.common.sampler.AbstractSampler; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.node.MergeMode; @@ -42,7 +42,6 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -52,7 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * A sampler implementation using Java (WarmRoast). */ -public class JavaSampler implements Sampler, Runnable { +public class JavaSampler extends AbstractSampler implements Runnable { private static final AtomicInteger THREAD_ID = new AtomicInteger(0); /** The worker pool for inserting stack nodes */ @@ -65,32 +64,18 @@ public class JavaSampler implements Sampler, Runnable { /** The thread management interface for the current JVM */ private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - /** The instance used to generate thread information for use in sampling */ - private final ThreadDumper threadDumper; + /** Responsible for aggregating and then outputting collected sampling data */ private final JavaDataAggregator dataAggregator; - - /** A future to encapsulate the completion of this sampler instance */ - private final CompletableFuture<JavaSampler> future = new CompletableFuture<>(); - /** The interval to wait between sampling, in microseconds */ - private final int interval; - /** The time when sampling first began */ - private long startTime = -1; - /** The unix timestamp (in millis) when this sampler should automatically complete. */ - private final long endTime; // -1 for nothing public JavaSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) { - this.threadDumper = threadDumper; + super(interval, threadDumper, endTime); this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); - this.interval = interval; - this.endTime = endTime; } public JavaSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) { - this.threadDumper = threadDumper; + super(interval, threadDumper, endTime); this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold); - this.interval = interval; - this.endTime = endTime; } @Override @@ -100,24 +85,6 @@ public class JavaSampler implements Sampler, Runnable { } @Override - public long getStartTime() { - if (this.startTime == -1) { - throw new IllegalStateException("Not yet started"); - } - return this.startTime; - } - - @Override - public long getEndTime() { - return this.endTime; - } - - @Override - public CompletableFuture<JavaSampler> getFuture() { - return this.future; - } - - @Override public void stop() { this.task.cancel(false); } |