diff options
author | lucko <git@lucko.me> | 2021-03-10 15:44:14 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-10 15:44:14 +0000 |
commit | 9ee26a12f12a89a6df14eb86afce5d88b5a2cf25 (patch) | |
tree | 840ab480ae324b50cefc63b69dae9b38660dc94f /spark-common/src/main/java/me/lucko/spark | |
parent | fd6736fc2f0bae48dda1d4b595e867d9c7244c27 (diff) | |
download | spark-9ee26a12f12a89a6df14eb86afce5d88b5a2cf25.tar.gz spark-9ee26a12f12a89a6df14eb86afce5d88b5a2cf25.tar.bz2 spark-9ee26a12f12a89a6df14eb86afce5d88b5a2cf25.zip |
async-profiler support (#102)
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark')
28 files changed, 1791 insertions, 255 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java index cf3b0ee..a6bf332 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java +++ b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java @@ -211,7 +211,7 @@ public class SparkPlatform { if (command.aliases().contains(alias)) { try { command.executor().execute(this, sender, resp, new Arguments(rawArgs)); - } catch (IllegalArgumentException e) { + } catch (Arguments.ParseException e) { resp.replyPrefixed(text(e.getMessage(), RED)); } return; diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java b/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java index 2b202af..3cd0365 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/Arguments.java @@ -51,7 +51,7 @@ public class Arguments { if (flag == null || matches) { if (!matches) { - throw new IllegalArgumentException("Expected flag at position " + i + " but got '" + arg + "' instead!"); + throw new ParseException("Expected flag at position " + i + " but got '" + arg + "' instead!"); } // store existing value, if present @@ -83,7 +83,7 @@ public class Arguments { try { return Math.abs(Integer.parseInt(it.next())); } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid input for '" + key + "' argument. Please specify a number!"); + throw new ParseException("Invalid input for '" + key + "' argument. Please specify a number!"); } } return -1; // undefined @@ -95,7 +95,7 @@ public class Arguments { try { return Math.abs(Double.parseDouble(it.next())); } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid input for '" + key + "' argument. Please specify a number!"); + throw new ParseException("Invalid input for '" + key + "' argument. Please specify a number!"); } } return -1; // undefined @@ -108,4 +108,21 @@ public class Arguments { public boolean boolFlag(String key) { return this.parsedArgs.containsKey(key); } + + public static final class ParseException extends IllegalArgumentException { + public ParseException() { + } + + public ParseException(String s) { + super(s); + } + + public ParseException(String message, Throwable cause) { + super(message, cause); + } + + public ParseException(Throwable cause) { + super(cause); + } + } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java index eb77b24..cce3169 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java @@ -33,6 +33,7 @@ import me.lucko.spark.common.sampler.SamplerBuilder; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.ThreadNodeOrder; +import me.lucko.spark.common.sampler.async.AsyncSampler; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.tick.TickHook; import me.lucko.spark.common.util.MethodDisambiguator; @@ -55,13 +56,13 @@ import static net.kyori.adventure.text.format.NamedTextColor.*; public class SamplerModule implements CommandModule { private static final MediaType SPARK_SAMPLER_MEDIA_TYPE = MediaType.parse("application/x-spark-sampler"); - /** The WarmRoast instance currently running, if any */ + /** The sampler instance currently running, if any */ private Sampler activeSampler = null; @Override public void close() { if (this.activeSampler != null) { - this.activeSampler.cancel(); + this.activeSampler.stop(); this.activeSampler = null; } } @@ -83,6 +84,7 @@ public class SamplerModule implements CommandModule { .argumentUsage("only-ticks-over", "tick length millis") .argumentUsage("ignore-sleeping", null) .argumentUsage("ignore-native", null) + .argumentUsage("force-java-sampler", null) .argumentUsage("order-by-time", null) .argumentUsage("separate-parent-calls", null) .executor((platform, sender, resp, arguments) -> { @@ -118,7 +120,7 @@ public class SamplerModule implements CommandModule { if (this.activeSampler == null) { resp.replyPrefixed(text("There isn't an active sampling task running.")); } else { - this.activeSampler.cancel(); + this.activeSampler.stop(); resp.broadcastPrefixed(text("The active sampling operation has been stopped! Uploading results...")); ThreadNodeOrder threadOrder = arguments.boolFlag("order-by-time") ? ThreadNodeOrder.BY_TIME : ThreadNodeOrder.BY_NAME; String comment = Iterables.getFirst(arguments.stringFlag("comment"), null); @@ -149,6 +151,7 @@ public class SamplerModule implements CommandModule { boolean ignoreSleeping = arguments.boolFlag("ignore-sleeping"); boolean ignoreNative = arguments.boolFlag("ignore-native"); + boolean forceJavaSampler = arguments.boolFlag("force-java-sampler"); Set<String> threads = arguments.stringFlag("thread"); ThreadDumper threadDumper; @@ -201,19 +204,25 @@ public class SamplerModule implements CommandModule { builder.samplingInterval(intervalMillis); builder.ignoreSleeping(ignoreSleeping); builder.ignoreNative(ignoreNative); + builder.forceJavaSampler(forceJavaSampler); if (ticksOver != -1) { builder.ticksOver(ticksOver, tickHook); } Sampler sampler = this.activeSampler = builder.start(); - resp.broadcastPrefixed(text("Profiler now active!", GOLD)); + resp.broadcastPrefixed(text() + .append(text("Profiler now active!", GOLD)) + .append(space()) + .append(text("(" + (sampler instanceof AsyncSampler ? "async" : "built-in java") + ")", DARK_GRAY)) + .build() + ); if (timeoutSeconds == -1) { resp.broadcastPrefixed(text("Use '/" + platform.getPlugin().getCommandName() + " profiler --stop' to stop profiling and upload the results.")); } else { resp.broadcastPrefixed(text("The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds.")); } - CompletableFuture<Sampler> future = this.activeSampler.getFuture(); + CompletableFuture<? extends Sampler> future = this.activeSampler.getFuture(); // send message if profiling fails future.whenCompleteAsync((s, throwable) -> { @@ -253,8 +262,8 @@ public class SamplerModule implements CommandModule { List<String> opts = new ArrayList<>(Arrays.asList("--info", "--stop", "--cancel", "--timeout", "--regex", "--combine-all", "--not-combined", "--interval", - "--only-ticks-over", "--ignore-sleeping", "--ignore-native", "--order-by-time", - "--separate-parent-calls", "--comment")); + "--only-ticks-over", "--ignore-sleeping", "--ignore-native", "--force-java-sampler", + "--order-by-time", "--separate-parent-calls", "--comment")); opts.removeAll(arguments); opts.add("--thread"); // allowed multiple times diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index e772cb3..5088ed7 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -1,7 +1,6 @@ /* * This file is part of spark. * - * Copyright (C) Albert Pham <http://www.sk89q.com> * Copyright (c) lucko (Luck) <luck@lucko.me> * Copyright (c) contributors * @@ -21,174 +20,66 @@ package me.lucko.spark.common.sampler; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.platform.PlatformInfo; -import me.lucko.spark.common.sampler.aggregator.DataAggregator; -import me.lucko.spark.common.sampler.aggregator.SimpleDataAggregator; -import me.lucko.spark.common.sampler.aggregator.TickedDataAggregator; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.node.ThreadNode; -import me.lucko.spark.common.sampler.tick.TickHook; import me.lucko.spark.proto.SparkProtos.SamplerData; -import me.lucko.spark.proto.SparkProtos.SamplerMetadata; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.util.ArrayList; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.GZIPOutputStream; /** - * Main sampler class. + * Abstract superinterface for all sampler implementations. */ -public class Sampler implements Runnable { - private static final AtomicInteger THREAD_ID = new AtomicInteger(0); - - /** The worker pool for inserting stack nodes */ - private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool( - 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() - ); - - /** The main sampling task */ - private ScheduledFuture<?> task; - - /** The thread management interface for the current JVM */ - private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - /** The instance used to generate thread information for use in sampling */ - private final ThreadDumper threadDumper; - /** Responsible for aggregating and then outputting collected sampling data */ - private final DataAggregator dataAggregator; - - /** A future to encapsulation the completion of this sampler instance */ - private final CompletableFuture<Sampler> future = new CompletableFuture<>(); - - /** The interval to wait between sampling, in microseconds */ - private final int interval; - /** The time when sampling first began */ - private long startTime = -1; - /** The unix timestamp (in millis) when this sampler should automatically complete.*/ - private final long endTime; // -1 for nothing - - public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) { - this.threadDumper = threadDumper; - this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); - this.interval = interval; - this.endTime = endTime; - } - - public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) { - this.threadDumper = threadDumper; - this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold); - this.interval = interval; - this.endTime = endTime; - } +public interface Sampler { /** * Starts the sampler. */ - public void start() { - this.startTime = System.currentTimeMillis(); - this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS); - } - - public long getStartTime() { - if (this.startTime == -1) { - throw new IllegalStateException("Not yet started"); - } - return this.startTime; - } - - public long getEndTime() { - return this.endTime; - } - - public CompletableFuture<Sampler> getFuture() { - return this.future; - } + void start(); - public void cancel() { - this.task.cancel(false); - } - - @Override - public void run() { - // this is effectively synchronized, the worker pool will not allow this task - // to concurrently execute. - try { - if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) { - this.future.complete(this); - cancel(); - return; - } - - ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean); - this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps)); - } catch (Throwable t) { - this.future.completeExceptionally(t); - cancel(); - } - } - - private static final class InsertDataTask implements Runnable { - private final DataAggregator dataAggregator; - private final ThreadInfo[] threadDumps; - - InsertDataTask(DataAggregator dataAggregator, ThreadInfo[] threadDumps) { - this.dataAggregator = dataAggregator; - this.threadDumps = threadDumps; - } - - @Override - public void run() { - for (ThreadInfo threadInfo : this.threadDumps) { - if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) { - continue; - } - this.dataAggregator.insertData(threadInfo); - } - } - } - - private SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) { - final SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder() - .setPlatform(platformInfo.toData().toProto()) - .setUser(creator.toData().toProto()) - .setStartTime(this.startTime) - .setInterval(this.interval) - .setThreadDumper(this.threadDumper.getMetadata()) - .setDataAggregator(this.dataAggregator.getMetadata()); - - if (comment != null) { - metadata.setComment(comment); - } - - SamplerData.Builder proto = SamplerData.newBuilder(); - proto.setMetadata(metadata.build()); + /** + * Stops the sampler. + */ + void stop(); - List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet()); - data.sort(outputOrder); + /** + * Gets the time when the sampler started (unix timestamp in millis) + * + * @return the start time + */ + long getStartTime(); - for (Map.Entry<String, ThreadNode> entry : data) { - proto.addThreads(entry.getValue().toProto(mergeMode)); - } + /** + * Gets the time when the sampler should automatically stop (unix timestamp in millis) + * + * @return the end time, or -1 if undefined + */ + long getEndTime(); - return proto.build(); - } + /** + * Gets a future to encapsulate the completion of the sampler + * + * @return a future + */ + CompletableFuture<? extends Sampler> getFuture(); + + // Methods used to export the sampler data to the web viewer. + SamplerData toProto( + PlatformInfo platformInfo, + CommandSender creator, + Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, + String comment, + MergeMode mergeMode + ); - public byte[] formCompressedDataPayload(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) { + default byte[] formCompressedDataPayload(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) { SamplerData proto = toProto(platformInfo, creator, outputOrder, comment, mergeMode); ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 8993445..7abe1a7 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -20,6 +20,9 @@ package me.lucko.spark.common.sampler; +import me.lucko.spark.common.sampler.async.AsyncProfilerAccess; +import me.lucko.spark.common.sampler.async.AsyncSampler; +import me.lucko.spark.common.sampler.java.JavaSampler; import me.lucko.spark.common.sampler.tick.TickHook; import java.util.concurrent.TimeUnit; @@ -32,6 +35,7 @@ public class SamplerBuilder { private double samplingInterval = 4; // milliseconds private boolean ignoreSleeping = false; private boolean ignoreNative = false; + private boolean useAsyncProfiler = true; private long timeout = -1; private ThreadDumper threadDumper = ThreadDumper.ALL; private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME; @@ -81,14 +85,23 @@ public class SamplerBuilder { return this; } + public SamplerBuilder forceJavaSampler(boolean forceJavaSampler) { + this.useAsyncProfiler = !forceJavaSampler; + return this; + } + public Sampler start() { Sampler sampler; int intervalMicros = (int) (this.samplingInterval * 1000d); if (this.ticksOver == -1 || this.tickHook == null) { - sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); + if (this.useAsyncProfiler && this.timeout == -1 && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) { + sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper); + } else { + sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); + } } else { - sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); + sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); } sampler.start(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java index 224918e..4863482 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java @@ -76,17 +76,38 @@ public interface ThreadDumper { */ final class Specific implements ThreadDumper { private final long[] ids; + private Set<Thread> threads; + private Set<String> threadNamesLowerCase; public Specific(long[] ids) { this.ids = ids; } public Specific(Set<String> names) { - Set<String> namesLower = names.stream().map(String::toLowerCase).collect(Collectors.toSet()); + this.threadNamesLowerCase = names.stream().map(String::toLowerCase).collect(Collectors.toSet()); this.ids = new ThreadFinder().getThreads() - .filter(t -> namesLower.contains(t.getName().toLowerCase())) + .filter(t -> this.threadNamesLowerCase.contains(t.getName().toLowerCase())) .mapToLong(Thread::getId) .toArray(); + Arrays.sort(this.ids); + } + + public Set<Thread> getThreads() { + if (this.threads == null) { + this.threads = new ThreadFinder().getThreads() + .filter(t -> Arrays.binarySearch(this.ids, t.getId()) >= 0) + .collect(Collectors.toSet()); + } + return this.threads; + } + + public Set<String> getThreadNames() { + if (this.threadNamesLowerCase == null) { + this.threadNamesLowerCase = getThreads().stream() + .map(t -> t.getName().toLowerCase()) + .collect(Collectors.toSet()); + } + return this.threadNamesLowerCase; } @Override diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java index c9a4f37..7640d60 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java @@ -23,37 +23,22 @@ package me.lucko.spark.common.sampler.aggregator; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.node.ThreadNode; -import java.lang.management.ThreadInfo; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; +/** + * Abstract implementation of {@link DataAggregator}. + */ public abstract class AbstractDataAggregator implements DataAggregator { /** A map of root stack nodes for each thread with sampling data */ protected final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>(); - /** The worker pool for inserting stack nodes */ - protected final ExecutorService workerPool; - /** The instance used to group threads together */ protected final ThreadGrouper threadGrouper; - /** The interval to wait between sampling, in microseconds */ - protected final int interval; - - /** If sleeping threads should be ignored */ - private final boolean ignoreSleeping; - - /** If threads executing native code should be ignored */ - private final boolean ignoreNative; - - public AbstractDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) { - this.workerPool = workerPool; + protected AbstractDataAggregator(ThreadGrouper threadGrouper) { this.threadGrouper = threadGrouper; - this.interval = interval; - this.ignoreSleeping = ignoreSleeping; - this.ignoreNative = ignoreNative; } protected ThreadNode getNode(String group) { @@ -64,42 +49,8 @@ public abstract class AbstractDataAggregator implements DataAggregator { return this.threadData.computeIfAbsent(group, ThreadNode::new); } - protected void writeData(ThreadInfo threadInfo) { - if (this.ignoreSleeping && isSleeping(threadInfo)) { - return; - } - if (this.ignoreNative && threadInfo.isInNative()) { - return; - } - - try { - ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName())); - node.log(threadInfo.getStackTrace(), this.interval); - } catch (Exception e) { - e.printStackTrace(); - } + @Override + public Map<String, ThreadNode> getData() { + return this.threadData; } - - private static boolean isSleeping(ThreadInfo thread) { - if (thread.getThreadState() == Thread.State.WAITING || thread.getThreadState() == Thread.State.TIMED_WAITING) { - return true; - } - - StackTraceElement[] stackTrace = thread.getStackTrace(); - if (stackTrace.length == 0) { - return false; - } - - StackTraceElement call = stackTrace[0]; - String clazz = call.getClassName(); - String method = call.getMethodName(); - - // java.lang.Thread.yield() - // jdk.internal.misc.Unsafe.park() - // sun.misc.Unsafe.park() - return (clazz.equals("java.lang.Thread") && method.equals("yield")) || - (clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) || - (clazz.equals("sun.misc.Unsafe") && method.equals("park")); - } - } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java index 8318fbd..a91a998 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java @@ -39,13 +39,6 @@ public interface DataAggregator { Map<String, ThreadNode> getData(); /** - * Inserts sampling data into this aggregator - * - * @param threadInfo the thread info - */ - void insertData(ThreadInfo threadInfo); - - /** * Gets metadata about the data aggregator instance. */ SamplerMetadata.DataAggregator getMetadata(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java new file mode 100644 index 0000000..cb3bd6f --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java @@ -0,0 +1,53 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.async; + +import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.aggregator.AbstractDataAggregator; +import me.lucko.spark.common.sampler.node.ThreadNode; +import me.lucko.spark.proto.SparkProtos.SamplerMetadata; + +/** + * Data aggregator for {@link AsyncSampler}. + */ +public class AsyncDataAggregator extends AbstractDataAggregator { + protected AsyncDataAggregator(ThreadGrouper threadGrouper) { + super(threadGrouper); + } + + @Override + public SamplerMetadata.DataAggregator getMetadata() { + return SamplerMetadata.DataAggregator.newBuilder() + .setType(SamplerMetadata.DataAggregator.Type.SIMPLE) + .setThreadGrouper(this.threadGrouper.asProto()) + .build(); + } + + public void insertData(ProfileSegment element) { + try { + ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName())); + node.log(element.getStackTrace(), element.getTime()); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java new file mode 100644 index 0000000..3d53903 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java @@ -0,0 +1,117 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.async; + +import one.profiler.AsyncProfiler; + +import java.io.InputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; + +/** + * Provides a bridge between spark and async-profiler. + */ +public final class AsyncProfilerAccess { + + // Singleton + public static final AsyncProfilerAccess INSTANCE = new AsyncProfilerAccess(); + + // Only support Linux x86_64 + private static final String SUPPORTED_OS = "linux"; + private static final String SUPPORTED_ARCH = "amd64"; + + private static AsyncProfiler load() throws Exception { + // check compatibility + String os = System.getProperty("os.name"); + if (!SUPPORTED_OS.equalsIgnoreCase(os)) { + throw new UnsupportedOperationException("Only supported on Linux x86_64, your OS: " + os); + } + + String arch = System.getProperty("os.arch"); + if (!SUPPORTED_ARCH.equalsIgnoreCase(arch)) { + throw new UnsupportedOperationException("Only supported on Linux x86_64, your arch: " + os); + } + + // extract the profiler binary from the spark jar file + URL profilerResource = AsyncProfilerAccess.class.getClassLoader().getResource("libasyncProfiler.so"); + if (profilerResource == null) { + throw new IllegalStateException("Could not find libasyncProfiler.so in spark jar file"); + } + + Path extractPath = Files.createTempFile("spark-", "-libasyncProfiler.so.tmp"); + extractPath.toFile().deleteOnExit(); + + try (InputStream in = profilerResource.openStream()) { + Files.copy(in, extractPath, StandardCopyOption.REPLACE_EXISTING); + } + + // get an instance of async-profiler + return AsyncProfiler.getInstance(extractPath.toAbsolutePath().toString()); + } + + /** An instance of the async-profiler Java API. */ + private final AsyncProfiler profiler; + + /** If profiler is null, contains the reason why setup failed */ + private final Exception setupException; + + private AsyncProfilerAccess() { + AsyncProfiler profiler; + Exception setupException = null; + + try { + profiler = load(); + ensureCpuEventSupported(profiler); + } catch (Exception e) { + profiler = null; + setupException = e; + } + + this.profiler = profiler; + this.setupException = setupException; + } + + /** + * Checks the {@code profiler} to ensure the CPU event is supported. + * + * @param profiler the profiler instance + * @throws Exception if the event is not supported + */ + private static void ensureCpuEventSupported(AsyncProfiler profiler) throws Exception { + String resp = profiler.execute("check,event=cpu").trim(); + if (!resp.equalsIgnoreCase("ok")) { + throw new UnsupportedOperationException("CPU event is not supported"); + } + } + + public AsyncProfiler getProfiler() { + if (this.profiler == null) { + throw new UnsupportedOperationException("async-profiler not supported", this.setupException); + } + return this.profiler; + } + + public boolean isSupported() { + return this.profiler != null; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java new file mode 100644 index 0000000..a109be7 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -0,0 +1,279 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.async; + +import me.lucko.spark.common.command.sender.CommandSender; +import me.lucko.spark.common.platform.PlatformInfo; +import me.lucko.spark.common.sampler.Sampler; +import me.lucko.spark.common.sampler.ThreadDumper; +import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.async.jfr.ClassRef; +import me.lucko.spark.common.sampler.async.jfr.JfrReader; +import me.lucko.spark.common.sampler.async.jfr.MethodRef; +import me.lucko.spark.common.sampler.async.jfr.Sample; +import me.lucko.spark.common.sampler.async.jfr.StackTrace; +import me.lucko.spark.common.sampler.node.MergeMode; +import me.lucko.spark.common.sampler.node.ThreadNode; +import me.lucko.spark.proto.SparkProtos; + +import one.profiler.AsyncProfiler; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +/** + * A sampler implementation using async-profiler. + */ +public class AsyncSampler implements Sampler { + private final AsyncProfiler profiler; + + /** The instance used to generate thread information for use in sampling */ + private final ThreadDumper threadDumper; + /** Responsible for aggregating and then outputting collected sampling data */ + private final AsyncDataAggregator dataAggregator; + /** Flag to mark if the output has been completed */ + private boolean outputComplete = false; + /** The temporary output file */ + private Path outputFile; + + /** The interval to wait between sampling, in microseconds */ + private final int interval; + /** The time when sampling first began */ + private long startTime = -1; + + public AsyncSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper) { + this.profiler = AsyncProfilerAccess.INSTANCE.getProfiler(); + this.threadDumper = threadDumper; + this.dataAggregator = new AsyncDataAggregator(threadGrouper); + this.interval = interval; + } + + @Override + public long getStartTime() { + if (this.startTime == -1) { + throw new IllegalStateException("Not yet started"); + } + return this.startTime; + } + + @Override + public long getEndTime() { + return -1; + } + + @Override + public CompletableFuture<? extends Sampler> getFuture() { + return new CompletableFuture<>(); + } + + /** + * Executes a profiler command. + * + * @param command the command to execute + * @return the response + */ + private String execute(String command) { + try { + return this.profiler.execute(command); + } catch (IOException e) { + throw new RuntimeException("Exception whilst executing profiler command", e); + } + } + + /** + * Starts the profiler. + */ + @Override + public void start() { + this.startTime = System.currentTimeMillis(); + + try { + this.outputFile = Files.createTempFile("spark-profile-", ".jfr.tmp"); + this.outputFile.toFile().deleteOnExit(); + } catch (IOException e) { + throw new RuntimeException("Unable to create temporary output file", e); + } + + String command = "start,event=cpu,interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); + if (this.threadDumper instanceof ThreadDumper.Specific) { + command += ",filter"; + } + + String resp = execute(command).trim(); + if (!resp.equalsIgnoreCase("profiling started")) { + throw new RuntimeException("Unexpected response: " + resp); + } + + if (this.threadDumper instanceof ThreadDumper.Specific) { + ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; + for (Thread thread : threadDumper.getThreads()) { + this.profiler.addThread(thread); + } + } + } + + /** + * Stops the profiler. + */ + @Override + public void stop() { + this.profiler.stop(); + } + + @Override + public SparkProtos.SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) { + final SparkProtos.SamplerMetadata.Builder metadata = SparkProtos.SamplerMetadata.newBuilder() + .setPlatform(platformInfo.toData().toProto()) + .setUser(creator.toData().toProto()) + .setStartTime(this.startTime) + .setInterval(this.interval) + .setThreadDumper(this.threadDumper.getMetadata()) + .setDataAggregator(this.dataAggregator.getMetadata()); + + if (comment != null) { + metadata.setComment(comment); + } + + SparkProtos.SamplerData.Builder proto = SparkProtos.SamplerData.newBuilder(); + proto.setMetadata(metadata.build()); + + aggregateOutput(); + + List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet()); + data.sort(outputOrder); + + for (Map.Entry<String, ThreadNode> entry : data) { + proto.addThreads(entry.getValue().toProto(mergeMode)); + } + + return proto.build(); + } + + private void aggregateOutput() { + if (this.outputComplete) { + return; + } + this.outputComplete = true; + + Predicate<String> threadFilter; + if (this.threadDumper instanceof ThreadDumper.Specific) { + ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; + threadFilter = n -> threadDumper.getThreadNames().contains(n.toLowerCase()); + } else { + threadFilter = n -> true; + } + + // read the jfr file produced by async-profiler + try (JfrReader reader = new JfrReader(this.outputFile)) { + readSegments(reader, threadFilter); + } catch (IOException e) { + throw new RuntimeException("Read error", e); + } + + // delete the output file after reading + try { + Files.deleteIfExists(this.outputFile); + } catch (IOException e) { + // ignore + } + } + + private void readSegments(JfrReader reader, Predicate<String> threadFilter) { + List<Sample> samples = reader.samples; + for (int i = 0; i < samples.size(); i++) { + Sample sample = samples.get(i); + + long duration; + if (i == 0) { + // we don't really know the duration of the first sample, so just use the sampling + // interval + duration = this.interval; + } else { + // calculate the duration of the sample by calculating the time elapsed since the + // previous sample + duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time); + } + + String threadName = reader.threads.get(sample.tid); + if (!threadFilter.test(threadName)) { + continue; + } + + // parse the segment and give it to the data aggregator + ProfileSegment segment = parseSegment(reader, sample, threadName, duration); + this.dataAggregator.insertData(segment); + } + } + + private static ProfileSegment parseSegment(JfrReader reader, Sample sample, String threadName, long duration) { + StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId); + int len = stackTrace.methods.length; + + AsyncStackTraceElement[] stack = new AsyncStackTraceElement[len]; + for (int i = 0; i < len; i++) { + stack[i] = parseStackFrame(reader, stackTrace.methods[i]); + } + + return new ProfileSegment(sample.tid, threadName, stack, duration); + } + + private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) { + AsyncStackTraceElement result = reader.stackFrames.get(methodId); + if (result != null) { + return result; + } + + MethodRef methodRef = reader.methods.get(methodId); + ClassRef classRef = reader.classes.get(methodRef.cls); + + byte[] className = reader.symbols.get(classRef.name); + byte[] methodName = reader.symbols.get(methodRef.name); + + if (className == null || className.length == 0) { + // native call + result = new AsyncStackTraceElement( + "native", + new String(methodName, StandardCharsets.UTF_8), + null + ); + } else { + // java method + byte[] methodDesc = reader.symbols.get(methodRef.sig); + result = new AsyncStackTraceElement( + new String(className, StandardCharsets.UTF_8).replace('/', '.'), + new String(methodName, StandardCharsets.UTF_8), + new String(methodDesc, StandardCharsets.UTF_8) + ); + } + + reader.stackFrames.put(methodId, result); + return result; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java new file mode 100644 index 0000000..cf66ded --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java @@ -0,0 +1,52 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.async; + +/** + * Version of {@link StackTraceElement} for async-profiler output. + */ +public class AsyncStackTraceElement { + + /** The name of the class */ + private final String className; + /** The name of the method */ + private final String methodName; + /** The method description */ + private final String methodDescription; + + public AsyncStackTraceElement(String className, String methodName, String methodDescription) { + this.className = className; + this.methodName = methodName; + this.methodDescription = methodDescription; + } + + public String getClassName() { + return this.className; + } + + public String getMethodName() { + return this.methodName; + } + + public String getMethodDescription() { + return this.methodDescription; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java new file mode 100644 index 0000000..154e6fe --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java @@ -0,0 +1,61 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.async; + +/** + * Represents a profile "segment". + * + * <p>async-profiler groups unique stack traces together per-thread in its output.</p> + */ +public class ProfileSegment { + + /** The native thread id (does not correspond to Thread#getId) */ + private final int nativeThreadId; + /** The name of the thread */ + private final String threadName; + /** The stack trace for this segment */ + private final AsyncStackTraceElement[] stackTrace; + /** The time spent executing this segment in microseconds */ + private final long time; + + public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long time) { + this.nativeThreadId = nativeThreadId; + this.threadName = threadName; + this.stackTrace = stackTrace; + this.time = time; + } + + public int getNativeThreadId() { + return this.nativeThreadId; + } + + public String getThreadName() { + return this.threadName; + } + + public AsyncStackTraceElement[] getStackTrace() { + return this.stackTrace; + } + + public long getTime() { + return this.time; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java new file mode 100644 index 0000000..2366fa6 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/ClassRef.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020 Andrei Pangin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.lucko.spark.common.sampler.async.jfr; + +public class ClassRef { + public final long name; + + public ClassRef(long name) { + this.name = name; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java new file mode 100644 index 0000000..55f78ba --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java @@ -0,0 +1,107 @@ +/* + * Copyright 2020 Andrei Pangin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.lucko.spark.common.sampler.async.jfr; + +/** + * Fast and compact long->Object map. + */ +public class Dictionary<T> { + private static final int INITIAL_CAPACITY = 16; + + private long[] keys; + private Object[] values; + private int size; + + public Dictionary() { + this.keys = new long[INITIAL_CAPACITY]; + this.values = new Object[INITIAL_CAPACITY]; + } + + public void put(long key, T value) { + if (key == 0) { + throw new IllegalArgumentException("Zero key not allowed"); + } + + if (++size * 2 > keys.length) { + resize(keys.length * 2); + } + + int mask = keys.length - 1; + int i = hashCode(key) & mask; + while (keys[i] != 0 && keys[i] != key) { + i = (i + 1) & mask; + } + keys[i] = key; + values[i] = value; + } + + @SuppressWarnings("unchecked") + public T get(long key) { + int mask = keys.length - 1; + int i = hashCode(key) & mask; + while (keys[i] != key && keys[i] != 0) { + i = (i + 1) & mask; + } + return (T) values[i]; + } + + @SuppressWarnings("unchecked") + public void forEach(Visitor<T> visitor) { + for (int i = 0; i < keys.length; i++) { + if (keys[i] != 0) { + visitor.visit(keys[i], (T) values[i]); + } + } + } + + public int preallocate(int count) { + int newSize = size + count; + if (newSize * 2 > keys.length) { + resize(Integer.highestOneBit(newSize * 4 - 1)); + } + return count; + } + + private void resize(int newCapacity) { + long[] newKeys = new long[newCapacity]; + Object[] newValues = new Object[newCapacity]; + int mask = newKeys.length - 1; + + for (int i = 0; i < keys.length; i++) { + if (keys[i] != 0) { + for (int j = hashCode(keys[i]) & mask; ; j = (j + 1) & mask) { + if (newKeys[j] == 0) { + newKeys[j] = keys[i]; + newValues[j] = values[i]; + break; + } + } + } + } + + keys = newKeys; + values = newValues; + } + + private static int hashCode(long key) { + return (int) (key ^ (key >>> 32)); + } + + public interface Visitor<T> { + void visit(long key, T value); + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Element.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Element.java new file mode 100644 index 0000000..9d6b6c7 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Element.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020 Andrei Pangin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.lucko.spark.common.sampler.async.jfr; + +class Element { + + void addChild(Element e) { + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrClass.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrClass.java new file mode 100644 index 0000000..a171552 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrClass.java @@ -0,0 +1,49 @@ +/* + * Copyright 2020 Andrei Pangin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.lucko.spark.common.sampler.async.jfr; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +class JfrClass extends Element { + final int id; + final String name; + final List<JfrField> fields; + + JfrClass(Map<String, String> attributes) { + this.id = Integer.parseInt(attributes.get("id")); + this.name = attributes.get("name"); + this.fields = new ArrayList<>(2); + } + + @Override + void addChild(Element e) { + if (e instanceof JfrField) { + fields.add((JfrField) e); + } + } + + JfrField field(String name) { + for (JfrField field : fields) { + if (field.name.equals(name)) { + return field; + } + } + return null; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrField.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrField.java new file mode 100644 index 0000000..7a78f2c --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrField.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020 Andrei Pangin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.lucko.spark.common.sampler.async.jfr; + +import java.util.Map; + +class JfrField extends Element { + final String name; + final int type; + final boolean constantPool; + + JfrField(Map<String, String> attributes) { + this.name = attributes.get("name"); + this.type = Integer.parseInt(attributes.get("class")); + this.constantPool = "true".equals(attributes.get("constantPool")); + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java new file mode 100644 index 0000000..49a3474 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java @@ -0,0 +1,389 @@ +/* + * Copyright 2020 Andrei Pangin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.lucko.spark.common.sampler.async.jfr; + +import me.lucko.spark.common.sampler.async.AsyncStackTraceElement; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Parses JFR output produced by async-profiler. + */ +public class JfrReader implements Closeable { + private static final int CHUNK_HEADER_SIZE = 68; + private static final int CPOOL_OFFSET = 16; + private static final int META_OFFSET = 24; + + private final FileChannel ch; + private final ByteBuffer buf; + + public final long startNanos; + public final long durationNanos; + public final long startTicks; + public final long ticksPerSec; + + public final Dictionary<JfrClass> types = new Dictionary<>(); + public final Map<String, JfrClass> typesByName = new HashMap<>(); + public final Dictionary<String> threads = new Dictionary<>(); + public final Dictionary<ClassRef> classes = new Dictionary<>(); + public final Dictionary<byte[]> symbols = new Dictionary<>(); + public final Dictionary<MethodRef> methods = new Dictionary<>(); + public final Dictionary<StackTrace> stackTraces = new Dictionary<>(); + public final Dictionary<AsyncStackTraceElement> stackFrames = new Dictionary<>(); // spark + public final Map<Integer, String> frameTypes = new HashMap<>(); + public final Map<Integer, String> threadStates = new HashMap<>(); + public final List<Sample> samples = new ArrayList<>(); + + public JfrReader(Path path) throws IOException { // spark - Path instead of String + this.ch = FileChannel.open(path, StandardOpenOption.READ); // spark - Path instead of String + this.buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size()); + + if (buf.getInt(0) != 0x464c5200) { + throw new IOException("Not a valid JFR file"); + } + + int version = buf.getInt(4); + if (version < 0x20000 || version > 0x2ffff) { + throw new IOException("Unsupported JFR version: " + (version >>> 16) + "." + (version & 0xffff)); + } + + this.startNanos = buf.getLong(32); + this.durationNanos = buf.getLong(40); + this.startTicks = buf.getLong(48); + this.ticksPerSec = buf.getLong(56); + + readMeta(); + readConstantPool(); + readEvents(); + } + + @Override + public void close() throws IOException { + ch.close(); + } + + private void readMeta() { + buf.position(buf.getInt(META_OFFSET + 4)); + getVarint(); + getVarint(); + getVarlong(); + getVarlong(); + getVarlong(); + + String[] strings = new String[getVarint()]; + for (int i = 0; i < strings.length; i++) { + strings[i] = getString(); + } + readElement(strings); + } + + private Element readElement(String[] strings) { + String name = strings[getVarint()]; + + int attributeCount = getVarint(); + Map<String, String> attributes = new HashMap<>(attributeCount); + for (int i = 0; i < attributeCount; i++) { + attributes.put(strings[getVarint()], strings[getVarint()]); + } + + Element e = createElement(name, attributes); + int childCount = getVarint(); + for (int i = 0; i < childCount; i++) { + e.addChild(readElement(strings)); + } + return e; + } + + private Element createElement(String name, Map<String, String> attributes) { + switch (name) { + case "class": { + JfrClass type = new JfrClass(attributes); + if (!attributes.containsKey("superType")) { + types.put(type.id, type); + } + typesByName.put(type.name, type); + return type; + } + case "field": + return new JfrField(attributes); + default: + return new Element(); + } + } + + private void readConstantPool() { + int offset = buf.getInt(CPOOL_OFFSET + 4); + while (true) { + buf.position(offset); + getVarint(); + getVarint(); + getVarlong(); + getVarlong(); + long delta = getVarlong(); + getVarint(); + + int poolCount = getVarint(); + for (int i = 0; i < poolCount; i++) { + int type = getVarint(); + readConstants(types.get(type)); + } + + if (delta == 0) { + break; + } + offset += delta; + } + } + + private void readConstants(JfrClass type) { + switch (type.name) { + case "jdk.types.ChunkHeader": + buf.position(buf.position() + (CHUNK_HEADER_SIZE + 3)); + break; + case "java.lang.Thread": + readThreads(type.field("group") != null); + break; + case "java.lang.Class": + readClasses(type.field("hidden") != null); + break; + case "jdk.types.Symbol": + readSymbols(); + break; + case "jdk.types.Method": + readMethods(); + break; + case "jdk.types.StackTrace": + readStackTraces(); + break; + case "jdk.types.FrameType": + readMap(frameTypes); + break; + case "jdk.types.ThreadState": + readMap(threadStates); + break; + default: + readOtherConstants(type.fields); + } + } + + private void readThreads(boolean hasGroup) { + int count = threads.preallocate(getVarint()); + for (int i = 0; i < count; i++) { + long id = getVarlong(); + String osName = getString(); + int osThreadId = getVarint(); + String javaName = getString(); + long javaThreadId = getVarlong(); + if (hasGroup) getVarlong(); + threads.put(id, javaName != null ? javaName : osName); + } + } + + private void readClasses(boolean hasHidden) { + int count = classes.preallocate(getVarint()); + for (int i = 0; i < count; i++) { + long id = getVarlong(); + long loader = getVarlong(); + long name = getVarlong(); + long pkg = getVarlong(); + int modifiers = getVarint(); + if (hasHidden) getVarint(); + classes.put(id, new ClassRef(name)); + } + } + + private void readMethods() { + int count = methods.preallocate(getVarint()); + for (int i = 0; i < count; i++) { + long id = getVarlong(); + long cls = getVarlong(); + long name = getVarlong(); + long sig = getVarlong(); + int modifiers = getVarint(); + int hidden = getVarint(); + methods.put(id, new MethodRef(cls, name, sig)); + } + stackFrames.preallocate(count); // spark + } + + private void readStackTraces() { + int count = stackTraces.preallocate(getVarint()); + for (int i = 0; i < count; i++) { + long id = getVarlong(); + int truncated = getVarint(); + StackTrace stackTrace = readStackTrace(); + stackTraces.put(id, stackTrace); + } + } + + private StackTrace readStackTrace() { + int depth = getVarint(); + long[] methods = new long[depth]; + byte[] types = new byte[depth]; + for (int i = 0; i < depth; i++) { + methods[i] = getVarlong(); + int line = getVarint(); + int bci = getVarint(); + types[i] = buf.get(); + } + return new StackTrace(methods, types); + } + + private void readSymbols() { + int count = symbols.preallocate(getVarint()); + for (int i = 0; i < count; i++) { + long id = getVarlong(); + if (buf.get() != 3) { + throw new IllegalArgumentException("Invalid symbol encoding"); + } + symbols.put(id, getBytes()); + } + } + + private void readMap(Map<Integer, String> map) { + int count = getVarint(); + for (int i = 0; i < count; i++) { + map.put(getVarint(), getString()); + } + } + + private void readOtherConstants(List<JfrField> fields) { + int stringType = getTypeId("java.lang.String"); + + boolean[] numeric = new boolean[fields.size()]; + for (int i = 0; i < numeric.length; i++) { + JfrField f = fields.get(i); + numeric[i] = f.constantPool || f.type != stringType; + } + + int count = getVarint(); + for (int i = 0; i < count; i++) { + getVarlong(); + readFields(numeric); + } + } + + private void readFields(boolean[] numeric) { + for (boolean n : numeric) { + if (n) { + getVarlong(); + } else { + getString(); + } + } + } + + private void readEvents() { + int executionSample = getTypeId("jdk.ExecutionSample"); + int nativeMethodSample = getTypeId("jdk.NativeMethodSample"); + + buf.position(CHUNK_HEADER_SIZE); + while (buf.hasRemaining()) { + int position = buf.position(); + int size = getVarint(); + int type = getVarint(); + if (type == executionSample || type == nativeMethodSample) { + readExecutionSample(); + } else { + buf.position(position + size); + } + } + + Collections.sort(samples); + } + + private void readExecutionSample() { + long time = getVarlong(); + int tid = getVarint(); + int stackTraceId = getVarint(); + int threadState = getVarint(); + samples.add(new Sample(time, tid, stackTraceId, threadState)); + + StackTrace stackTrace = stackTraces.get(stackTraceId); + if (stackTrace != null) { + stackTrace.samples++; + } + } + + private int getTypeId(String typeName) { + JfrClass type = typesByName.get(typeName); + return type != null ? type.id : -1; + } + + private int getVarint() { + int result = 0; + for (int shift = 0; ; shift += 7) { + byte b = buf.get(); + result |= (b & 0x7f) << shift; + if (b >= 0) { + return result; + } + } + } + + private long getVarlong() { + long result = 0; + for (int shift = 0; shift < 56; shift += 7) { + byte b = buf.get(); + result |= (b & 0x7fL) << shift; + if (b >= 0) { + return result; + } + } + return result | (buf.get() & 0xffL) << 56; + } + + private String getString() { + switch (buf.get()) { + case 0: + return null; + case 1: + return ""; + case 3: + return new String(getBytes(), StandardCharsets.UTF_8); + case 4: { + char[] chars = new char[getVarint()]; + for (int i = 0; i < chars.length; i++) { + chars[i] = (char) getVarint(); + } + return new String(chars); + } + case 5: + return new String(getBytes(), StandardCharsets.ISO_8859_1); + default: + throw new IllegalArgumentException("Invalid string encoding"); + } + } + + private byte[] getBytes() { + byte[] bytes = new byte[getVarint()]; + buf.get(bytes); + return bytes; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/MethodRef.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/MethodRef.java new file mode 100644 index 0000000..2f9071e --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/MethodRef.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020 Andrei Pangin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.lucko.spark.common.sampler.async.jfr; + +public class MethodRef { + public final long cls; + public final long name; + public final long sig; + + public MethodRef(long cls, long name, long sig) { + this.cls = cls; + this.name = name; + this.sig = sig; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Sample.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Sample.java new file mode 100644 index 0000000..095e261 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Sample.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020 Andrei Pangin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.lucko.spark.common.sampler.async.jfr; + +public class Sample implements Comparable<Sample> { + public final long time; + public final int tid; + public final int stackTraceId; + public final int threadState; + + public Sample(long time, int tid, int stackTraceId, int threadState) { + this.time = time; + this.tid = tid; + this.stackTraceId = stackTraceId; + this.threadState = threadState; + } + + @Override + public int compareTo(Sample o) { + return Long.compare(time, o.time); + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/StackTrace.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/StackTrace.java new file mode 100644 index 0000000..4c12c5e --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/StackTrace.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Andrei Pangin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.lucko.spark.common.sampler.async.jfr; + +public class StackTrace { + public final long[] methods; + public final byte[] types; + public long samples; + + public StackTrace(long[] methods, byte[] types) { + this.methods = methods; + this.types = types; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java new file mode 100644 index 0000000..a81a5c1 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java @@ -0,0 +1,116 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.java; + +import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.aggregator.AbstractDataAggregator; +import me.lucko.spark.common.sampler.aggregator.DataAggregator; +import me.lucko.spark.common.sampler.node.ThreadNode; + +import java.lang.management.ThreadInfo; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Abstract {@link DataAggregator} for the {@link JavaSampler}. + */ +public abstract class JavaDataAggregator extends AbstractDataAggregator { + + /** The worker pool for inserting stack nodes */ + protected final ExecutorService workerPool; + + /** The interval to wait between sampling, in microseconds */ + protected final int interval; + + /** If sleeping threads should be ignored */ + private final boolean ignoreSleeping; + + /** If threads executing native code should be ignored */ + private final boolean ignoreNative; + + public JavaDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) { + super(threadGrouper); + this.workerPool = workerPool; + this.interval = interval; + this.ignoreSleeping = ignoreSleeping; + this.ignoreNative = ignoreNative; + } + + /** + * Inserts sampling data into this aggregator + * + * @param threadInfo the thread info + */ + public abstract void insertData(ThreadInfo threadInfo); + + protected void writeData(ThreadInfo threadInfo) { + if (this.ignoreSleeping && isSleeping(threadInfo)) { + return; + } + if (this.ignoreNative && threadInfo.isInNative()) { + return; + } + + try { + ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName())); + node.log(threadInfo.getStackTrace(), this.interval); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public Map<String, ThreadNode> getData() { + // wait for all pending data to be inserted + this.workerPool.shutdown(); + try { + this.workerPool.awaitTermination(15, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return super.getData(); + } + + private static boolean isSleeping(ThreadInfo thread) { + if (thread.getThreadState() == Thread.State.WAITING || thread.getThreadState() == Thread.State.TIMED_WAITING) { + return true; + } + + StackTraceElement[] stackTrace = thread.getStackTrace(); + if (stackTrace.length == 0) { + return false; + } + + StackTraceElement call = stackTrace[0]; + String clazz = call.getClassName(); + String method = call.getMethodName(); + + // java.lang.Thread.yield() + // jdk.internal.misc.Unsafe.park() + // sun.misc.Unsafe.park() + return (clazz.equals("java.lang.Thread") && method.equals("yield")) || + (clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) || + (clazz.equals("sun.misc.Unsafe") && method.equals("park")); + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java new file mode 100644 index 0000000..568609e --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -0,0 +1,189 @@ +/* + * This file is part of spark. + * + * Copyright (C) Albert Pham <http://www.sk89q.com> + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.java; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import me.lucko.spark.common.command.sender.CommandSender; +import me.lucko.spark.common.platform.PlatformInfo; +import me.lucko.spark.common.sampler.Sampler; +import me.lucko.spark.common.sampler.ThreadDumper; +import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.node.MergeMode; +import me.lucko.spark.common.sampler.node.ThreadNode; +import me.lucko.spark.common.sampler.tick.TickHook; +import me.lucko.spark.proto.SparkProtos.SamplerData; +import me.lucko.spark.proto.SparkProtos.SamplerMetadata; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A sampler implementation using Java (WarmRoast). + */ +public class JavaSampler implements Sampler, Runnable { + private static final AtomicInteger THREAD_ID = new AtomicInteger(0); + + /** The worker pool for inserting stack nodes */ + private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool( + 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() + ); + + /** The main sampling task */ + private ScheduledFuture<?> task; + + /** The thread management interface for the current JVM */ + private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + /** The instance used to generate thread information for use in sampling */ + private final ThreadDumper threadDumper; + /** Responsible for aggregating and then outputting collected sampling data */ + private final JavaDataAggregator dataAggregator; + + /** A future to encapsulate the completion of this sampler instance */ + private final CompletableFuture<JavaSampler> future = new CompletableFuture<>(); + /** The interval to wait between sampling, in microseconds */ + private final int interval; + /** The time when sampling first began */ + private long startTime = -1; + /** The unix timestamp (in millis) when this sampler should automatically complete. */ + private final long endTime; // -1 for nothing + + public JavaSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) { + this.threadDumper = threadDumper; + this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); + this.interval = interval; + this.endTime = endTime; + } + + public JavaSampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) { + this.threadDumper = threadDumper; + this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold); + this.interval = interval; + this.endTime = endTime; + } + + @Override + public void start() { + this.startTime = System.currentTimeMillis(); + this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS); + } + + @Override + public long getStartTime() { + if (this.startTime == -1) { + throw new IllegalStateException("Not yet started"); + } + return this.startTime; + } + + @Override + public long getEndTime() { + return this.endTime; + } + + @Override + public CompletableFuture<JavaSampler> getFuture() { + return this.future; + } + + @Override + public void stop() { + this.task.cancel(false); + } + + @Override + public void run() { + // this is effectively synchronized, the worker pool will not allow this task + // to concurrently execute. + try { + if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) { + this.future.complete(this); + stop(); + return; + } + + ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean); + this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps)); + } catch (Throwable t) { + this.future.completeExceptionally(t); + stop(); + } + } + + private static final class InsertDataTask implements Runnable { + private final JavaDataAggregator dataAggregator; + private final ThreadInfo[] threadDumps; + + InsertDataTask(JavaDataAggregator dataAggregator, ThreadInfo[] threadDumps) { + this.dataAggregator = dataAggregator; + this.threadDumps = threadDumps; + } + + @Override + public void run() { + for (ThreadInfo threadInfo : this.threadDumps) { + if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) { + continue; + } + this.dataAggregator.insertData(threadInfo); + } + } + } + + @Override + public SamplerData toProto(PlatformInfo platformInfo, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode) { + final SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder() + .setPlatform(platformInfo.toData().toProto()) + .setUser(creator.toData().toProto()) + .setStartTime(this.startTime) + .setInterval(this.interval) + .setThreadDumper(this.threadDumper.getMetadata()) + .setDataAggregator(this.dataAggregator.getMetadata()); + + if (comment != null) { + metadata.setComment(comment); + } + + SamplerData.Builder proto = SamplerData.newBuilder(); + proto.setMetadata(metadata.build()); + + List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet()); + data.sort(outputOrder); + + for (Map.Entry<String, ThreadNode> entry : data) { + proto.addThreads(entry.getValue().toProto(mergeMode)); + } + + return proto.build(); + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java index 8463f98..e7113a1 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java @@ -18,21 +18,19 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -package me.lucko.spark.common.sampler.aggregator; +package me.lucko.spark.common.sampler.java; import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.node.ThreadNode; +import me.lucko.spark.common.sampler.aggregator.DataAggregator; import me.lucko.spark.proto.SparkProtos.SamplerMetadata; import java.lang.management.ThreadInfo; -import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; /** * Basic implementation of {@link DataAggregator}. */ -public class SimpleDataAggregator extends AbstractDataAggregator { +public class SimpleDataAggregator extends JavaDataAggregator { public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) { super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); } @@ -50,16 +48,4 @@ public class SimpleDataAggregator extends AbstractDataAggregator { writeData(threadInfo); } - @Override - public Map<String, ThreadNode> getData() { - // wait for all pending data to be inserted - this.workerPool.shutdown(); - try { - this.workerPool.awaitTermination(15, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - return this.threadData; - } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java index 7ad6781..1a0bcdd 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java @@ -18,9 +18,10 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -package me.lucko.spark.common.sampler.aggregator; +package me.lucko.spark.common.sampler.java; import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.aggregator.DataAggregator; import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.tick.TickHook; import me.lucko.spark.proto.SparkProtos.SamplerMetadata; @@ -36,7 +37,7 @@ import java.util.concurrent.TimeUnit; * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks" * which exceed a certain threshold in duration. */ -public class TickedDataAggregator extends AbstractDataAggregator { +public class TickedDataAggregator extends JavaDataAggregator { /** Used to monitor the current "tick" of the server */ private final TickHook tickHook; @@ -107,15 +108,7 @@ public class TickedDataAggregator extends AbstractDataAggregator { pushCurrentTick(); } - // wait for all pending data to be inserted - this.workerPool.shutdown(); - try { - this.workerPool.awaitTermination(15, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - return this.threadData; + return super.getData(); } private final class TickList implements Runnable { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java index fe2fd62..2ef06d3 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java @@ -21,6 +21,8 @@ package me.lucko.spark.common.sampler.node; +import me.lucko.spark.common.sampler.async.AsyncStackTraceElement; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -110,6 +112,34 @@ public abstract class AbstractNode { child.log(elements, offset + 1, time); } + public void log(AsyncStackTraceElement[] elements, long time) { + log(elements, 0, time); + } + + private void log(AsyncStackTraceElement[] elements, int offset, long time) { + this.totalTime.add(time); + + if (offset >= MAX_STACK_DEPTH) { + return; + } + + if (elements.length - offset == 0) { + return; + } + + // the first element in the array is the top of the call stack, and the last is the root + // offset starts at 0. + + // pointer is determined by subtracting the offset from the index of the last element + int pointer = (elements.length - 1) - offset; + AsyncStackTraceElement element = elements[pointer]; + + // resolve a child element within the structure for the element at pointer + AbstractNode child = resolveChild(new StackTraceNode.Description(element.getClassName(), element.getMethodName(), element.getMethodDescription())); + // call the log method on the found child, with an incremented offset. + child.log(elements, offset + 1, time); + } + protected List<StackTraceNode> exportChildren(MergeMode mergeMode) { if (this.children.isEmpty()) { return Collections.emptyList(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java index a73620e..bd731c1 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java @@ -24,6 +24,7 @@ package me.lucko.spark.common.sampler.node; import me.lucko.spark.common.util.MethodDisambiguator; import me.lucko.spark.proto.SparkProtos; +import java.util.Comparator; import java.util.Objects; /** @@ -73,9 +74,13 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta proto.setParentLineNumber(this.description.parentLineNumber); } - mergeMode.getMethodDisambiguator().disambiguate(this) - .map(MethodDisambiguator.MethodDescription::getDesc) - .ifPresent(proto::setMethodDesc); + if (this.description.methodDescription != null) { + proto.setMethodDesc(this.description.methodDescription); + } else { + mergeMode.getMethodDisambiguator().disambiguate(this) + .map(MethodDisambiguator.MethodDescription::getDesc) + .ifPresent(proto::setMethodDesc); + } for (StackTraceNode child : exportChildren(mergeMode)) { proto.addChildren(child.toProto(mergeMode)); @@ -104,19 +109,48 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta public static final class Description implements Comparable<Description> { private final String className; private final String methodName; + + // async-profiler + private final String methodDescription; + + // Java private final int lineNumber; private final int parentLineNumber; private final int hash; + // Constructor used by the Java sampler public Description(String className, String methodName, int lineNumber, int parentLineNumber) { this.className = className; this.methodName = methodName; + this.methodDescription = null; this.lineNumber = lineNumber; this.parentLineNumber = parentLineNumber; this.hash = Objects.hash(this.className, this.methodName, this.lineNumber, this.parentLineNumber); } + // Constructor used by the async-profiler sampler + public Description(String className, String methodName, String methodDescription) { + this.className = className; + this.methodName = methodName; + this.methodDescription = methodDescription; + this.lineNumber = StackTraceNode.NULL_LINE_NUMBER; + this.parentLineNumber = StackTraceNode.NULL_LINE_NUMBER; + this.hash = Objects.hash(this.className, this.methodName, this.methodDescription); + } + + private static <T extends Comparable<T>> int nullCompare(T a, T b) { + if (a == null && b == null) { + return 0; + } else if (a == null) { + return -1; + } else if (b == null) { + return 1; + } else { + return a.compareTo(b); + } + } + @Override public int compareTo(Description that) { if (this == that) { @@ -133,6 +167,18 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta return i; } + i = nullCompare(this.methodDescription, that.methodDescription); + if (i != 0) { + return i; + } + + if (this.methodDescription != null && that.methodDescription != null) { + i = this.methodDescription.compareTo(that.methodDescription); + if (i != 0) { + return i; + } + } + i = Integer.compare(this.lineNumber, that.lineNumber); if (i != 0) { return i; @@ -146,10 +192,12 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Description description = (Description) o; - return this.lineNumber == description.lineNumber && + return this.hash == description.hash && + this.lineNumber == description.lineNumber && this.parentLineNumber == description.parentLineNumber && this.className.equals(description.className) && - this.methodName.equals(description.methodName); + this.methodName.equals(description.methodName) && + Objects.equals(this.methodDescription, description.methodDescription); } @Override |