aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko/spark
diff options
context:
space:
mode:
authorlucko <git@lucko.me>2021-03-10 15:44:14 +0000
committerGitHub <noreply@github.com>2021-03-10 15:44:14 +0000
commit9ee26a12f12a89a6df14eb86afce5d88b5a2cf25 (patch)
tree840ab480ae324b50cefc63b69dae9b38660dc94f /spark-common/src/main/java/me/lucko/spark
parentfd6736fc2f0bae48dda1d4b595e867d9c7244c27 (diff)
downloadspark-9ee26a12f12a89a6df14eb86afce5d88b5a2cf25.tar.gz
spark-9ee26a12f12a89a6df14eb86afce5d88b5a2cf25.tar.bz2
spark-9ee26a12f12a89a6df14eb86afce5d88b5a2cf25.zip
async-profiler support (#102)
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java2
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java23
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java23
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java179
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java17
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java25
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java63
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java7
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java53
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java117
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java279
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java52
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java61
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java25
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java107
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Element.java23
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrClass.java49
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrField.java31
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java389
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/MethodRef.java29
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Sample.java36
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/StackTrace.java28
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java116
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java189
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java (renamed from spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java)20
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java (renamed from spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java)15
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java30
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java58
28 files changed, 1791 insertions, 255 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java
index cf3b0ee..a6bf332 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java
@@ -211,7 +211,7 @@ public class SparkPlatform {
if (command.aliases().contains(alias)) {
try {
command.executor().execute(this, sender, resp, new Arguments(rawArgs));
- } catch (IllegalArgumentException e) {
+ } catch (Arguments.ParseException e) {
resp.replyPrefixed(text(e.getMessage(), RED));
}
return;
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java b/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java
index 2b202af..3cd0365 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java
@@ -51,7 +51,7 @@ public class Arguments {
if (flag == null || matches) {
if (!matches) {
- throw new IllegalArgumentException("Expected flag at position " + i + " but got '" + arg + "' instead!");
+ throw new ParseException("Expected flag at position " + i + " but got '" + arg + "' instead!");
}
// store existing value, if present
@@ -83,7 +83,7 @@ public class Arguments {
try {
return Math.abs(Integer.parseInt(it.next()));
} catch (NumberFormatException e) {
- throw new IllegalArgumentException("Invalid input for '" + key + "' argument. Please specify a number!");
+ throw new ParseException("Invalid input for '" + key + "' argument. Please specify a number!");
}
}
return -1; // undefined
@@ -95,7 +95,7 @@ public class Arguments {
try {
return Math.abs(Double.parseDouble(it.next()));
} catch (NumberFormatException e) {
- throw new IllegalArgumentException("Invalid input for '" + key + "' argument. Please specify a number!");
+ throw new ParseException("Invalid input for '" + key + "' argument. Please specify a number!");
}
}
return -1; // undefined
@@ -108,4 +108,21 @@ public class Arguments {
public boolean boolFlag(String key) {
return this.parsedArgs.containsKey(key);
}
+
+ public static final class ParseException extends IllegalArgumentException {
+ public ParseException() {
+ }
+
+ public ParseException(String s) {
+ super(s);
+ }
+
+ public ParseException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ParseException(Throwable cause) {
+ super(cause);
+ }
+ }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
index eb77b24..cce3169 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
@@ -33,6 +33,7 @@ import me.lucko.spark.common.sampler.SamplerBuilder;
import me.lucko.spark.common.sampler.ThreadDumper;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.ThreadNodeOrder;
+import me.lucko.spark.common.sampler.async.AsyncSampler;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.tick.TickHook;
import me.lucko.spark.common.util.MethodDisambiguator;
@@ -55,13 +56,13 @@ import static net.kyori.adventure.text.format.NamedTextColor.*;
public class SamplerModule implements CommandModule {
private static final MediaType SPARK_SAMPLER_MEDIA_TYPE = MediaType.parse("application/x-spark-sampler");
- /** The WarmRoast instance currently running, if any */
+ /** The sampler instance currently running, if any */
private Sampler activeSampler = null;
@Override
public void close() {
if (this.activeSampler != null) {
- this.activeSampler.cancel();
+ this.activeSampler.stop();
this.activeSampler = null;
}
}
@@ -83,6 +84,7 @@ public class SamplerModule implements CommandModule {
.argumentUsage("only-ticks-over", "tick length millis")
.argumentUsage("ignore-sleeping", null)
.argumentUsage("ignore-native", null)
+ .argumentUsage("force-java-sampler", null)
.argumentUsage("order-by-time", null)
.argumentUsage("separate-parent-calls", null)
.executor((platform, sender, resp, arguments) -> {
@@ -118,7 +120,7 @@ public class SamplerModule implements CommandModule {
if (this.activeSampler == null) {
resp.replyPrefixed(text("There isn't an active sampling task running."));
} else {
- this.activeSampler.cancel();
+ this.activeSampler.stop();
resp.broadcastPrefixed(text("The active sampling operation has been stopped! Uploading results..."));
ThreadNodeOrder threadOrder = arguments.boolFlag("order-by-time") ? ThreadNodeOrder.BY_TIME : ThreadNodeOrder.BY_NAME;
String comment = Iterables.getFirst(arguments.stringFlag("comment"), null);
@@ -149,6 +151,7 @@ public class SamplerModule implements CommandModule {
boolean ignoreSleeping = arguments.boolFlag("ignore-sleeping");
boolean ignoreNative = arguments.boolFlag("ignore-native");
+ boolean forceJavaSampler = arguments.boolFlag("force-java-sampler");
Set<String> threads = arguments.stringFlag("thread");
ThreadDumper threadDumper;
@@ -201,19 +204,25 @@ public class SamplerModule implements CommandModule {
builder.samplingInterval(intervalMillis);
builder.ignoreSleeping(ignoreSleeping);
builder.ignoreNative(ignoreNative);
+ builder.forceJavaSampler(forceJavaSampler);
if (ticksOver != -1) {
builder.ticksOver(ticksOver, tickHook);
}
Sampler sampler = this.activeSampler = builder.start();
- resp.broadcastPrefixed(text("Profiler now active!", GOLD));
+ resp.broadcastPrefixed(text()
+ .append(text("Profiler now active!", GOLD))
+ .append(space())
+ .append(text("(" + (sampler instanceof AsyncSampler ? "async" : "built-in java") + ")", DARK_GRAY))
+ .build()
+ );
if (timeoutSeconds == -1) {
resp.broadcastPrefixed(text("Use '/" + platform.getPlugin().getCommandName() + " profiler --stop' to stop profiling and upload the results."));
} else {
resp.broadcastPrefixed(text("The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds."));
}
- CompletableFuture<Sampler> future = this.activeSampler.getFuture();
+ CompletableFuture<? extends Sampler> future = this.activeSampler.getFuture();
// send message if profiling fails
future.whenCompleteAsync((s, throwable) -> {
@@ -253,8 +262,8 @@ public class SamplerModule implements CommandModule {
List<String> opts = new ArrayList<>(Arrays.asList("--info", "--stop", "--cancel",
"--timeout", "--regex", "--combine-all", "--not-combined", "--interval",
- "--only-ticks-over", "--ignore-sleeping", "--ignore-native", "--order-by-time",
- "--separate-parent-calls", "--comment"));
+ "--only-ticks-over", "--ignore-sleeping", "--ignore-native", "--force-java-sampler",
+ "--order-by-time", "--separate-parent-calls", "--comment"));
opts.removeAll(arguments);
opts.add("--thread"); // allowed multiple times
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index e772cb3..5088ed7 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -1,7 +1,6 @@
/*
* This file is part of spark.
*
- * Copyright (C) Albert Pham <http://www.sk89q.com>
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
@@ -21,174 +20,66 @@
package me.lucko.spark.common.sampler;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.platform.PlatformInfo;
-import me.lucko.spark.common.sampler.aggregator.DataAggregator;
-import me.lucko.spark.common.sampler.aggregator.SimpleDataAggregator;
-import me.lucko.spark.common.sampler.aggregator.TickedDataAggregator;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.node.ThreadNode;
-import me.lucko.spark.common.sampler.tick.TickHook;
import me.lucko.spark.proto.SparkProtos.SamplerData;
-import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-import java.util.ArrayList;
import java.util.Comparator;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;
/**
- * Main sampler class.
+ * Abstract superinterface for all sampler implementations.
*/
-public class Sampler implements Runnable {
- private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
-
- /** The worker pool for inserting stack nodes */
- private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
- 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
- );
-
- /** The main sampling task */
- private ScheduledFuture<?> task;
-
- /** The thread management interface for the current JVM */
- private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
- /** The instance used to generate thread information for use in sampling */
- private final ThreadDumper threadDumper;
- /** Responsible for aggregating and then outputting collected sampling data */
- private final DataAggregator dataAggregator;
-
- /** A future to encapsulation the completion of this sampler instance */
- private final CompletableFuture<Sampler> future = new CompletableFuture<>();
-
- /** The interval to wait between sampling, in microseconds */
- private final int interval;
- /** The time when sampling first began */
- private long startTime = -1;
- /** The unix timestamp (in millis) when this sampler should automatically complete.*/
- private final long endTime; // -1 for nothing
-
- public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) {
- this.threadDumper = threadDumper;
- this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative);
- this.interval = interval;
- this.endTime = endTime;
- }
-
- public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {
- this.threadDumper = threadDumper;
- this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold);
- this.interval = interval;
- this.endTime = endTime;
- }
+public interface Sampler {
/**
* Starts the sampler.
*/
- public void start() {
- this.startTime = System.currentTimeMillis();
- this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS);
- }
-
- public long getStartTime() {
- if (this.startTime == -1) {
- throw new IllegalStateException("Not yet started");
- }
- return this.startTime;
- }
-
- public long getEndTime() {
- return this.endTime;
- }
-
- public CompletableFuture<Sampler> getFuture() {
- return this.future;
- }
+ void start();
- public void cancel() {
- this.task.cancel(false);
- }
-
- @Override
- public void run() {
- // this is effectively synchronized, the worker pool will not allow this task
- // to concurrently execute.
- try {
- if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) {
- this.future.complete(this);
- cancel();
- return;
- }
-
- ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean);
- this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps));
- } catch (Throwable t) {
- this.future.completeExceptionally(t);
- cancel();
- }
- }
-
- private static final class InsertDataTask implements Runnable {
- private final DataAggregator dataAggregator;
- private final ThreadInfo[] threadDumps;
-
- InsertDataTask(DataAggregator dataAggregator, ThreadInfo[] threadDumps) {
- this.dataAggregator = dataAggregator;
- this.threadDumps = threadDumps;
- }
-
- @Override
- public void run() {
- for (ThreadInfo threadInfo : this.threadDumps) {
- if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) {
- continue;
- }
- this.dataAggregator.insertData(threadInfo);
- }
- }
- }
-
- private SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) {
- final SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder()
- .setPlatform(platformInfo.toData().toProto())
- .setUser(creator.toData().toProto())
- .setStartTime(this.startTime)
- .setInterval(this.interval)
- .setThreadDumper(this.threadDumper.getMetadata())
- .setDataAggregator(this.dataAggregator.getMetadata());
-
- if (comment != null) {
- metadata.setComment(comment);
- }
-
- SamplerData.Builder proto = SamplerData.newBuilder();
- proto.setMetadata(metadata.build());
+ /**
+ * Stops the sampler.
+ */
+ void stop();
- List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
- data.sort(outputOrder);
+ /**
+ * Gets the time when the sampler started (unix timestamp in millis)
+ *
+ * @return the start time
+ */
+ long getStartTime();
- for (Map.Entry<String, ThreadNode> entry : data) {
- proto.addThreads(entry.getValue().toProto(mergeMode));
- }
+ /**
+ * Gets the time when the sampler should automatically stop (unix timestamp in millis)
+ *
+ * @return the end time, or -1 if undefined
+ */
+ long getEndTime();
- return proto.build();
- }
+ /**
+ * Gets a future to encapsulate the completion of the sampler
+ *
+ * @return a future
+ */
+ CompletableFuture<? extends Sampler> getFuture();
+
+ // Methods used to export the sampler data to the web viewer.
+ SamplerData toProto(
+ PlatformInfo platformInfo,
+ CommandSender creator,
+ Comparator<? super Map.Entry<String, ThreadNode>> outputOrder,
+ String comment,
+ MergeMode mergeMode
+ );
- public byte[] formCompressedDataPayload(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) {
+ default byte[] formCompressedDataPayload(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) {
SamplerData proto = toProto(platformInfo, creator, outputOrder, comment, mergeMode);
ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 8993445..7abe1a7 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -20,6 +20,9 @@
package me.lucko.spark.common.sampler;
+import me.lucko.spark.common.sampler.async.AsyncProfilerAccess;
+import me.lucko.spark.common.sampler.async.AsyncSampler;
+import me.lucko.spark.common.sampler.java.JavaSampler;
import me.lucko.spark.common.sampler.tick.TickHook;
import java.util.concurrent.TimeUnit;
@@ -32,6 +35,7 @@ public class SamplerBuilder {
private double samplingInterval = 4; // milliseconds
private boolean ignoreSleeping = false;
private boolean ignoreNative = false;
+ private boolean useAsyncProfiler = true;
private long timeout = -1;
private ThreadDumper threadDumper = ThreadDumper.ALL;
private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME;
@@ -81,14 +85,23 @@ public class SamplerBuilder {
return this;
}
+ public SamplerBuilder forceJavaSampler(boolean forceJavaSampler) {
+ this.useAsyncProfiler = !forceJavaSampler;
+ return this;
+ }
+
public Sampler start() {
Sampler sampler;
int intervalMicros = (int) (this.samplingInterval * 1000d);
if (this.ticksOver == -1 || this.tickHook == null) {
- sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
+ if (this.useAsyncProfiler && this.timeout == -1 && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) {
+ sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper);
+ } else {
+ sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
+ }
} else {
- sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
+ sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
}
sampler.start();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
index 224918e..4863482 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
@@ -76,17 +76,38 @@ public interface ThreadDumper {
*/
final class Specific implements ThreadDumper {
private final long[] ids;
+ private Set<Thread> threads;
+ private Set<String> threadNamesLowerCase;
public Specific(long[] ids) {
this.ids = ids;
}
public Specific(Set<String> names) {
- Set<String> namesLower = names.stream().map(String::toLowerCase).collect(Collectors.toSet());
+ this.threadNamesLowerCase = names.stream().map(String::toLowerCase).collect(Collectors.toSet());
this.ids = new ThreadFinder().getThreads()
- .filter(t -> namesLower.contains(t.getName().toLowerCase()))
+ .filter(t -> this.threadNamesLowerCase.contains(t.getName().toLowerCase()))
.mapToLong(Thread::getId)
.toArray();
+ Arrays.sort(this.ids);
+ }
+
+ public Set<Thread> getThreads() {
+ if (this.threads == null) {
+ this.threads = new ThreadFinder().getThreads()
+ .filter(t -> Arrays.binarySearch(this.ids, t.getId()) >= 0)
+ .collect(Collectors.toSet());
+ }
+ return this.threads;
+ }
+
+ public Set<String> getThreadNames() {
+ if (this.threadNamesLowerCase == null) {
+ this.threadNamesLowerCase = getThreads().stream()
+ .map(t -> t.getName().toLowerCase())
+ .collect(Collectors.toSet());
+ }
+ return this.threadNamesLowerCase;
}
@Override
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
index c9a4f37..7640d60 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
@@ -23,37 +23,22 @@ package me.lucko.spark.common.sampler.aggregator;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.node.ThreadNode;
-import java.lang.management.ThreadInfo;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
+/**
+ * Abstract implementation of {@link DataAggregator}.
+ */
public abstract class AbstractDataAggregator implements DataAggregator {
/** A map of root stack nodes for each thread with sampling data */
protected final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
- /** The worker pool for inserting stack nodes */
- protected final ExecutorService workerPool;
-
/** The instance used to group threads together */
protected final ThreadGrouper threadGrouper;
- /** The interval to wait between sampling, in microseconds */
- protected final int interval;
-
- /** If sleeping threads should be ignored */
- private final boolean ignoreSleeping;
-
- /** If threads executing native code should be ignored */
- private final boolean ignoreNative;
-
- public AbstractDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) {
- this.workerPool = workerPool;
+ protected AbstractDataAggregator(ThreadGrouper threadGrouper) {
this.threadGrouper = threadGrouper;
- this.interval = interval;
- this.ignoreSleeping = ignoreSleeping;
- this.ignoreNative = ignoreNative;
}
protected ThreadNode getNode(String group) {
@@ -64,42 +49,8 @@ public abstract class AbstractDataAggregator implements DataAggregator {
return this.threadData.computeIfAbsent(group, ThreadNode::new);
}
- protected void writeData(ThreadInfo threadInfo) {
- if (this.ignoreSleeping && isSleeping(threadInfo)) {
- return;
- }
- if (this.ignoreNative && threadInfo.isInNative()) {
- return;
- }
-
- try {
- ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName()));
- node.log(threadInfo.getStackTrace(), this.interval);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ @Override
+ public Map<String, ThreadNode> getData() {
+ return this.threadData;
}
-
- private static boolean isSleeping(ThreadInfo thread) {
- if (thread.getThreadState() == Thread.State.WAITING || thread.getThreadState() == Thread.State.TIMED_WAITING) {
- return true;
- }
-
- StackTraceElement[] stackTrace = thread.getStackTrace();
- if (stackTrace.length == 0) {
- return false;
- }
-
- StackTraceElement call = stackTrace[0];
- String clazz = call.getClassName();
- String method = call.getMethodName();
-
- // java.lang.Thread.yield()
- // jdk.internal.misc.Unsafe.park()
- // sun.misc.Unsafe.park()
- return (clazz.equals("java.lang.Thread") && method.equals("yield")) ||
- (clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) ||
- (clazz.equals("sun.misc.Unsafe") && method.equals("park"));
- }
-
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
index 8318fbd..a91a998 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
@@ -39,13 +39,6 @@ public interface DataAggregator {
Map<String, ThreadNode> getData();
/**
- * Inserts sampling data into this aggregator
- *
- * @param threadInfo the thread info
- */
- void insertData(ThreadInfo threadInfo);
-
- /**
* Gets metadata about the data aggregator instance.
*/
SamplerMetadata.DataAggregator getMetadata();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
new file mode 100644
index 0000000..cb3bd6f
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
@@ -0,0 +1,53 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.aggregator.AbstractDataAggregator;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
+
+/**
+ * Data aggregator for {@link AsyncSampler}.
+ */
+public class AsyncDataAggregator extends AbstractDataAggregator {
+ protected AsyncDataAggregator(ThreadGrouper threadGrouper) {
+ super(threadGrouper);
+ }
+
+ @Override
+ public SamplerMetadata.DataAggregator getMetadata() {
+ return SamplerMetadata.DataAggregator.newBuilder()
+ .setType(SamplerMetadata.DataAggregator.Type.SIMPLE)
+ .setThreadGrouper(this.threadGrouper.asProto())
+ .build();
+ }
+
+ public void insertData(ProfileSegment element) {
+ try {
+ ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName()));
+ node.log(element.getStackTrace(), element.getTime());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
new file mode 100644
index 0000000..3d53903
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -0,0 +1,117 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+import one.profiler.AsyncProfiler;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Provides a bridge between spark and async-profiler.
+ */
+public final class AsyncProfilerAccess {
+
+ // Singleton
+ public static final AsyncProfilerAccess INSTANCE = new AsyncProfilerAccess();
+
+ // Only support Linux x86_64
+ private static final String SUPPORTED_OS = "linux";
+ private static final String SUPPORTED_ARCH = "amd64";
+
+ private static AsyncProfiler load() throws Exception {
+ // check compatibility
+ String os = System.getProperty("os.name");
+ if (!SUPPORTED_OS.equalsIgnoreCase(os)) {
+ throw new UnsupportedOperationException("Only supported on Linux x86_64, your OS: " + os);
+ }
+
+ String arch = System.getProperty("os.arch");
+ if (!SUPPORTED_ARCH.equalsIgnoreCase(arch)) {
+ throw new UnsupportedOperationException("Only supported on Linux x86_64, your arch: " + os);
+ }
+
+ // extract the profiler binary from the spark jar file
+ URL profilerResource = AsyncProfilerAccess.class.getClassLoader().getResource("libasyncProfiler.so");
+ if (profilerResource == null) {
+ throw new IllegalStateException("Could not find libasyncProfiler.so in spark jar file");
+ }
+
+ Path extractPath = Files.createTempFile("spark-", "-libasyncProfiler.so.tmp");
+ extractPath.toFile().deleteOnExit();
+
+ try (InputStream in = profilerResource.openStream()) {
+ Files.copy(in, extractPath, StandardCopyOption.REPLACE_EXISTING);
+ }
+
+ // get an instance of async-profiler
+ return AsyncProfiler.getInstance(extractPath.toAbsolutePath().toString());
+ }
+
+ /** An instance of the async-profiler Java API. */
+ private final AsyncProfiler profiler;
+
+ /** If profiler is null, contains the reason why setup failed */
+ private final Exception setupException;
+
+ private AsyncProfilerAccess() {
+ AsyncProfiler profiler;
+ Exception setupException = null;
+
+ try {
+ profiler = load();
+ ensureCpuEventSupported(profiler);
+ } catch (Exception e) {
+ profiler = null;
+ setupException = e;
+ }
+
+ this.profiler = profiler;
+ this.setupException = setupException;
+ }
+
+ /**
+ * Checks the {@code profiler} to ensure the CPU event is supported.
+ *
+ * @param profiler the profiler instance
+ * @throws Exception if the event is not supported
+ */
+ private static void ensureCpuEventSupported(AsyncProfiler profiler) throws Exception {
+ String resp = profiler.execute("check,event=cpu").trim();
+ if (!resp.equalsIgnoreCase("ok")) {
+ throw new UnsupportedOperationException("CPU event is not supported");
+ }
+ }
+
+ public AsyncProfiler getProfiler() {
+ if (this.profiler == null) {
+ throw new UnsupportedOperationException("async-profiler not supported", this.setupException);
+ }
+ return this.profiler;
+ }
+
+ public boolean isSupported() {
+ return this.profiler != null;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
new file mode 100644
index 0000000..a109be7
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -0,0 +1,279 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+import me.lucko.spark.common.command.sender.CommandSender;
+import me.lucko.spark.common.platform.PlatformInfo;
+import me.lucko.spark.common.sampler.Sampler;
+import me.lucko.spark.common.sampler.ThreadDumper;
+import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.async.jfr.ClassRef;
+import me.lucko.spark.common.sampler.async.jfr.JfrReader;
+import me.lucko.spark.common.sampler.async.jfr.MethodRef;
+import me.lucko.spark.common.sampler.async.jfr.Sample;
+import me.lucko.spark.common.sampler.async.jfr.StackTrace;
+import me.lucko.spark.common.sampler.node.MergeMode;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.proto.SparkProtos;
+
+import one.profiler.AsyncProfiler;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+/**
+ * A sampler implementation using async-profiler.
+ */
+public class AsyncSampler implements Sampler {
+ private final AsyncProfiler profiler;
+
+ /** The instance used to generate thread information for use in sampling */
+ private final ThreadDumper threadDumper;
+ /** Responsible for aggregating and then outputting collected sampling data */
+ private final AsyncDataAggregator dataAggregator;
+ /** Flag to mark if the output has been completed */
+ private boolean outputComplete = false;
+ /** The temporary output file */
+ private Path outputFile;
+
+ /** The interval to wait between sampling, in microseconds */
+ private final int interval;
+ /** The time when sampling first began */
+ private long startTime = -1;
+
+ public AsyncSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper) {
+ this.profiler = AsyncProfilerAccess.INSTANCE.getProfiler();
+ this.threadDumper = threadDumper;
+ this.dataAggregator = new AsyncDataAggregator(threadGrouper);
+ this.interval = interval;
+ }
+
+ @Override
+ public long getStartTime() {
+ if (this.startTime == -1) {
+ throw new IllegalStateException("Not yet started");
+ }
+ return this.startTime;
+ }
+
+ @Override
+ public long getEndTime() {
+ return -1;
+ }
+
+ @Override
+ public CompletableFuture<? extends Sampler> getFuture() {
+ return new CompletableFuture<>();
+ }
+
+ /**
+ * Executes a profiler command.
+ *
+ * @param command the command to execute
+ * @return the response
+ */
+ private String execute(String command) {
+ try {
+ return this.profiler.execute(command);
+ } catch (IOException e) {
+ throw new RuntimeException("Exception whilst executing profiler command", e);
+ }
+ }
+
+ /**
+ * Starts the profiler.
+ */
+ @Override
+ public void start() {
+ this.startTime = System.currentTimeMillis();
+
+ try {
+ this.outputFile = Files.createTempFile("spark-profile-", ".jfr.tmp");
+ this.outputFile.toFile().deleteOnExit();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create temporary output file", e);
+ }
+
+ String command = "start,event=cpu,interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
+ if (this.threadDumper instanceof ThreadDumper.Specific) {
+ command += ",filter";
+ }
+
+ String resp = execute(command).trim();
+ if (!resp.equalsIgnoreCase("profiling started")) {
+ throw new RuntimeException("Unexpected response: " + resp);
+ }
+
+ if (this.threadDumper instanceof ThreadDumper.Specific) {
+ ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper;
+ for (Thread thread : threadDumper.getThreads()) {
+ this.profiler.addThread(thread);
+ }
+ }
+ }
+
+ /**
+ * Stops the profiler.
+ */
+ @Override
+ public void stop() {
+ this.profiler.stop();
+ }
+
+ @Override
+ public SparkProtos.SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) {
+ final SparkProtos.SamplerMetadata.Builder metadata = SparkProtos.SamplerMetadata.newBuilder()
+ .setPlatform(platformInfo.toData().toProto())
+ .setUser(creator.toData().toProto())
+ .setStartTime(this.startTime)
+ .setInterval(this.interval)
+ .setThreadDumper(this.threadDumper.getMetadata())
+ .setDataAggregator(this.dataAggregator.getMetadata());
+
+ if (comment != null) {
+ metadata.setComment(comment);
+ }
+
+ SparkProtos.SamplerData.Builder proto = SparkProtos.SamplerData.newBuilder();
+ proto.setMetadata(metadata.build());
+
+ aggregateOutput();
+
+ List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
+ data.sort(outputOrder);
+
+ for (Map.Entry<String, ThreadNode> entry : data) {
+ proto.addThreads(entry.getValue().toProto(mergeMode));
+ }
+
+ return proto.build();
+ }
+
+ private void aggregateOutput() {
+ if (this.outputComplete) {
+ return;
+ }
+ this.outputComplete = true;
+
+ Predicate<String> threadFilter;
+ if (this.threadDumper instanceof ThreadDumper.Specific) {
+ ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper;
+ threadFilter = n -> threadDumper.getThreadNames().contains(n.toLowerCase());
+ } else {
+ threadFilter = n -> true;
+ }
+
+ // read the jfr file produced by async-profiler
+ try (JfrReader reader = new JfrReader(this.outputFile)) {
+ readSegments(reader, threadFilter);
+ } catch (IOException e) {
+ throw new RuntimeException("Read error", e);
+ }
+
+ // delete the output file after reading
+ try {
+ Files.deleteIfExists(this.outputFile);
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ private void readSegments(JfrReader reader, Predicate<String> threadFilter) {
+ List<Sample> samples = reader.samples;
+ for (int i = 0; i < samples.size(); i++) {
+ Sample sample = samples.get(i);
+
+ long duration;
+ if (i == 0) {
+ // we don't really know the duration of the first sample, so just use the sampling
+ // interval
+ duration = this.interval;
+ } else {
+ // calculate the duration of the sample by calculating the time elapsed since the
+ // previous sample
+ duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time);
+ }
+
+ String threadName = reader.threads.get(sample.tid);
+ if (!threadFilter.test(threadName)) {
+ continue;
+ }
+
+ // parse the segment and give it to the data aggregator
+ ProfileSegment segment = parseSegment(reader, sample, threadName, duration);
+ this.dataAggregator.insertData(segment);
+ }
+ }
+
+ private static ProfileSegment parseSegment(JfrReader reader, Sample sample, String threadName, long duration) {
+ StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId);
+ int len = stackTrace.methods.length;
+
+ AsyncStackTraceElement[] stack = new AsyncStackTraceElement[len];
+ for (int i = 0; i < len; i++) {
+ stack[i] = parseStackFrame(reader, stackTrace.methods[i]);
+ }
+
+ return new ProfileSegment(sample.tid, threadName, stack, duration);
+ }
+
+ private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) {
+ AsyncStackTraceElement result = reader.stackFrames.get(methodId);
+ if (result != null) {
+ return result;
+ }
+
+ MethodRef methodRef = reader.methods.get(methodId);
+ ClassRef classRef = reader.classes.get(methodRef.cls);
+
+ byte[] className = reader.symbols.get(classRef.name);
+ byte[] methodName = reader.symbols.get(methodRef.name);
+
+ if (className == null || className.length == 0) {
+ // native call
+ result = new AsyncStackTraceElement(
+ "native",
+ new String(methodName, StandardCharsets.UTF_8),
+ null
+ );
+ } else {
+ // java method
+ byte[] methodDesc = reader.symbols.get(methodRef.sig);
+ result = new AsyncStackTraceElement(
+ new String(className, StandardCharsets.UTF_8).replace('/', '.'),
+ new String(methodName, StandardCharsets.UTF_8),
+ new String(methodDesc, StandardCharsets.UTF_8)
+ );
+ }
+
+ reader.stackFrames.put(methodId, result);
+ return result;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java
new file mode 100644
index 0000000..cf66ded
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java
@@ -0,0 +1,52 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+/**
+ * Version of {@link StackTraceElement} for async-profiler output.
+ */
+public class AsyncStackTraceElement {
+
+ /** The name of the class */
+ private final String className;
+ /** The name of the method */
+ private final String methodName;
+ /** The method description */
+ private final String methodDescription;
+
+ public AsyncStackTraceElement(String className, String methodName, String methodDescription) {
+ this.className = className;
+ this.methodName = methodName;
+ this.methodDescription = methodDescription;
+ }
+
+ public String getClassName() {
+ return this.className;
+ }
+
+ public String getMethodName() {
+ return this.methodName;
+ }
+
+ public String getMethodDescription() {
+ return this.methodDescription;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java
new file mode 100644
index 0000000..154e6fe
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java
@@ -0,0 +1,61 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+/**
+ * Represents a profile "segment".
+ *
+ * <p>async-profiler groups unique stack traces together per-thread in its output.</p>
+ */
+public class ProfileSegment {
+
+ /** The native thread id (does not correspond to Thread#getId) */
+ private final int nativeThreadId;
+ /** The name of the thread */
+ private final String threadName;
+ /** The stack trace for this segment */
+ private final AsyncStackTraceElement[] stackTrace;
+ /** The time spent executing this segment in microseconds */
+ private final long time;
+
+ public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long time) {
+ this.nativeThreadId = nativeThreadId;
+ this.threadName = threadName;
+ this.stackTrace = stackTrace;
+ this.time = time;
+ }
+
+ public int getNativeThreadId() {
+ return this.nativeThreadId;
+ }
+
+ public String getThreadName() {
+ return this.threadName;
+ }
+
+ public AsyncStackTraceElement[] getStackTrace() {
+ return this.stackTrace;
+ }
+
+ public long getTime() {
+ return this.time;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java
new file mode 100644
index 0000000..2366fa6
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2020 Andrei Pangin
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package me.lucko.spark.common.sampler.async.jfr;
+
+public class ClassRef {
+ public final long name;
+
+ public ClassRef(long name) {
+ this.name = name;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
new file mode 100644
index 0000000..55f78ba
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2020 Andrei Pangin
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package me.lucko.spark.common.sampler.async.jfr;
+
+/**
+ * Fast and compact long->Object map.
+ */
+public class Dictionary<T> {
+ private static final int INITIAL_CAPACITY = 16;
+
+ private long[] keys;
+ private Object[] values;
+ private int size;
+
+ public Dictionary() {
+ this.keys = new long[INITIAL_CAPACITY];
+ this.values = new Object[INITIAL_CAPACITY];
+ }
+
+ public void put(long key, T value) {
+ if (key == 0) {
+ throw new IllegalArgumentException("Zero key not allowed");
+ }
+
+ if (++size * 2 > keys.length) {
+ resize(keys.length * 2);
+ }
+
+ int mask = keys.length - 1;
+ int i = hashCode(key) & mask;
+ while (keys[i] != 0 && keys[i] != key) {
+ i = (i + 1) & mask;
+ }
+ keys[i] = key;
+ values[i] = value;
+ }
+
+ @SuppressWarnings("unchecked")
+ public T get(long key) {
+ int mask = keys.length - 1;
+ int i = hashCode(key) & mask;
+ while (keys[i] != key && keys[i] != 0) {
+ i = (i + 1) & mask;
+ }
+ return (T) values[i];
+ }
+
+ @SuppressWarnings("unchecked")
+ public void forEach(Visitor<T> visitor) {
+ for (int i = 0; i < keys.length; i++) {
+ if (keys[i] != 0) {
+ visitor.visit(keys[i], (T) values[i]);
+ }
+ }
+ }
+
+ public int preallocate(int count) {
+ int newSize = size + count;
+ if (newSize * 2 > keys.length) {
+ resize(Integer.highestOneBit(newSize * 4 - 1));
+ }
+ return count;
+ }
+
+ private void resize(int newCapacity) {
+ long[] newKeys = new long[newCapacity];
+ Object[] newValues = new Object[newCapacity];
+ int mask = newKeys.length - 1;
+
+ for (int i = 0; i < keys.length; i++) {
+ if (keys[i] != 0) {
+ for (int j = hashCode(keys[i]) & mask; ; j = (j + 1) & mask) {
+ if (newKeys[j] == 0) {
+ newKeys[j] = keys[i];
+ newValues[j] = values[i];
+ break;
+ }
+ }
+ }
+ }
+
+ keys = newKeys;
+ values = newValues;
+ }
+
+ private static int hashCode(long key) {
+ return (int) (key ^ (key >>> 32));
+ }
+
+ public interface Visitor<T> {
+ void visit(long key, T value);
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Element.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Element.java
new file mode 100644
index 0000000..9d6b6c7
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Element.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2020 Andrei Pangin
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package me.lucko.spark.common.sampler.async.jfr;
+
+class Element {
+
+ void addChild(Element e) {
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrClass.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrClass.java
new file mode 100644
index 0000000..a171552
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrClass.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2020 Andrei Pangin
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package me.lucko.spark.common.sampler.async.jfr;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+class JfrClass extends Element {
+ final int id;
+ final String name;
+ final List<JfrField> fields;
+
+ JfrClass(Map<String, String> attributes) {
+ this.id = Integer.parseInt(attributes.get("id"));
+ this.name = attributes.get("name");
+ this.fields = new ArrayList<>(2);
+ }
+
+ @Override
+ void addChild(Element e) {
+ if (e instanceof JfrField) {
+ fields.add((JfrField) e);
+ }
+ }
+
+ JfrField field(String name) {
+ for (JfrField field : fields) {
+ if (field.name.equals(name)) {
+ return field;
+ }
+ }
+ return null;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrField.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrField.java
new file mode 100644
index 0000000..7a78f2c
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrField.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020 Andrei Pangin
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package me.lucko.spark.common.sampler.async.jfr;
+
+import java.util.Map;
+
+class JfrField extends Element {
+ final String name;
+ final int type;
+ final boolean constantPool;
+
+ JfrField(Map<String, String> attributes) {
+ this.name = attributes.get("name");
+ this.type = Integer.parseInt(attributes.get("class"));
+ this.constantPool = "true".equals(attributes.get("constantPool"));
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
new file mode 100644
index 0000000..49a3474
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
@@ -0,0 +1,389 @@
+/*
+ * Copyright 2020 Andrei Pangin
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package me.lucko.spark.common.sampler.async.jfr;
+
+import me.lucko.spark.common.sampler.async.AsyncStackTraceElement;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses JFR output produced by async-profiler.
+ */
+public class JfrReader implements Closeable {
+ private static final int CHUNK_HEADER_SIZE = 68;
+ private static final int CPOOL_OFFSET = 16;
+ private static final int META_OFFSET = 24;
+
+ private final FileChannel ch;
+ private final ByteBuffer buf;
+
+ public final long startNanos;
+ public final long durationNanos;
+ public final long startTicks;
+ public final long ticksPerSec;
+
+ public final Dictionary<JfrClass> types = new Dictionary<>();
+ public final Map<String, JfrClass> typesByName = new HashMap<>();
+ public final Dictionary<String> threads = new Dictionary<>();
+ public final Dictionary<ClassRef> classes = new Dictionary<>();
+ public final Dictionary<byte[]> symbols = new Dictionary<>();
+ public final Dictionary<MethodRef> methods = new Dictionary<>();
+ public final Dictionary<StackTrace> stackTraces = new Dictionary<>();
+ public final Dictionary<AsyncStackTraceElement> stackFrames = new Dictionary<>(); // spark
+ public final Map<Integer, String> frameTypes = new HashMap<>();
+ public final Map<Integer, String> threadStates = new HashMap<>();
+ public final List<Sample> samples = new ArrayList<>();
+
+ public JfrReader(Path path) throws IOException { // spark - Path instead of String
+ this.ch = FileChannel.open(path, StandardOpenOption.READ); // spark - Path instead of String
+ this.buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
+
+ if (buf.getInt(0) != 0x464c5200) {
+ throw new IOException("Not a valid JFR file");
+ }
+
+ int version = buf.getInt(4);
+ if (version < 0x20000 || version > 0x2ffff) {
+ throw new IOException("Unsupported JFR version: " + (version >>> 16) + "." + (version & 0xffff));
+ }
+
+ this.startNanos = buf.getLong(32);
+ this.durationNanos = buf.getLong(40);
+ this.startTicks = buf.getLong(48);
+ this.ticksPerSec = buf.getLong(56);
+
+ readMeta();
+ readConstantPool();
+ readEvents();
+ }
+
+ @Override
+ public void close() throws IOException {
+ ch.close();
+ }
+
+ private void readMeta() {
+ buf.position(buf.getInt(META_OFFSET + 4));
+ getVarint();
+ getVarint();
+ getVarlong();
+ getVarlong();
+ getVarlong();
+
+ String[] strings = new String[getVarint()];
+ for (int i = 0; i < strings.length; i++) {
+ strings[i] = getString();
+ }
+ readElement(strings);
+ }
+
+ private Element readElement(String[] strings) {
+ String name = strings[getVarint()];
+
+ int attributeCount = getVarint();
+ Map<String, String> attributes = new HashMap<>(attributeCount);
+ for (int i = 0; i < attributeCount; i++) {
+ attributes.put(strings[getVarint()], strings[getVarint()]);
+ }
+
+ Element e = createElement(name, attributes);
+ int childCount = getVarint();
+ for (int i = 0; i < childCount; i++) {
+ e.addChild(readElement(strings));
+ }
+ return e;
+ }
+
+ private Element createElement(String name, Map<String, String> attributes) {
+ switch (name) {
+ case "class": {
+ JfrClass type = new JfrClass(attributes);
+ if (!attributes.containsKey("superType")) {
+ types.put(type.id, type);
+ }
+ typesByName.put(type.name, type);
+ return type;
+ }
+ case "field":
+ return new JfrField(attributes);
+ default:
+ return new Element();
+ }
+ }
+
+ private void readConstantPool() {
+ int offset = buf.getInt(CPOOL_OFFSET + 4);
+ while (true) {
+ buf.position(offset);
+ getVarint();
+ getVarint();
+ getVarlong();
+ getVarlong();
+ long delta = getVarlong();
+ getVarint();
+
+ int poolCount = getVarint();
+ for (int i = 0; i < poolCount; i++) {
+ int type = getVarint();
+ readConstants(types.get(type));
+ }
+
+ if (delta == 0) {
+ break;
+ }
+ offset += delta;
+ }
+ }
+
+ private void readConstants(JfrClass type) {
+ switch (type.name) {
+ case "jdk.types.ChunkHeader":
+ buf.position(buf.position() + (CHUNK_HEADER_SIZE + 3));
+ break;
+ case "java.lang.Thread":
+ readThreads(type.field("group") != null);
+ break;
+ case "java.lang.Class":
+ readClasses(type.field("hidden") != null);
+ break;
+ case "jdk.types.Symbol":
+ readSymbols();
+ break;
+ case "jdk.types.Method":
+ readMethods();
+ break;
+ case "jdk.types.StackTrace":
+ readStackTraces();
+ break;
+ case "jdk.types.FrameType":
+ readMap(frameTypes);
+ break;
+ case "jdk.types.ThreadState":
+ readMap(threadStates);
+ break;
+ default:
+ readOtherConstants(type.fields);
+ }
+ }
+
+ private void readThreads(boolean hasGroup) {
+ int count = threads.preallocate(getVarint());
+ for (int i = 0; i < count; i++) {
+ long id = getVarlong();
+ String osName = getString();
+ int osThreadId = getVarint();
+ String javaName = getString();
+ long javaThreadId = getVarlong();
+ if (hasGroup) getVarlong();
+ threads.put(id, javaName != null ? javaName : osName);
+ }
+ }
+
+ private void readClasses(boolean hasHidden) {
+ int count = classes.preallocate(getVarint());
+ for (int i = 0; i < count; i++) {
+ long id = getVarlong();
+ long loader = getVarlong();
+ long name = getVarlong();
+ long pkg = getVarlong();
+ int modifiers = getVarint();
+ if (hasHidden) getVarint();
+ classes.put(id, new ClassRef(name));
+ }
+ }
+
+ private void readMethods() {
+ int count = methods.preallocate(getVarint());
+ for (int i = 0; i < count; i++) {
+ long id = getVarlong();
+ long cls = getVarlong();
+ long name = getVarlong();
+ long sig = getVarlong();
+ int modifiers = getVarint();
+ int hidden = getVarint();
+ methods.put(id, new MethodRef(cls, name, sig));
+ }
+ stackFrames.preallocate(count); // spark
+ }
+
+ private void readStackTraces() {
+ int count = stackTraces.preallocate(getVarint());
+ for (int i = 0; i < count; i++) {
+ long id = getVarlong();
+ int truncated = getVarint();
+ StackTrace stackTrace = readStackTrace();
+ stackTraces.put(id, stackTrace);
+ }
+ }
+
+ private StackTrace readStackTrace() {
+ int depth = getVarint();
+ long[] methods = new long[depth];
+ byte[] types = new byte[depth];
+ for (int i = 0; i < depth; i++) {
+ methods[i] = getVarlong();
+ int line = getVarint();
+ int bci = getVarint();
+ types[i] = buf.get();
+ }
+ return new StackTrace(methods, types);
+ }
+
+ private void readSymbols() {
+ int count = symbols.preallocate(getVarint());
+ for (int i = 0; i < count; i++) {
+ long id = getVarlong();
+ if (buf.get() != 3) {
+ throw new IllegalArgumentException("Invalid symbol encoding");
+ }
+ symbols.put(id, getBytes());
+ }
+ }
+
+ private void readMap(Map<Integer, String> map) {
+ int count = getVarint();
+ for (int i = 0; i < count; i++) {
+ map.put(getVarint(), getString());
+ }
+ }
+
+ private void readOtherConstants(List<JfrField> fields) {
+ int stringType = getTypeId("java.lang.String");
+
+ boolean[] numeric = new boolean[fields.size()];
+ for (int i = 0; i < numeric.length; i++) {
+ JfrField f = fields.get(i);
+ numeric[i] = f.constantPool || f.type != stringType;
+ }
+
+ int count = getVarint();
+ for (int i = 0; i < count; i++) {
+ getVarlong();
+ readFields(numeric);
+ }
+ }
+
+ private void readFields(boolean[] numeric) {
+ for (boolean n : numeric) {
+ if (n) {
+ getVarlong();
+ } else {
+ getString();
+ }
+ }
+ }
+
+ private void readEvents() {
+ int executionSample = getTypeId("jdk.ExecutionSample");
+ int nativeMethodSample = getTypeId("jdk.NativeMethodSample");
+
+ buf.position(CHUNK_HEADER_SIZE);
+ while (buf.hasRemaining()) {
+ int position = buf.position();
+ int size = getVarint();
+ int type = getVarint();
+ if (type == executionSample || type == nativeMethodSample) {
+ readExecutionSample();
+ } else {
+ buf.position(position + size);
+ }
+ }
+
+ Collections.sort(samples);
+ }
+
+ private void readExecutionSample() {
+ long time = getVarlong();
+ int tid = getVarint();
+ int stackTraceId = getVarint();
+ int threadState = getVarint();
+ samples.add(new Sample(time, tid, stackTraceId, threadState));
+
+ StackTrace stackTrace = stackTraces.get(stackTraceId);
+ if (stackTrace != null) {
+ stackTrace.samples++;
+ }
+ }
+
+ private int getTypeId(String typeName) {
+ JfrClass type = typesByName.get(typeName);
+ return type != null ? type.id : -1;
+ }
+
+ private int getVarint() {
+ int result = 0;
+ for (int shift = 0; ; shift += 7) {
+ byte b = buf.get();
+ result |= (b & 0x7f) << shift;
+ if (b >= 0) {
+ return result;
+ }
+ }
+ }
+
+ private long getVarlong() {
+ long result = 0;
+ for (int shift = 0; shift < 56; shift += 7) {
+ byte b = buf.get();
+ result |= (b & 0x7fL) << shift;
+ if (b >= 0) {
+ return result;
+ }
+ }
+ return result | (buf.get() & 0xffL) << 56;
+ }
+
+ private String getString() {
+ switch (buf.get()) {
+ case 0:
+ return null;
+ case 1:
+ return "";
+ case 3:
+ return new String(getBytes(), StandardCharsets.UTF_8);
+ case 4: {
+ char[] chars = new char[getVarint()];
+ for (int i = 0; i < chars.length; i++) {
+ chars[i] = (char) getVarint();
+ }
+ return new String(chars);
+ }
+ case 5:
+ return new String(getBytes(), StandardCharsets.ISO_8859_1);
+ default:
+ throw new IllegalArgumentException("Invalid string encoding");
+ }
+ }
+
+ private byte[] getBytes() {
+ byte[] bytes = new byte[getVarint()];
+ buf.get(bytes);
+ return bytes;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/MethodRef.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/MethodRef.java
new file mode 100644
index 0000000..2f9071e
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/MethodRef.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2020 Andrei Pangin
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package me.lucko.spark.common.sampler.async.jfr;
+
+public class MethodRef {
+ public final long cls;
+ public final long name;
+ public final long sig;
+
+ public MethodRef(long cls, long name, long sig) {
+ this.cls = cls;
+ this.name = name;
+ this.sig = sig;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Sample.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Sample.java
new file mode 100644
index 0000000..095e261
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Sample.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2020 Andrei Pangin
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package me.lucko.spark.common.sampler.async.jfr;
+
+public class Sample implements Comparable<Sample> {
+ public final long time;
+ public final int tid;
+ public final int stackTraceId;
+ public final int threadState;
+
+ public Sample(long time, int tid, int stackTraceId, int threadState) {
+ this.time = time;
+ this.tid = tid;
+ this.stackTraceId = stackTraceId;
+ this.threadState = threadState;
+ }
+
+ @Override
+ public int compareTo(Sample o) {
+ return Long.compare(time, o.time);
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/StackTrace.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/StackTrace.java
new file mode 100644
index 0000000..4c12c5e
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/StackTrace.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2020 Andrei Pangin
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package me.lucko.spark.common.sampler.async.jfr;
+
+public class StackTrace {
+ public final long[] methods;
+ public final byte[] types;
+ public long samples;
+
+ public StackTrace(long[] methods, byte[] types) {
+ this.methods = methods;
+ this.types = types;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java
new file mode 100644
index 0000000..a81a5c1
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java
@@ -0,0 +1,116 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.java;
+
+import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.aggregator.AbstractDataAggregator;
+import me.lucko.spark.common.sampler.aggregator.DataAggregator;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+
+import java.lang.management.ThreadInfo;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Abstract {@link DataAggregator} for the {@link JavaSampler}.
+ */
+public abstract class JavaDataAggregator extends AbstractDataAggregator {
+
+ /** The worker pool for inserting stack nodes */
+ protected final ExecutorService workerPool;
+
+ /** The interval to wait between sampling, in microseconds */
+ protected final int interval;
+
+ /** If sleeping threads should be ignored */
+ private final boolean ignoreSleeping;
+
+ /** If threads executing native code should be ignored */
+ private final boolean ignoreNative;
+
+ public JavaDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) {
+ super(threadGrouper);
+ this.workerPool = workerPool;
+ this.interval = interval;
+ this.ignoreSleeping = ignoreSleeping;
+ this.ignoreNative = ignoreNative;
+ }
+
+ /**
+ * Inserts sampling data into this aggregator
+ *
+ * @param threadInfo the thread info
+ */
+ public abstract void insertData(ThreadInfo threadInfo);
+
+ protected void writeData(ThreadInfo threadInfo) {
+ if (this.ignoreSleeping && isSleeping(threadInfo)) {
+ return;
+ }
+ if (this.ignoreNative && threadInfo.isInNative()) {
+ return;
+ }
+
+ try {
+ ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName()));
+ node.log(threadInfo.getStackTrace(), this.interval);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Map<String, ThreadNode> getData() {
+ // wait for all pending data to be inserted
+ this.workerPool.shutdown();
+ try {
+ this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return super.getData();
+ }
+
+ private static boolean isSleeping(ThreadInfo thread) {
+ if (thread.getThreadState() == Thread.State.WAITING || thread.getThreadState() == Thread.State.TIMED_WAITING) {
+ return true;
+ }
+
+ StackTraceElement[] stackTrace = thread.getStackTrace();
+ if (stackTrace.length == 0) {
+ return false;
+ }
+
+ StackTraceElement call = stackTrace[0];
+ String clazz = call.getClassName();
+ String method = call.getMethodName();
+
+ // java.lang.Thread.yield()
+ // jdk.internal.misc.Unsafe.park()
+ // sun.misc.Unsafe.park()
+ return (clazz.equals("java.lang.Thread") && method.equals("yield")) ||
+ (clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) ||
+ (clazz.equals("sun.misc.Unsafe") && method.equals("park"));
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
new file mode 100644
index 0000000..568609e
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -0,0 +1,189 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (C) Albert Pham <http://www.sk89q.com>
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.java;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import me.lucko.spark.common.command.sender.CommandSender;
+import me.lucko.spark.common.platform.PlatformInfo;
+import me.lucko.spark.common.sampler.Sampler;
+import me.lucko.spark.common.sampler.ThreadDumper;
+import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.node.MergeMode;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.common.sampler.tick.TickHook;
+import me.lucko.spark.proto.SparkProtos.SamplerData;
+import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A sampler implementation using Java (WarmRoast).
+ */
+public class JavaSampler implements Sampler, Runnable {
+ private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
+
+ /** The worker pool for inserting stack nodes */
+ private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
+ 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
+ );
+
+ /** The main sampling task */
+ private ScheduledFuture<?> task;
+
+ /** The thread management interface for the current JVM */
+ private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+ /** The instance used to generate thread information for use in sampling */
+ private final ThreadDumper threadDumper;
+ /** Responsible for aggregating and then outputting collected sampling data */
+ private final JavaDataAggregator dataAggregator;
+
+ /** A future to encapsulate the completion of this sampler instance */
+ private final CompletableFuture<JavaSampler> future = new CompletableFuture<>();
+ /** The interval to wait between sampling, in microseconds */
+ private final int interval;
+ /** The time when sampling first began */
+ private long startTime = -1;
+ /** The unix timestamp (in millis) when this sampler should automatically complete. */
+ private final long endTime; // -1 for nothing
+
+ public JavaSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) {
+ this.threadDumper = threadDumper;
+ this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative);
+ this.interval = interval;
+ this.endTime = endTime;
+ }
+
+ public JavaSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {
+ this.threadDumper = threadDumper;
+ this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold);
+ this.interval = interval;
+ this.endTime = endTime;
+ }
+
+ @Override
+ public void start() {
+ this.startTime = System.currentTimeMillis();
+ this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS);
+ }
+
+ @Override
+ public long getStartTime() {
+ if (this.startTime == -1) {
+ throw new IllegalStateException("Not yet started");
+ }
+ return this.startTime;
+ }
+
+ @Override
+ public long getEndTime() {
+ return this.endTime;
+ }
+
+ @Override
+ public CompletableFuture<JavaSampler> getFuture() {
+ return this.future;
+ }
+
+ @Override
+ public void stop() {
+ this.task.cancel(false);
+ }
+
+ @Override
+ public void run() {
+ // this is effectively synchronized, the worker pool will not allow this task
+ // to concurrently execute.
+ try {
+ if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) {
+ this.future.complete(this);
+ stop();
+ return;
+ }
+
+ ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean);
+ this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps));
+ } catch (Throwable t) {
+ this.future.completeExceptionally(t);
+ stop();
+ }
+ }
+
+ private static final class InsertDataTask implements Runnable {
+ private final JavaDataAggregator dataAggregator;
+ private final ThreadInfo[] threadDumps;
+
+ InsertDataTask(JavaDataAggregator dataAggregator, ThreadInfo[] threadDumps) {
+ this.dataAggregator = dataAggregator;
+ this.threadDumps = threadDumps;
+ }
+
+ @Override
+ public void run() {
+ for (ThreadInfo threadInfo : this.threadDumps) {
+ if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) {
+ continue;
+ }
+ this.dataAggregator.insertData(threadInfo);
+ }
+ }
+ }
+
+ @Override
+ public SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) {
+ final SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder()
+ .setPlatform(platformInfo.toData().toProto())
+ .setUser(creator.toData().toProto())
+ .setStartTime(this.startTime)
+ .setInterval(this.interval)
+ .setThreadDumper(this.threadDumper.getMetadata())
+ .setDataAggregator(this.dataAggregator.getMetadata());
+
+ if (comment != null) {
+ metadata.setComment(comment);
+ }
+
+ SamplerData.Builder proto = SamplerData.newBuilder();
+ proto.setMetadata(metadata.build());
+
+ List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
+ data.sort(outputOrder);
+
+ for (Map.Entry<String, ThreadNode> entry : data) {
+ proto.addThreads(entry.getValue().toProto(mergeMode));
+ }
+
+ return proto.build();
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java
index 8463f98..e7113a1 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java
@@ -18,21 +18,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-package me.lucko.spark.common.sampler.aggregator;
+package me.lucko.spark.common.sampler.java;
import me.lucko.spark.common.sampler.ThreadGrouper;
-import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.common.sampler.aggregator.DataAggregator;
import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
import java.lang.management.ThreadInfo;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
/**
* Basic implementation of {@link DataAggregator}.
*/
-public class SimpleDataAggregator extends AbstractDataAggregator {
+public class SimpleDataAggregator extends JavaDataAggregator {
public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) {
super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative);
}
@@ -50,16 +48,4 @@ public class SimpleDataAggregator extends AbstractDataAggregator {
writeData(threadInfo);
}
- @Override
- public Map<String, ThreadNode> getData() {
- // wait for all pending data to be inserted
- this.workerPool.shutdown();
- try {
- this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- return this.threadData;
- }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
index 7ad6781..1a0bcdd 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
@@ -18,9 +18,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-package me.lucko.spark.common.sampler.aggregator;
+package me.lucko.spark.common.sampler.java;
import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.aggregator.DataAggregator;
import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.common.sampler.tick.TickHook;
import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
@@ -36,7 +37,7 @@ import java.util.concurrent.TimeUnit;
* Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
* which exceed a certain threshold in duration.
*/
-public class TickedDataAggregator extends AbstractDataAggregator {
+public class TickedDataAggregator extends JavaDataAggregator {
/** Used to monitor the current "tick" of the server */
private final TickHook tickHook;
@@ -107,15 +108,7 @@ public class TickedDataAggregator extends AbstractDataAggregator {
pushCurrentTick();
}
- // wait for all pending data to be inserted
- this.workerPool.shutdown();
- try {
- this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- return this.threadData;
+ return super.getData();
}
private final class TickList implements Runnable {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
index fe2fd62..2ef06d3 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
@@ -21,6 +21,8 @@
package me.lucko.spark.common.sampler.node;
+import me.lucko.spark.common.sampler.async.AsyncStackTraceElement;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -110,6 +112,34 @@ public abstract class AbstractNode {
child.log(elements, offset + 1, time);
}
+ public void log(AsyncStackTraceElement[] elements, long time) {
+ log(elements, 0, time);
+ }
+
+ private void log(AsyncStackTraceElement[] elements, int offset, long time) {
+ this.totalTime.add(time);
+
+ if (offset >= MAX_STACK_DEPTH) {
+ return;
+ }
+
+ if (elements.length - offset == 0) {
+ return;
+ }
+
+ // the first element in the array is the top of the call stack, and the last is the root
+ // offset starts at 0.
+
+ // pointer is determined by subtracting the offset from the index of the last element
+ int pointer = (elements.length - 1) - offset;
+ AsyncStackTraceElement element = elements[pointer];
+
+ // resolve a child element within the structure for the element at pointer
+ AbstractNode child = resolveChild(new StackTraceNode.Description(element.getClassName(), element.getMethodName(), element.getMethodDescription()));
+ // call the log method on the found child, with an incremented offset.
+ child.log(elements, offset + 1, time);
+ }
+
protected List<StackTraceNode> exportChildren(MergeMode mergeMode) {
if (this.children.isEmpty()) {
return Collections.emptyList();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
index a73620e..bd731c1 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
@@ -24,6 +24,7 @@ package me.lucko.spark.common.sampler.node;
import me.lucko.spark.common.util.MethodDisambiguator;
import me.lucko.spark.proto.SparkProtos;
+import java.util.Comparator;
import java.util.Objects;
/**
@@ -73,9 +74,13 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta
proto.setParentLineNumber(this.description.parentLineNumber);
}
- mergeMode.getMethodDisambiguator().disambiguate(this)
- .map(MethodDisambiguator.MethodDescription::getDesc)
- .ifPresent(proto::setMethodDesc);
+ if (this.description.methodDescription != null) {
+ proto.setMethodDesc(this.description.methodDescription);
+ } else {
+ mergeMode.getMethodDisambiguator().disambiguate(this)
+ .map(MethodDisambiguator.MethodDescription::getDesc)
+ .ifPresent(proto::setMethodDesc);
+ }
for (StackTraceNode child : exportChildren(mergeMode)) {
proto.addChildren(child.toProto(mergeMode));
@@ -104,19 +109,48 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta
public static final class Description implements Comparable<Description> {
private final String className;
private final String methodName;
+
+ // async-profiler
+ private final String methodDescription;
+
+ // Java
private final int lineNumber;
private final int parentLineNumber;
private final int hash;
+ // Constructor used by the Java sampler
public Description(String className, String methodName, int lineNumber, int parentLineNumber) {
this.className = className;
this.methodName = methodName;
+ this.methodDescription = null;
this.lineNumber = lineNumber;
this.parentLineNumber = parentLineNumber;
this.hash = Objects.hash(this.className, this.methodName, this.lineNumber, this.parentLineNumber);
}
+ // Constructor used by the async-profiler sampler
+ public Description(String className, String methodName, String methodDescription) {
+ this.className = className;
+ this.methodName = methodName;
+ this.methodDescription = methodDescription;
+ this.lineNumber = StackTraceNode.NULL_LINE_NUMBER;
+ this.parentLineNumber = StackTraceNode.NULL_LINE_NUMBER;
+ this.hash = Objects.hash(this.className, this.methodName, this.methodDescription);
+ }
+
+ private static <T extends Comparable<T>> int nullCompare(T a, T b) {
+ if (a == null && b == null) {
+ return 0;
+ } else if (a == null) {
+ return -1;
+ } else if (b == null) {
+ return 1;
+ } else {
+ return a.compareTo(b);
+ }
+ }
+
@Override
public int compareTo(Description that) {
if (this == that) {
@@ -133,6 +167,18 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta
return i;
}
+ i = nullCompare(this.methodDescription, that.methodDescription);
+ if (i != 0) {
+ return i;
+ }
+
+ if (this.methodDescription != null && that.methodDescription != null) {
+ i = this.methodDescription.compareTo(that.methodDescription);
+ if (i != 0) {
+ return i;
+ }
+ }
+
i = Integer.compare(this.lineNumber, that.lineNumber);
if (i != 0) {
return i;
@@ -146,10 +192,12 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Description description = (Description) o;
- return this.lineNumber == description.lineNumber &&
+ return this.hash == description.hash &&
+ this.lineNumber == description.lineNumber &&
this.parentLineNumber == description.parentLineNumber &&
this.className.equals(description.className) &&
- this.methodName.equals(description.methodName);
+ this.methodName.equals(description.methodName) &&
+ Objects.equals(this.methodDescription, description.methodDescription);
}
@Override