From 9ee26a12f12a89a6df14eb86afce5d88b5a2cf25 Mon Sep 17 00:00:00 2001 From: lucko Date: Wed, 10 Mar 2021 15:44:14 +0000 Subject: async-profiler support (#102) --- .../java/me/lucko/spark/common/SparkPlatform.java | 2 +- .../me/lucko/spark/common/command/Arguments.java | 23 +- .../common/command/modules/SamplerModule.java | 23 +- .../me/lucko/spark/common/sampler/Sampler.java | 179 ++-------- .../lucko/spark/common/sampler/SamplerBuilder.java | 17 +- .../lucko/spark/common/sampler/ThreadDumper.java | 25 +- .../sampler/aggregator/AbstractDataAggregator.java | 63 +--- .../common/sampler/aggregator/DataAggregator.java | 7 - .../sampler/aggregator/SimpleDataAggregator.java | 65 ---- .../sampler/aggregator/TickedDataAggregator.java | 143 -------- .../common/sampler/async/AsyncDataAggregator.java | 53 +++ .../common/sampler/async/AsyncProfilerAccess.java | 117 +++++++ .../spark/common/sampler/async/AsyncSampler.java | 279 +++++++++++++++ .../sampler/async/AsyncStackTraceElement.java | 52 +++ .../spark/common/sampler/async/ProfileSegment.java | 61 ++++ .../spark/common/sampler/async/jfr/ClassRef.java | 25 ++ .../spark/common/sampler/async/jfr/Dictionary.java | 107 ++++++ .../spark/common/sampler/async/jfr/Element.java | 23 ++ .../spark/common/sampler/async/jfr/JfrClass.java | 49 +++ .../spark/common/sampler/async/jfr/JfrField.java | 31 ++ .../spark/common/sampler/async/jfr/JfrReader.java | 389 +++++++++++++++++++++ .../spark/common/sampler/async/jfr/MethodRef.java | 29 ++ .../spark/common/sampler/async/jfr/Sample.java | 36 ++ .../spark/common/sampler/async/jfr/StackTrace.java | 28 ++ .../common/sampler/java/JavaDataAggregator.java | 116 ++++++ .../spark/common/sampler/java/JavaSampler.java | 189 ++++++++++ .../common/sampler/java/SimpleDataAggregator.java | 51 +++ .../common/sampler/java/TickedDataAggregator.java | 136 +++++++ .../spark/common/sampler/node/AbstractNode.java | 30 ++ .../spark/common/sampler/node/StackTraceNode.java | 58 ++- .../src/main/resources/libasyncProfiler.so | Bin 0 -> 309494 bytes 31 files changed, 1971 insertions(+), 435 deletions(-) delete mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java delete mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Element.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrClass.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrField.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/MethodRef.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Sample.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/StackTrace.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java create mode 100755 spark-common/src/main/resources/libasyncProfiler.so (limited to 'spark-common/src') diff --git a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java index cf3b0ee..a6bf332 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java +++ b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java @@ -211,7 +211,7 @@ public class SparkPlatform { if (command.aliases().contains(alias)) { try { command.executor().execute(this, sender, resp, new Arguments(rawArgs)); - } catch (IllegalArgumentException e) { + } catch (Arguments.ParseException e) { resp.replyPrefixed(text(e.getMessage(), RED)); } return; diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java b/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java index 2b202af..3cd0365 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java @@ -51,7 +51,7 @@ public class Arguments { if (flag == null || matches) { if (!matches) { - throw new IllegalArgumentException("Expected flag at position " + i + " but got '" + arg + "' instead!"); + throw new ParseException("Expected flag at position " + i + " but got '" + arg + "' instead!"); } // store existing value, if present @@ -83,7 +83,7 @@ public class Arguments { try { return Math.abs(Integer.parseInt(it.next())); } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid input for '" + key + "' argument. Please specify a number!"); + throw new ParseException("Invalid input for '" + key + "' argument. Please specify a number!"); } } return -1; // undefined @@ -95,7 +95,7 @@ public class Arguments { try { return Math.abs(Double.parseDouble(it.next())); } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid input for '" + key + "' argument. Please specify a number!"); + throw new ParseException("Invalid input for '" + key + "' argument. Please specify a number!"); } } return -1; // undefined @@ -108,4 +108,21 @@ public class Arguments { public boolean boolFlag(String key) { return this.parsedArgs.containsKey(key); } + + public static final class ParseException extends IllegalArgumentException { + public ParseException() { + } + + public ParseException(String s) { + super(s); + } + + public ParseException(String message, Throwable cause) { + super(message, cause); + } + + public ParseException(Throwable cause) { + super(cause); + } + } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java index eb77b24..cce3169 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java @@ -33,6 +33,7 @@ import me.lucko.spark.common.sampler.SamplerBuilder; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.ThreadNodeOrder; +import me.lucko.spark.common.sampler.async.AsyncSampler; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.tick.TickHook; import me.lucko.spark.common.util.MethodDisambiguator; @@ -55,13 +56,13 @@ import static net.kyori.adventure.text.format.NamedTextColor.*; public class SamplerModule implements CommandModule { private static final MediaType SPARK_SAMPLER_MEDIA_TYPE = MediaType.parse("application/x-spark-sampler"); - /** The WarmRoast instance currently running, if any */ + /** The sampler instance currently running, if any */ private Sampler activeSampler = null; @Override public void close() { if (this.activeSampler != null) { - this.activeSampler.cancel(); + this.activeSampler.stop(); this.activeSampler = null; } } @@ -83,6 +84,7 @@ public class SamplerModule implements CommandModule { .argumentUsage("only-ticks-over", "tick length millis") .argumentUsage("ignore-sleeping", null) .argumentUsage("ignore-native", null) + .argumentUsage("force-java-sampler", null) .argumentUsage("order-by-time", null) .argumentUsage("separate-parent-calls", null) .executor((platform, sender, resp, arguments) -> { @@ -118,7 +120,7 @@ public class SamplerModule implements CommandModule { if (this.activeSampler == null) { resp.replyPrefixed(text("There isn't an active sampling task running.")); } else { - this.activeSampler.cancel(); + this.activeSampler.stop(); resp.broadcastPrefixed(text("The active sampling operation has been stopped! Uploading results...")); ThreadNodeOrder threadOrder = arguments.boolFlag("order-by-time") ? ThreadNodeOrder.BY_TIME : ThreadNodeOrder.BY_NAME; String comment = Iterables.getFirst(arguments.stringFlag("comment"), null); @@ -149,6 +151,7 @@ public class SamplerModule implements CommandModule { boolean ignoreSleeping = arguments.boolFlag("ignore-sleeping"); boolean ignoreNative = arguments.boolFlag("ignore-native"); + boolean forceJavaSampler = arguments.boolFlag("force-java-sampler"); Set threads = arguments.stringFlag("thread"); ThreadDumper threadDumper; @@ -201,19 +204,25 @@ public class SamplerModule implements CommandModule { builder.samplingInterval(intervalMillis); builder.ignoreSleeping(ignoreSleeping); builder.ignoreNative(ignoreNative); + builder.forceJavaSampler(forceJavaSampler); if (ticksOver != -1) { builder.ticksOver(ticksOver, tickHook); } Sampler sampler = this.activeSampler = builder.start(); - resp.broadcastPrefixed(text("Profiler now active!", GOLD)); + resp.broadcastPrefixed(text() + .append(text("Profiler now active!", GOLD)) + .append(space()) + .append(text("(" + (sampler instanceof AsyncSampler ? "async" : "built-in java") + ")", DARK_GRAY)) + .build() + ); if (timeoutSeconds == -1) { resp.broadcastPrefixed(text("Use '/" + platform.getPlugin().getCommandName() + " profiler --stop' to stop profiling and upload the results.")); } else { resp.broadcastPrefixed(text("The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds.")); } - CompletableFuture future = this.activeSampler.getFuture(); + CompletableFuture future = this.activeSampler.getFuture(); // send message if profiling fails future.whenCompleteAsync((s, throwable) -> { @@ -253,8 +262,8 @@ public class SamplerModule implements CommandModule { List opts = new ArrayList<>(Arrays.asList("--info", "--stop", "--cancel", "--timeout", "--regex", "--combine-all", "--not-combined", "--interval", - "--only-ticks-over", "--ignore-sleeping", "--ignore-native", "--order-by-time", - "--separate-parent-calls", "--comment")); + "--only-ticks-over", "--ignore-sleeping", "--ignore-native", "--force-java-sampler", + "--order-by-time", "--separate-parent-calls", "--comment")); opts.removeAll(arguments); opts.add("--thread"); // allowed multiple times diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index e772cb3..5088ed7 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -1,7 +1,6 @@ /* * This file is part of spark. * - * Copyright (C) Albert Pham * Copyright (c) lucko (Luck) * Copyright (c) contributors * @@ -21,174 +20,66 @@ package me.lucko.spark.common.sampler; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.platform.PlatformInfo; -import me.lucko.spark.common.sampler.aggregator.DataAggregator; -import me.lucko.spark.common.sampler.aggregator.SimpleDataAggregator; -import me.lucko.spark.common.sampler.aggregator.TickedDataAggregator; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.node.ThreadNode; -import me.lucko.spark.common.sampler.tick.TickHook; import me.lucko.spark.proto.SparkProtos.SamplerData; -import me.lucko.spark.proto.SparkProtos.SamplerMetadata; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.util.ArrayList; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.GZIPOutputStream; /** - * Main sampler class. + * Abstract superinterface for all sampler implementations. */ -public class Sampler implements Runnable { - private static final AtomicInteger THREAD_ID = new AtomicInteger(0); - - /** The worker pool for inserting stack nodes */ - private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool( - 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() - ); - - /** The main sampling task */ - private ScheduledFuture task; - - /** The thread management interface for the current JVM */ - private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - /** The instance used to generate thread information for use in sampling */ - private final ThreadDumper threadDumper; - /** Responsible for aggregating and then outputting collected sampling data */ - private final DataAggregator dataAggregator; - - /** A future to encapsulation the completion of this sampler instance */ - private final CompletableFuture future = new CompletableFuture<>(); - - /** The interval to wait between sampling, in microseconds */ - private final int interval; - /** The time when sampling first began */ - private long startTime = -1; - /** The unix timestamp (in millis) when this sampler should automatically complete.*/ - private final long endTime; // -1 for nothing - - public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) { - this.threadDumper = threadDumper; - this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); - this.interval = interval; - this.endTime = endTime; - } - - public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) { - this.threadDumper = threadDumper; - this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold); - this.interval = interval; - this.endTime = endTime; - } +public interface Sampler { /** * Starts the sampler. */ - public void start() { - this.startTime = System.currentTimeMillis(); - this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS); - } - - public long getStartTime() { - if (this.startTime == -1) { - throw new IllegalStateException("Not yet started"); - } - return this.startTime; - } - - public long getEndTime() { - return this.endTime; - } - - public CompletableFuture getFuture() { - return this.future; - } + void start(); - public void cancel() { - this.task.cancel(false); - } - - @Override - public void run() { - // this is effectively synchronized, the worker pool will not allow this task - // to concurrently execute. - try { - if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) { - this.future.complete(this); - cancel(); - return; - } - - ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean); - this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps)); - } catch (Throwable t) { - this.future.completeExceptionally(t); - cancel(); - } - } - - private static final class InsertDataTask implements Runnable { - private final DataAggregator dataAggregator; - private final ThreadInfo[] threadDumps; - - InsertDataTask(DataAggregator dataAggregator, ThreadInfo[] threadDumps) { - this.dataAggregator = dataAggregator; - this.threadDumps = threadDumps; - } - - @Override - public void run() { - for (ThreadInfo threadInfo : this.threadDumps) { - if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) { - continue; - } - this.dataAggregator.insertData(threadInfo); - } - } - } - - private SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator> outputOrder, String comment, MergeMode mergeMode) { - final SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder() - .setPlatform(platformInfo.toData().toProto()) - .setUser(creator.toData().toProto()) - .setStartTime(this.startTime) - .setInterval(this.interval) - .setThreadDumper(this.threadDumper.getMetadata()) - .setDataAggregator(this.dataAggregator.getMetadata()); - - if (comment != null) { - metadata.setComment(comment); - } - - SamplerData.Builder proto = SamplerData.newBuilder(); - proto.setMetadata(metadata.build()); + /** + * Stops the sampler. + */ + void stop(); - List> data = new ArrayList<>(this.dataAggregator.getData().entrySet()); - data.sort(outputOrder); + /** + * Gets the time when the sampler started (unix timestamp in millis) + * + * @return the start time + */ + long getStartTime(); - for (Map.Entry entry : data) { - proto.addThreads(entry.getValue().toProto(mergeMode)); - } + /** + * Gets the time when the sampler should automatically stop (unix timestamp in millis) + * + * @return the end time, or -1 if undefined + */ + long getEndTime(); - return proto.build(); - } + /** + * Gets a future to encapsulate the completion of the sampler + * + * @return a future + */ + CompletableFuture getFuture(); + + // Methods used to export the sampler data to the web viewer. + SamplerData toProto( + PlatformInfo platformInfo, + CommandSender creator, + Comparator> outputOrder, + String comment, + MergeMode mergeMode + ); - public byte[] formCompressedDataPayload(PlatformInfo platformInfo, CommandSender creator, Comparator> outputOrder, String comment, MergeMode mergeMode) { + default byte[] formCompressedDataPayload(PlatformInfo platformInfo, CommandSender creator, Comparator> outputOrder, String comment, MergeMode mergeMode) { SamplerData proto = toProto(platformInfo, creator, outputOrder, comment, mergeMode); ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 8993445..7abe1a7 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -20,6 +20,9 @@ package me.lucko.spark.common.sampler; +import me.lucko.spark.common.sampler.async.AsyncProfilerAccess; +import me.lucko.spark.common.sampler.async.AsyncSampler; +import me.lucko.spark.common.sampler.java.JavaSampler; import me.lucko.spark.common.sampler.tick.TickHook; import java.util.concurrent.TimeUnit; @@ -32,6 +35,7 @@ public class SamplerBuilder { private double samplingInterval = 4; // milliseconds private boolean ignoreSleeping = false; private boolean ignoreNative = false; + private boolean useAsyncProfiler = true; private long timeout = -1; private ThreadDumper threadDumper = ThreadDumper.ALL; private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME; @@ -81,14 +85,23 @@ public class SamplerBuilder { return this; } + public SamplerBuilder forceJavaSampler(boolean forceJavaSampler) { + this.useAsyncProfiler = !forceJavaSampler; + return this; + } + public Sampler start() { Sampler sampler; int intervalMicros = (int) (this.samplingInterval * 1000d); if (this.ticksOver == -1 || this.tickHook == null) { - sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); + if (this.useAsyncProfiler && this.timeout == -1 && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) { + sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper); + } else { + sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); + } } else { - sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); + sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); } sampler.start(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java index 224918e..4863482 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java @@ -76,17 +76,38 @@ public interface ThreadDumper { */ final class Specific implements ThreadDumper { private final long[] ids; + private Set threads; + private Set threadNamesLowerCase; public Specific(long[] ids) { this.ids = ids; } public Specific(Set names) { - Set namesLower = names.stream().map(String::toLowerCase).collect(Collectors.toSet()); + this.threadNamesLowerCase = names.stream().map(String::toLowerCase).collect(Collectors.toSet()); this.ids = new ThreadFinder().getThreads() - .filter(t -> namesLower.contains(t.getName().toLowerCase())) + .filter(t -> this.threadNamesLowerCase.contains(t.getName().toLowerCase())) .mapToLong(Thread::getId) .toArray(); + Arrays.sort(this.ids); + } + + public Set getThreads() { + if (this.threads == null) { + this.threads = new ThreadFinder().getThreads() + .filter(t -> Arrays.binarySearch(this.ids, t.getId()) >= 0) + .collect(Collectors.toSet()); + } + return this.threads; + } + + public Set getThreadNames() { + if (this.threadNamesLowerCase == null) { + this.threadNamesLowerCase = getThreads().stream() + .map(t -> t.getName().toLowerCase()) + .collect(Collectors.toSet()); + } + return this.threadNamesLowerCase; } @Override diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java index c9a4f37..7640d60 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java @@ -23,37 +23,22 @@ package me.lucko.spark.common.sampler.aggregator; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.node.ThreadNode; -import java.lang.management.ThreadInfo; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; +/** + * Abstract implementation of {@link DataAggregator}. + */ public abstract class AbstractDataAggregator implements DataAggregator { /** A map of root stack nodes for each thread with sampling data */ protected final Map threadData = new ConcurrentHashMap<>(); - /** The worker pool for inserting stack nodes */ - protected final ExecutorService workerPool; - /** The instance used to group threads together */ protected final ThreadGrouper threadGrouper; - /** The interval to wait between sampling, in microseconds */ - protected final int interval; - - /** If sleeping threads should be ignored */ - private final boolean ignoreSleeping; - - /** If threads executing native code should be ignored */ - private final boolean ignoreNative; - - public AbstractDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) { - this.workerPool = workerPool; + protected AbstractDataAggregator(ThreadGrouper threadGrouper) { this.threadGrouper = threadGrouper; - this.interval = interval; - this.ignoreSleeping = ignoreSleeping; - this.ignoreNative = ignoreNative; } protected ThreadNode getNode(String group) { @@ -64,42 +49,8 @@ public abstract class AbstractDataAggregator implements DataAggregator { return this.threadData.computeIfAbsent(group, ThreadNode::new); } - protected void writeData(ThreadInfo threadInfo) { - if (this.ignoreSleeping && isSleeping(threadInfo)) { - return; - } - if (this.ignoreNative && threadInfo.isInNative()) { - return; - } - - try { - ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName())); - node.log(threadInfo.getStackTrace(), this.interval); - } catch (Exception e) { - e.printStackTrace(); - } + @Override + public Map getData() { + return this.threadData; } - - private static boolean isSleeping(ThreadInfo thread) { - if (thread.getThreadState() == Thread.State.WAITING || thread.getThreadState() == Thread.State.TIMED_WAITING) { - return true; - } - - StackTraceElement[] stackTrace = thread.getStackTrace(); - if (stackTrace.length == 0) { - return false; - } - - StackTraceElement call = stackTrace[0]; - String clazz = call.getClassName(); - String method = call.getMethodName(); - - // java.lang.Thread.yield() - // jdk.internal.misc.Unsafe.park() - // sun.misc.Unsafe.park() - return (clazz.equals("java.lang.Thread") && method.equals("yield")) || - (clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) || - (clazz.equals("sun.misc.Unsafe") && method.equals("park")); - } - } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java index 8318fbd..a91a998 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java @@ -38,13 +38,6 @@ public interface DataAggregator { */ Map getData(); - /** - * Inserts sampling data into this aggregator - * - * @param threadInfo the thread info - */ - void insertData(ThreadInfo threadInfo); - /** * Gets metadata about the data aggregator instance. */ diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java deleted file mode 100644 index 8463f98..0000000 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (c) lucko (Luck) - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package me.lucko.spark.common.sampler.aggregator; - -import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.node.ThreadNode; -import me.lucko.spark.proto.SparkProtos.SamplerMetadata; - -import java.lang.management.ThreadInfo; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Basic implementation of {@link DataAggregator}. - */ -public class SimpleDataAggregator extends AbstractDataAggregator { - public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) { - super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); - } - - @Override - public SamplerMetadata.DataAggregator getMetadata() { - return SamplerMetadata.DataAggregator.newBuilder() - .setType(SamplerMetadata.DataAggregator.Type.SIMPLE) - .setThreadGrouper(this.threadGrouper.asProto()) - .build(); - } - - @Override - public void insertData(ThreadInfo threadInfo) { - writeData(threadInfo); - } - - @Override - public Map getData() { - // wait for all pending data to be inserted - this.workerPool.shutdown(); - try { - this.workerPool.awaitTermination(15, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - return this.threadData; - } -} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java deleted file mode 100644 index 7ad6781..0000000 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (c) lucko (Luck) - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package me.lucko.spark.common.sampler.aggregator; - -import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.node.ThreadNode; -import me.lucko.spark.common.sampler.tick.TickHook; -import me.lucko.spark.proto.SparkProtos.SamplerMetadata; - -import java.lang.management.ThreadInfo; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks" - * which exceed a certain threshold in duration. - */ -public class TickedDataAggregator extends AbstractDataAggregator { - - /** Used to monitor the current "tick" of the server */ - private final TickHook tickHook; - - /** Tick durations under this threshold will not be inserted, measured in microseconds */ - private final long tickLengthThreshold; - - /** The expected number of samples in each tick */ - private final int expectedSize; - - private final Object mutex = new Object(); - - // state - private int currentTick = -1; - private TickList currentData = new TickList(0); - - public TickedDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) { - super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); - this.tickHook = tickHook; - this.tickLengthThreshold = TimeUnit.MILLISECONDS.toMicros(tickLengthThreshold); - // 50 millis in a tick, plus 10 so we have a bit of room to go over - double intervalMilliseconds = interval / 1000d; - this.expectedSize = (int) ((50 / intervalMilliseconds) + 10); - } - - @Override - public SamplerMetadata.DataAggregator getMetadata() { - return SamplerMetadata.DataAggregator.newBuilder() - .setType(SamplerMetadata.DataAggregator.Type.TICKED) - .setThreadGrouper(this.threadGrouper.asProto()) - .setTickLengthThreshold(this.tickLengthThreshold) - .build(); - } - - @Override - public void insertData(ThreadInfo threadInfo) { - synchronized (this.mutex) { - int tick = this.tickHook.getCurrentTick(); - if (this.currentTick != tick) { - pushCurrentTick(); - this.currentTick = tick; - this.currentData = new TickList(this.expectedSize); - } - - this.currentData.addData(threadInfo); - } - } - - // guarded by 'mutex' - private void pushCurrentTick() { - TickList currentData = this.currentData; - - // approximate how long the tick lasted - int tickLengthMicros = currentData.getList().size() * this.interval; - - // don't push data below the threshold - if (tickLengthMicros < this.tickLengthThreshold) { - return; - } - - this.workerPool.submit(currentData); - } - - @Override - public Map getData() { - // push the current tick - synchronized (this.mutex) { - pushCurrentTick(); - } - - // wait for all pending data to be inserted - this.workerPool.shutdown(); - try { - this.workerPool.awaitTermination(15, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - return this.threadData; - } - - private final class TickList implements Runnable { - private final List list; - - TickList(int expectedSize) { - this.list = new ArrayList<>(expectedSize); - } - - @Override - public void run() { - for (ThreadInfo data : this.list) { - writeData(data); - } - } - - public List getList() { - return this.list; - } - - public void addData(ThreadInfo data) { - this.list.add(data); - } - } -} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java new file mode 100644 index 0000000..cb3bd6f --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java @@ -0,0 +1,53 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package me.lucko.spark.common.sampler.async; + +import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.aggregator.AbstractDataAggregator; +import me.lucko.spark.common.sampler.node.ThreadNode; +import me.lucko.spark.proto.SparkProtos.SamplerMetadata; + +/** + * Data aggregator for {@link AsyncSampler}. + */ +public class AsyncDataAggregator extends AbstractDataAggregator { + protected AsyncDataAggregator(ThreadGrouper threadGrouper) { + super(threadGrouper); + } + + @Override + public SamplerMetadata.DataAggregator getMetadata() { + return SamplerMetadata.DataAggregator.newBuilder() + .setType(SamplerMetadata.DataAggregator.Type.SIMPLE) + .setThreadGrouper(this.threadGrouper.asProto()) + .build(); + } + + public void insertData(ProfileSegment element) { + try { + ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName())); + node.log(element.getStackTrace(), element.getTime()); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java new file mode 100644 index 0000000..3d53903 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java @@ -0,0 +1,117 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package me.lucko.spark.common.sampler.async; + +import one.profiler.AsyncProfiler; + +import java.io.InputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; + +/** + * Provides a bridge between spark and async-profiler. + */ +public final class AsyncProfilerAccess { + + // Singleton + public static final AsyncProfilerAccess INSTANCE = new AsyncProfilerAccess(); + + // Only support Linux x86_64 + private static final String SUPPORTED_OS = "linux"; + private static final String SUPPORTED_ARCH = "amd64"; + + private static AsyncProfiler load() throws Exception { + // check compatibility + String os = System.getProperty("os.name"); + if (!SUPPORTED_OS.equalsIgnoreCase(os)) { + throw new UnsupportedOperationException("Only supported on Linux x86_64, your OS: " + os); + } + + String arch = System.getProperty("os.arch"); + if (!SUPPORTED_ARCH.equalsIgnoreCase(arch)) { + throw new UnsupportedOperationException("Only supported on Linux x86_64, your arch: " + os); + } + + // extract the profiler binary from the spark jar file + URL profilerResource = AsyncProfilerAccess.class.getClassLoader().getResource("libasyncProfiler.so"); + if (profilerResource == null) { + throw new IllegalStateException("Could not find libasyncProfiler.so in spark jar file"); + } + + Path extractPath = Files.createTempFile("spark-", "-libasyncProfiler.so.tmp"); + extractPath.toFile().deleteOnExit(); + + try (InputStream in = profilerResource.openStream()) { + Files.copy(in, extractPath, StandardCopyOption.REPLACE_EXISTING); + } + + // get an instance of async-profiler + return AsyncProfiler.getInstance(extractPath.toAbsolutePath().toString()); + } + + /** An instance of the async-profiler Java API. */ + private final AsyncProfiler profiler; + + /** If profiler is null, contains the reason why setup failed */ + private final Exception setupException; + + private AsyncProfilerAccess() { + AsyncProfiler profiler; + Exception setupException = null; + + try { + profiler = load(); + ensureCpuEventSupported(profiler); + } catch (Exception e) { + profiler = null; + setupException = e; + } + + this.profiler = profiler; + this.setupException = setupException; + } + + /** + * Checks the {@code profiler} to ensure the CPU event is supported. + * + * @param profiler the profiler instance + * @throws Exception if the event is not supported + */ + private static void ensureCpuEventSupported(AsyncProfiler profiler) throws Exception { + String resp = profiler.execute("check,event=cpu").trim(); + if (!resp.equalsIgnoreCase("ok")) { + throw new UnsupportedOperationException("CPU event is not supported"); + } + } + + public AsyncProfiler getProfiler() { + if (this.profiler == null) { + throw new UnsupportedOperationException("async-profiler not supported", this.setupException); + } + return this.profiler; + } + + public boolean isSupported() { + return this.profiler != null; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java new file mode 100644 index 0000000..a109be7 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -0,0 +1,279 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package me.lucko.spark.common.sampler.async; + +import me.lucko.spark.common.command.sender.CommandSender; +import me.lucko.spark.common.platform.PlatformInfo; +import me.lucko.spark.common.sampler.Sampler; +import me.lucko.spark.common.sampler.ThreadDumper; +import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.async.jfr.ClassRef; +import me.lucko.spark.common.sampler.async.jfr.JfrReader; +import me.lucko.spark.common.sampler.async.jfr.MethodRef; +import me.lucko.spark.common.sampler.async.jfr.Sample; +import me.lucko.spark.common.sampler.async.jfr.StackTrace; +import me.lucko.spark.common.sampler.node.MergeMode; +import me.lucko.spark.common.sampler.node.ThreadNode; +import me.lucko.spark.proto.SparkProtos; + +import one.profiler.AsyncProfiler; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +/** + * A sampler implementation using async-profiler. + */ +public class AsyncSampler implements Sampler { + private final AsyncProfiler profiler; + + /** The instance used to generate thread information for use in sampling */ + private final ThreadDumper threadDumper; + /** Responsible for aggregating and then outputting collected sampling data */ + private final AsyncDataAggregator dataAggregator; + /** Flag to mark if the output has been completed */ + private boolean outputComplete = false; + /** The temporary output file */ + private Path outputFile; + + /** The interval to wait between sampling, in microseconds */ + private final int interval; + /** The time when sampling first began */ + private long startTime = -1; + + public AsyncSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper) { + this.profiler = AsyncProfilerAccess.INSTANCE.getProfiler(); + this.threadDumper = threadDumper; + this.dataAggregator = new AsyncDataAggregator(threadGrouper); + this.interval = interval; + } + + @Override + public long getStartTime() { + if (this.startTime == -1) { + throw new IllegalStateException("Not yet started"); + } + return this.startTime; + } + + @Override + public long getEndTime() { + return -1; + } + + @Override + public CompletableFuture getFuture() { + return new CompletableFuture<>(); + } + + /** + * Executes a profiler command. + * + * @param command the command to execute + * @return the response + */ + private String execute(String command) { + try { + return this.profiler.execute(command); + } catch (IOException e) { + throw new RuntimeException("Exception whilst executing profiler command", e); + } + } + + /** + * Starts the profiler. + */ + @Override + public void start() { + this.startTime = System.currentTimeMillis(); + + try { + this.outputFile = Files.createTempFile("spark-profile-", ".jfr.tmp"); + this.outputFile.toFile().deleteOnExit(); + } catch (IOException e) { + throw new RuntimeException("Unable to create temporary output file", e); + } + + String command = "start,event=cpu,interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); + if (this.threadDumper instanceof ThreadDumper.Specific) { + command += ",filter"; + } + + String resp = execute(command).trim(); + if (!resp.equalsIgnoreCase("profiling started")) { + throw new RuntimeException("Unexpected response: " + resp); + } + + if (this.threadDumper instanceof ThreadDumper.Specific) { + ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; + for (Thread thread : threadDumper.getThreads()) { + this.profiler.addThread(thread); + } + } + } + + /** + * Stops the profiler. + */ + @Override + public void stop() { + this.profiler.stop(); + } + + @Override + public SparkProtos.SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator> outputOrder, String comment, MergeMode mergeMode) { + final SparkProtos.SamplerMetadata.Builder metadata = SparkProtos.SamplerMetadata.newBuilder() + .setPlatform(platformInfo.toData().toProto()) + .setUser(creator.toData().toProto()) + .setStartTime(this.startTime) + .setInterval(this.interval) + .setThreadDumper(this.threadDumper.getMetadata()) + .setDataAggregator(this.dataAggregator.getMetadata()); + + if (comment != null) { + metadata.setComment(comment); + } + + SparkProtos.SamplerData.Builder proto = SparkProtos.SamplerData.newBuilder(); + proto.setMetadata(metadata.build()); + + aggregateOutput(); + + List> data = new ArrayList<>(this.dataAggregator.getData().entrySet()); + data.sort(outputOrder); + + for (Map.Entry entry : data) { + proto.addThreads(entry.getValue().toProto(mergeMode)); + } + + return proto.build(); + } + + private void aggregateOutput() { + if (this.outputComplete) { + return; + } + this.outputComplete = true; + + Predicate threadFilter; + if (this.threadDumper instanceof ThreadDumper.Specific) { + ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; + threadFilter = n -> threadDumper.getThreadNames().contains(n.toLowerCase()); + } else { + threadFilter = n -> true; + } + + // read the jfr file produced by async-profiler + try (JfrReader reader = new JfrReader(this.outputFile)) { + readSegments(reader, threadFilter); + } catch (IOException e) { + throw new RuntimeException("Read error", e); + } + + // delete the output file after reading + try { + Files.deleteIfExists(this.outputFile); + } catch (IOException e) { + // ignore + } + } + + private void readSegments(JfrReader reader, Predicate threadFilter) { + List samples = reader.samples; + for (int i = 0; i < samples.size(); i++) { + Sample sample = samples.get(i); + + long duration; + if (i == 0) { + // we don't really know the duration of the first sample, so just use the sampling + // interval + duration = this.interval; + } else { + // calculate the duration of the sample by calculating the time elapsed since the + // previous sample + duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time); + } + + String threadName = reader.threads.get(sample.tid); + if (!threadFilter.test(threadName)) { + continue; + } + + // parse the segment and give it to the data aggregator + ProfileSegment segment = parseSegment(reader, sample, threadName, duration); + this.dataAggregator.insertData(segment); + } + } + + private static ProfileSegment parseSegment(JfrReader reader, Sample sample, String threadName, long duration) { + StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId); + int len = stackTrace.methods.length; + + AsyncStackTraceElement[] stack = new AsyncStackTraceElement[len]; + for (int i = 0; i < len; i++) { + stack[i] = parseStackFrame(reader, stackTrace.methods[i]); + } + + return new ProfileSegment(sample.tid, threadName, stack, duration); + } + + private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) { + AsyncStackTraceElement result = reader.stackFrames.get(methodId); + if (result != null) { + return result; + } + + MethodRef methodRef = reader.methods.get(methodId); + ClassRef classRef = reader.classes.get(methodRef.cls); + + byte[] className = reader.symbols.get(classRef.name); + byte[] methodName = reader.symbols.get(methodRef.name); + + if (className == null || className.length == 0) { + // native call + result = new AsyncStackTraceElement( + "native", + new String(methodName, StandardCharsets.UTF_8), + null + ); + } else { + // java method + byte[] methodDesc = reader.symbols.get(methodRef.sig); + result = new AsyncStackTraceElement( + new String(className, StandardCharsets.UTF_8).replace('/', '.'), + new String(methodName, StandardCharsets.UTF_8), + new String(methodDesc, StandardCharsets.UTF_8) + ); + } + + reader.stackFrames.put(methodId, result); + return result; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java new file mode 100644 index 0000000..cf66ded --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java @@ -0,0 +1,52 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package me.lucko.spark.common.sampler.async; + +/** + * Version of {@link StackTraceElement} for async-profiler output. + */ +public class AsyncStackTraceElement { + + /** The name of the class */ + private final String className; + /** The name of the method */ + private final String methodName; + /** The method description */ + private final String methodDescription; + + public AsyncStackTraceElement(String className, String methodName, String methodDescription) { + this.className = className; + this.methodName = methodName; + this.methodDescription = methodDescription; + } + + public String getClassName() { + return this.className; + } + + public String getMethodName() { + return this.methodName; + } + + public String getMethodDescription() { + return this.methodDescription; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java new file mode 100644 index 0000000..154e6fe --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java @@ -0,0 +1,61 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package me.lucko.spark.common.sampler.async; + +/** + * Represents a profile "segment". + * + *

async-profiler groups unique stack traces together per-thread in its output.

+ */ +public class ProfileSegment { + + /** The native thread id (does not correspond to Thread#getId) */ + private final int nativeThreadId; + /** The name of the thread */ + private final String threadName; + /** The stack trace for this segment */ + private final AsyncStackTraceElement[] stackTrace; + /** The time spent executing this segment in microseconds */ + private final long time; + + public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long time) { + this.nativeThreadId = nativeThreadId; + this.threadName = threadName; + this.stackTrace = stackTrace; + this.time = time; + } + + public int getNativeThreadId() { + return this.nativeThreadId; + } + + public String getThreadName() { + return this.threadName; + } + + public AsyncStackTraceElement[] getStackTrace() { + return this.stackTrace; + } + + public long getTime() { + return this.time; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java new file mode 100644 index 0000000..2366fa6 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020 Andrei Pangin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.lucko.spark.common.sampler.async.jfr; + +public class ClassRef { + public final long name; + + public ClassRef(long name) { + this.name = name; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java new file mode 100644 index 0000000..55f78ba --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java @@ -0,0 +1,107 @@ +/* + * Copyright 2020 Andrei Pangin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.lucko.spark.common.sampler.async.jfr; + +/** + * Fast and compact long->Object map. + */ +public class Dictionary { + private static final int INITIAL_CAPACITY = 16; + + private long[] keys; + private Object[] values; + private int size; + + public Dictionary() { + this.keys = new long[INITIAL_CAPACITY]; + this.values = new Object[INITIAL_CAPACITY]; + } + + public void put(long key, T value) { + if (key == 0) { + throw new IllegalArgumentException("Zero key not allowed"); + } + + if (++size * 2 > keys.length) { + resize(keys.length * 2); + } + + int mask = keys.length - 1; + int i = hashCode(key) & mask; + while (keys[i] != 0 && keys[i] != key) { + i = (i + 1) & mask; + } + keys[i] = key; + values[i] = value; + } + + @SuppressWarnings("unchecked") + public T get(long key) { + int mask = keys.length - 1; + int i = hashCode(key) & mask; + while (keys[i] != key && keys[i] != 0) { + i = (i + 1) & mask; + } + return (T) values[i]; + } + + @SuppressWarnings("unchecked") + public void forEach(Visitor visitor) { + for (int i = 0; i < keys.length; i++) { + if (keys[i] != 0) { + visitor.visit(keys[i], (T) values[i]); + } + } + } + + public int preallocate(int count) { + int newSize = size + count; + if (newSize * 2 > keys.length) { + resize(Integer.highestOneBit(newSize * 4 - 1)); + } + return count; + } + + private void resize(int newCapacity) { + long[] newKeys = new long[newCapacity]; + Object[] newValues = new Object[newCapacity]; + int mask = newKeys.length - 1; + + for (int i = 0; i < keys.length; i++) { + if