aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko/spark/common/sampler
diff options
context:
space:
mode:
authorlucko <git@lucko.me>2021-03-10 15:44:14 +0000
committerGitHub <noreply@github.com>2021-03-10 15:44:14 +0000
commit9ee26a12f12a89a6df14eb86afce5d88b5a2cf25 (patch)
tree840ab480ae324b50cefc63b69dae9b38660dc94f /spark-common/src/main/java/me/lucko/spark/common/sampler
parentfd6736fc2f0bae48dda1d4b595e867d9c7244c27 (diff)
downloadspark-9ee26a12f12a89a6df14eb86afce5d88b5a2cf25.tar.gz
spark-9ee26a12f12a89a6df14eb86afce5d88b5a2cf25.tar.bz2
spark-9ee26a12f12a89a6df14eb86afce5d88b5a2cf25.zip
async-profiler support (#102)
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java179
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java17
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java25
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java63
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java7
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java53
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java117
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java279
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java52
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java61
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java25
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java107
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Element.java23
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrClass.java49
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrField.java31
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java389
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/MethodRef.java29
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Sample.java36
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/StackTrace.java28
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java116
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java189
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java (renamed from spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java)20
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java (renamed from spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java)15
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java30
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java58
25 files changed, 1754 insertions, 244 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index e772cb3..5088ed7 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -1,7 +1,6 @@
/*
* This file is part of spark.
*
- * Copyright (C) Albert Pham <http://www.sk89q.com>
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
@@ -21,174 +20,66 @@
package me.lucko.spark.common.sampler;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.platform.PlatformInfo;
-import me.lucko.spark.common.sampler.aggregator.DataAggregator;
-import me.lucko.spark.common.sampler.aggregator.SimpleDataAggregator;
-import me.lucko.spark.common.sampler.aggregator.TickedDataAggregator;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.node.ThreadNode;
-import me.lucko.spark.common.sampler.tick.TickHook;
import me.lucko.spark.proto.SparkProtos.SamplerData;
-import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-import java.util.ArrayList;
import java.util.Comparator;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;
/**
- * Main sampler class.
+ * Abstract superinterface for all sampler implementations.
*/
-public class Sampler implements Runnable {
- private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
-
- /** The worker pool for inserting stack nodes */
- private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
- 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
- );
-
- /** The main sampling task */
- private ScheduledFuture<?> task;
-
- /** The thread management interface for the current JVM */
- private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
- /** The instance used to generate thread information for use in sampling */
- private final ThreadDumper threadDumper;
- /** Responsible for aggregating and then outputting collected sampling data */
- private final DataAggregator dataAggregator;
-
- /** A future to encapsulation the completion of this sampler instance */
- private final CompletableFuture<Sampler> future = new CompletableFuture<>();
-
- /** The interval to wait between sampling, in microseconds */
- private final int interval;
- /** The time when sampling first began */
- private long startTime = -1;
- /** The unix timestamp (in millis) when this sampler should automatically complete.*/
- private final long endTime; // -1 for nothing
-
- public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) {
- this.threadDumper = threadDumper;
- this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative);
- this.interval = interval;
- this.endTime = endTime;
- }
-
- public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {
- this.threadDumper = threadDumper;
- this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold);
- this.interval = interval;
- this.endTime = endTime;
- }
+public interface Sampler {
/**
* Starts the sampler.
*/
- public void start() {
- this.startTime = System.currentTimeMillis();
- this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS);
- }
-
- public long getStartTime() {
- if (this.startTime == -1) {
- throw new IllegalStateException("Not yet started");
- }
- return this.startTime;
- }
-
- public long getEndTime() {
- return this.endTime;
- }
-
- public CompletableFuture<Sampler> getFuture() {
- return this.future;
- }
+ void start();
- public void cancel() {
- this.task.cancel(false);
- }
-
- @Override
- public void run() {
- // this is effectively synchronized, the worker pool will not allow this task
- // to concurrently execute.
- try {
- if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) {
- this.future.complete(this);
- cancel();
- return;
- }
-
- ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean);
- this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps));
- } catch (Throwable t) {
- this.future.completeExceptionally(t);
- cancel();
- }
- }
-
- private static final class InsertDataTask implements Runnable {
- private final DataAggregator dataAggregator;
- private final ThreadInfo[] threadDumps;
-
- InsertDataTask(DataAggregator dataAggregator, ThreadInfo[] threadDumps) {
- this.dataAggregator = dataAggregator;
- this.threadDumps = threadDumps;
- }
-
- @Override
- public void run() {
- for (ThreadInfo threadInfo : this.threadDumps) {
- if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) {
- continue;
- }
- this.dataAggregator.insertData(threadInfo);
- }
- }
- }
-
- private SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) {
- final SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder()
- .setPlatform(platformInfo.toData().toProto())
- .setUser(creator.toData().toProto())
- .setStartTime(this.startTime)
- .setInterval(this.interval)
- .setThreadDumper(this.threadDumper.getMetadata())
- .setDataAggregator(this.dataAggregator.getMetadata());
-
- if (comment != null) {
- metadata.setComment(comment);
- }
-
- SamplerData.Builder proto = SamplerData.newBuilder();
- proto.setMetadata(metadata.build());
+ /**
+ * Stops the sampler.
+ */
+ void stop();
- List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
- data.sort(outputOrder);
+ /**
+ * Gets the time when the sampler started (unix timestamp in millis)
+ *
+ * @return the start time
+ */
+ long getStartTime();
- for (Map.Entry<String, ThreadNode> entry : data) {
- proto.addThreads(entry.getValue().toProto(mergeMode));
- }
+ /**
+ * Gets the time when the sampler should automatically stop (unix timestamp in millis)
+ *
+ * @return the end time, or -1 if undefined
+ */
+ long getEndTime();
- return proto.build();
- }
+ /**
+ * Gets a future to encapsulate the completion of the sampler
+ *
+ * @return a future
+ */
+ CompletableFuture<? extends Sampler> getFuture();
+
+ // Methods used to export the sampler data to the web viewer.
+ SamplerData toProto(
+ PlatformInfo platformInfo,
+ CommandSender creator,
+ Comparator<? super Map.Entry<String, ThreadNode>> outputOrder,
+ String comment,
+ MergeMode mergeMode
+ );
- public byte[] formCompressedDataPayload(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) {
+ default byte[] formCompressedDataPayload(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) {
SamplerData proto = toProto(platformInfo, creator, outputOrder, comment, mergeMode);
ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 8993445..7abe1a7 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -20,6 +20,9 @@
package me.lucko.spark.common.sampler;
+import me.lucko.spark.common.sampler.async.AsyncProfilerAccess;
+import me.lucko.spark.common.sampler.async.AsyncSampler;
+import me.lucko.spark.common.sampler.java.JavaSampler;
import me.lucko.spark.common.sampler.tick.TickHook;
import java.util.concurrent.TimeUnit;
@@ -32,6 +35,7 @@ public class SamplerBuilder {
private double samplingInterval = 4; // milliseconds
private boolean ignoreSleeping = false;
private boolean ignoreNative = false;
+ private boolean useAsyncProfiler = true;
private long timeout = -1;
private ThreadDumper threadDumper = ThreadDumper.ALL;
private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME;
@@ -81,14 +85,23 @@ public class SamplerBuilder {
return this;
}
+ public SamplerBuilder forceJavaSampler(boolean forceJavaSampler) {
+ this.useAsyncProfiler = !forceJavaSampler;
+ return this;
+ }
+
public Sampler start() {
Sampler sampler;
int intervalMicros = (int) (this.samplingInterval * 1000d);
if (this.ticksOver == -1 || this.tickHook == null) {
- sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
+ if (this.useAsyncProfiler && this.timeout == -1 && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) {
+ sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper);
+ } else {
+ sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
+ }
} else {
- sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
+ sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
}
sampler.start();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
index 224918e..4863482 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
@@ -76,17 +76,38 @@ public interface ThreadDumper {
*/
final class Specific implements ThreadDumper {
private final long[] ids;
+ private Set<Thread> threads;
+ private Set<String> threadNamesLowerCase;
public Specific(long[] ids) {
this.ids = ids;
}
public Specific(Set<String> names) {
- Set<String> namesLower = names.stream().map(String::toLowerCase).collect(Collectors.toSet());
+ this.threadNamesLowerCase = names.stream().map(String::toLowerCase).collect(Collectors.toSet());
this.ids = new ThreadFinder().getThreads()
- .filter(t -> namesLower.contains(t.getName().toLowerCase()))
+ .filter(t -> this.threadNamesLowerCase.contains(t.getName().toLowerCase()))
.mapToLong(Thread::getId)
.toArray();
+ Arrays.sort(this.ids);
+ }
+
+ public Set<Thread> getThreads() {
+ if (this.threads == null) {
+ this.threads = new ThreadFinder().getThreads()
+ .filter(t -> Arrays.binarySearch(this.ids, t.getId()) >= 0)
+ .collect(Collectors.toSet());
+ }
+ return this.threads;
+ }
+
+ public Set<String> getThreadNames() {
+ if (this.threadNamesLowerCase == null) {
+ this.threadNamesLowerCase = getThreads().stream()
+ .map(t -> t.getName().toLowerCase())
+ .collect(Collectors.toSet());
+ }
+ return this.threadNamesLowerCase;
}
@Override
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
index c9a4f37..7640d60 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
@@ -23,37 +23,22 @@ package me.lucko.spark.common.sampler.aggregator;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.node.ThreadNode;
-import java.lang.management.ThreadInfo;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
+/**
+ * Abstract implementation of {@link DataAggregator}.
+ */
public abstract class AbstractDataAggregator implements DataAggregator {
/** A map of root stack nodes for each thread with sampling data */
protected final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
- /** The worker pool for inserting stack nodes */
- protected final ExecutorService workerPool;
-
/** The instance used to group threads together */
protected final ThreadGrouper threadGrouper;
- /** The interval to wait between sampling, in microseconds */
- protected final int interval;
-
- /** If sleeping threads should be ignored */
- private final boolean ignoreSleeping;
-
- /** If threads executing native code should be ignored */
- private final boolean ignoreNative;
-
- public AbstractDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) {
- this.workerPool = workerPool;
+ protected AbstractDataAggregator(ThreadGrouper threadGrouper) {
this.threadGrouper = threadGrouper;
- this.interval = interval;
- this.ignoreSleeping = ignoreSleeping;
- this.ignoreNative = ignoreNative;
}
protected ThreadNode getNode(String group) {
@@ -64,42 +49,8 @@ public abstract class AbstractDataAggregator implements DataAggregator {
return this.threadData.computeIfAbsent(group, ThreadNode::new);
}
- protected void writeData(ThreadInfo threadInfo) {
- if (this.ignoreSleeping && isSleeping(threadInfo)) {
- return;
- }
- if (this.ignoreNative && threadInfo.isInNative()) {
- return;
- }
-
- try {
- ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName()));
- node.log(threadInfo.getStackTrace(), this.interval);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ @Override
+ public Map<String, ThreadNode> getData() {
+ return this.threadData;
}
-
- private static boolean isSleeping(ThreadInfo thread) {
- if (thread.getThreadState() == Thread.State.WAITING || thread.getThreadState() == Thread.State.TIMED_WAITING) {
- return true;
- }
-
- StackTraceElement[] stackTrace = thread.getStackTrace();
- if (stackTrace.length == 0) {
- return false;
- }
-
- StackTraceElement call = stackTrace[0];
- String clazz = call.getClassName();
- String method = call.getMethodName();
-
- // java.lang.Thread.yield()
- // jdk.internal.misc.Unsafe.park()
- // sun.misc.Unsafe.park()
- return (clazz.equals("java.lang.Thread") && method.equals("yield")) ||
- (clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) ||
- (clazz.equals("sun.misc.Unsafe") && method.equals("park"));
- }
-
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
index 8318fbd..a91a998 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
@@ -39,13 +39,6 @@ public interface DataAggregator {
Map<String, ThreadNode> getData();
/**
- * Inserts sampling data into this aggregator
- *
- * @param threadInfo the thread info
- */
- void insertData(ThreadInfo threadInfo);
-
- /**
* Gets metadata about the data aggregator instance.
*/
SamplerMetadata.DataAggregator getMetadata();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
new file mode 100644
index 0000000..cb3bd6f
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
@@ -0,0 +1,53 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.aggregator.AbstractDataAggregator;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
+
+/**
+ * Data aggregator for {@link AsyncSampler}.
+ */
+public class AsyncDataAggregator extends AbstractDataAggregator {
+ protected AsyncDataAggregator(ThreadGrouper threadGrouper) {
+ super(threadGrouper);
+ }
+
+ @Override
+ public SamplerMetadata.DataAggregator getMetadata() {
+ return SamplerMetadata.DataAggregator.newBuilder()
+ .setType(SamplerMetadata.DataAggregator.Type.SIMPLE)
+ .setThreadGrouper(this.threadGrouper.asProto())
+ .build();
+ }
+
+ public void insertData(ProfileSegment element) {
+ try {
+ ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName()));
+ node.log(element.getStackTrace(), element.getTime());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
new file mode 100644
index 0000000..3d53903
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -0,0 +1,117 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+import one.profiler.AsyncProfiler;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Provides a bridge between spark and async-profiler.
+ */
+public final class AsyncProfilerAccess {
+
+ // Singleton
+ public static final AsyncProfilerAccess INSTANCE = new AsyncProfilerAccess();
+
+ // Only support Linux x86_64
+ private static final String SUPPORTED_OS = "linux";
+ private static final String SUPPORTED_ARCH = "amd64";
+
+ private static AsyncProfiler load() throws Exception {
+ // check compatibility
+ String os = System.getProperty("os.name");
+ if (!SUPPORTED_OS.equalsIgnoreCase(os)) {
+ throw new UnsupportedOperationException("Only supported on Linux x86_64, your OS: " + os);
+ }
+
+ String arch = System.getProperty("os.arch");
+ if (!SUPPORTED_ARCH.equalsIgnoreCase(arch)) {
+ throw new UnsupportedOperationException("Only supported on Linux x86_64, your arch: " + os);
+ }
+
+ // extract the profiler binary from the spark jar file
+ URL profilerResource = AsyncProfilerAccess.class.getClassLoader().getResource("libasyncProfiler.so");
+ if (profilerResource == null) {
+ throw new IllegalStateException("Could not find libasyncProfiler.so in spark jar file");
+ }
+
+ Path extractPath = Files.createTempFile("spark-", "-libasyncProfiler.so.tmp");
+ extractPath.toFile().deleteOnExit();
+
+ try (InputStream in = profilerResource.openStream()) {
+ Files.copy(in, extractPath, StandardCopyOption.REPLACE_EXISTING);
+ }
+
+ // get an instance of async-profiler
+ return AsyncProfiler.getInstance(extractPath.toAbsolutePath().toString());
+ }
+
+ /** An instance of the async-profiler Java API. */
+ private final AsyncProfiler profiler;
+
+ /** If profiler is null, contains the reason why setup failed */
+ private final Exception setupException;
+
+ private AsyncProfilerAccess() {
+ AsyncProfiler profiler;
+ Exception setupException = null;
+
+ try {
+ profiler = load();
+ ensureCpuEventSupported(profiler);
+ } catch (Exception e) {
+ profiler = null;
+ setupException = e;
+ }
+
+ this.profiler = profiler;
+ this.setupException = setupException;
+ }
+
+ /**
+ * Checks the {@code profiler} to ensure the CPU event is supported.
+ *
+ * @param profiler the profiler instance
+ * @throws Exception if the event is not supported
+ */
+ private static void ensureCpuEventSupported(AsyncProfiler profiler) throws Exception {
+ String resp = profiler.execute("check,event=cpu").trim();
+ if (!resp.equalsIgnoreCase("ok")) {
+ throw new UnsupportedOperationException("CPU event is not supported");
+ }
+ }
+
+ public AsyncProfiler getProfiler() {
+ if (this.profiler == null) {
+ throw new UnsupportedOperationException("async-profiler not supported", this.setupException);
+ }
+ return this.profiler;
+ }
+
+ public boolean isSupported() {
+ return this.profiler != null;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
new file mode 100644
index 0000000..a109be7
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -0,0 +1,279 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+import me.lucko.spark.common.command.sender.CommandSender;
+import me.lucko.spark.common.platform.PlatformInfo;
+import me.lucko.spark.common.sampler.Sampler;
+import me.lucko.spark.common.sampler.ThreadDumper;
+import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.async.jfr.ClassRef;
+import me.lucko.spark.common.sampler.async.jfr.JfrReader;
+import me.lucko.spark.common.sampler.async.jfr.MethodRef;
+import me.lucko.spark.common.sampler.async.jfr.Sample;
+import me.lucko.spark.common.sampler.async.jfr.StackTrace;
+import me.lucko.spark.common.sampler.node.MergeMode;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.proto.SparkProtos;
+
+import one.profiler.AsyncProfiler;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+/**
+ * A sampler implementation using async-profiler.
+ */
+public class AsyncSampler implements Sampler {
+ private final AsyncProfiler profiler;
+