aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src
diff options
context:
space:
mode:
authorLuck <git@lucko.me>2021-09-26 10:05:13 +0100
committerLuck <git@lucko.me>2021-09-26 10:05:13 +0100
commit89bd36f6cd9859ee8d7f6d9d51620326d678b457 (patch)
treea4fc6c57b87b539fd6921ae8d9b3df62de7e26ad /spark-common/src
parentabc2eaa49f928a064ab3f197f3d9d8e3d16f2eb8 (diff)
downloadspark-89bd36f6cd9859ee8d7f6d9d51620326d678b457.tar.gz
spark-89bd36f6cd9859ee8d7f6d9d51620326d678b457.tar.bz2
spark-89bd36f6cd9859ee8d7f6d9d51620326d678b457.zip
Support --timeout flag for async-profiler sampler mode
Diffstat (limited to 'spark-common/src')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java2
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java68
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java2
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java4
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java69
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java43
6 files changed, 116 insertions, 72 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
index c0a295a..4147de3 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
@@ -231,7 +231,7 @@ public class SamplerModule implements CommandModule {
resp.broadcastPrefixed(text("The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds."));
}
- CompletableFuture<? extends Sampler> future = this.activeSampler.getFuture();
+ CompletableFuture<Sampler> future = this.activeSampler.getFuture();
// send message if profiling fails
future.whenCompleteAsync((s, throwable) -> {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
new file mode 100644
index 0000000..bae93b1
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
@@ -0,0 +1,68 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base implementation class for {@link Sampler}s.
+ */
+public abstract class AbstractSampler implements Sampler {
+
+ /** The interval to wait between sampling, in microseconds */
+ protected final int interval;
+
+ /** The instance used to generate thread information for use in sampling */
+ protected final ThreadDumper threadDumper;
+
+ /** The time when sampling first began */
+ protected long startTime = -1;
+
+ /** The unix timestamp (in millis) when this sampler should automatically complete. */
+ protected final long endTime; // -1 for nothing
+
+ /** A future to encapsulate the completion of this sampler instance */
+ protected final CompletableFuture<Sampler> future = new CompletableFuture<>();
+
+ protected AbstractSampler(int interval, ThreadDumper threadDumper, long endTime) {
+ this.interval = interval;
+ this.threadDumper = threadDumper;
+ this.endTime = endTime;
+ }
+
+ @Override
+ public long getStartTime() {
+ if (this.startTime == -1) {
+ throw new IllegalStateException("Not yet started");
+ }
+ return this.startTime;
+ }
+
+ @Override
+ public long getEndTime() {
+ return this.endTime;
+ }
+
+ @Override
+ public CompletableFuture<Sampler> getFuture() {
+ return this.future;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index b512bc1..b71aaee 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -65,7 +65,7 @@ public interface Sampler {
*
* @return a future
*/
- CompletableFuture<? extends Sampler> getFuture();
+ CompletableFuture<Sampler> getFuture();
// Methods used to export the sampler data to the web viewer.
SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 7dff29e..935329d 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -95,8 +95,8 @@ public class SamplerBuilder {
int intervalMicros = (int) (this.samplingInterval * 1000d);
if (this.ticksOver == -1 || this.tickHook == null) {
- if (this.useAsyncProfiler && this.timeout == -1 && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) {
- sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper);
+ if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) {
+ sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout);
} else {
sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index ca30df0..ed09e46 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -20,9 +20,11 @@
package me.lucko.spark.common.sampler.async;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.platform.PlatformInfo;
-import me.lucko.spark.common.sampler.Sampler;
+import me.lucko.spark.common.sampler.AbstractSampler;
import me.lucko.spark.common.sampler.ThreadDumper;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.async.jfr.JfrReader;
@@ -41,53 +43,33 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
/**
* A sampler implementation using async-profiler.
*/
-public class AsyncSampler implements Sampler {
+public class AsyncSampler extends AbstractSampler {
private final AsyncProfiler profiler;
- /** The instance used to generate thread information for use in sampling */
- private final ThreadDumper threadDumper;
/** Responsible for aggregating and then outputting collected sampling data */
private final AsyncDataAggregator dataAggregator;
+
/** Flag to mark if the output has been completed */
private boolean outputComplete = false;
+
/** The temporary output file */
private Path outputFile;
- /** The interval to wait between sampling, in microseconds */
- private final int interval;
- /** The time when sampling first began */
- private long startTime = -1;
+ /** The executor used for timeouts */
+ private ScheduledExecutorService timeoutExecutor;
- public AsyncSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper) {
+ public AsyncSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
+ super(interval, threadDumper, endTime);
this.profiler = AsyncProfilerAccess.INSTANCE.getProfiler();
- this.threadDumper = threadDumper;
this.dataAggregator = new AsyncDataAggregator(threadGrouper);
- this.interval = interval;
- }
-
- @Override
- public long getStartTime() {
- if (this.startTime == -1) {
- throw new IllegalStateException("Not yet started");
- }
- return this.startTime;
- }
-
- @Override
- public long getEndTime() {
- return -1;
- }
-
- @Override
- public CompletableFuture<? extends Sampler> getFuture() {
- return new CompletableFuture<>();
}
/**
@@ -134,6 +116,28 @@ public class AsyncSampler implements Sampler {
this.profiler.addThread(thread);
}
}
+
+ scheduleTimeout();
+ }
+
+ private void scheduleTimeout() {
+ if (this.endTime == -1) {
+ return;
+ }
+
+ long delay = this.endTime - System.currentTimeMillis();
+ if (delay <= 0) {
+ return;
+ }
+
+ this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-timeout-thread").build()
+ );
+
+ this.timeoutExecutor.schedule(() -> {
+ stop();
+ this.future.complete(this);
+ }, delay, TimeUnit.MILLISECONDS);
}
/**
@@ -142,6 +146,11 @@ public class AsyncSampler implements Sampler {
@Override
public void stop() {
this.profiler.stop();
+
+ if (this.timeoutExecutor != null) {
+ this.timeoutExecutor.shutdown();
+ this.timeoutExecutor = null;
+ }
}
@Override
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
index a7204b3..02d5f01 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -25,7 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.platform.PlatformInfo;
-import me.lucko.spark.common.sampler.Sampler;
+import me.lucko.spark.common.sampler.AbstractSampler;
import me.lucko.spark.common.sampler.ThreadDumper;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.node.MergeMode;
@@ -42,7 +42,6 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -52,7 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* A sampler implementation using Java (WarmRoast).
*/
-public class JavaSampler implements Sampler, Runnable {
+public class JavaSampler extends AbstractSampler implements Runnable {
private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
/** The worker pool for inserting stack nodes */
@@ -65,32 +64,18 @@ public class JavaSampler implements Sampler, Runnable {
/** The thread management interface for the current JVM */
private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
- /** The instance used to generate thread information for use in sampling */
- private final ThreadDumper threadDumper;
+
/** Responsible for aggregating and then outputting collected sampling data */
private final JavaDataAggregator dataAggregator;
-
- /** A future to encapsulate the completion of this sampler instance */
- private final CompletableFuture<JavaSampler> future = new CompletableFuture<>();
- /** The interval to wait between sampling, in microseconds */
- private final int interval;
- /** The time when sampling first began */
- private long startTime = -1;
- /** The unix timestamp (in millis) when this sampler should automatically complete. */
- private final long endTime; // -1 for nothing
public JavaSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) {
- this.threadDumper = threadDumper;
+ super(interval, threadDumper, endTime);
this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative);
- this.interval = interval;
- this.endTime = endTime;
}
public JavaSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {
- this.threadDumper = threadDumper;
+ super(interval, threadDumper, endTime);
this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold);
- this.interval = interval;
- this.endTime = endTime;
}
@Override
@@ -100,24 +85,6 @@ public class JavaSampler implements Sampler, Runnable {
}
@Override
- public long getStartTime() {
- if (this.startTime == -1) {
- throw new IllegalStateException("Not yet started");
- }
- return this.startTime;
- }
-
- @Override
- public long getEndTime() {
- return this.endTime;
- }
-
- @Override
- public CompletableFuture<JavaSampler> getFuture() {
- return this.future;
- }
-
- @Override
public void stop() {
this.task.cancel(false);
}