From d31f3c7bdf03c874ff9518d47d060adc18322d6b Mon Sep 17 00:00:00 2001 From: lucko Date: Fri, 7 Oct 2022 20:26:24 +0100 Subject: Split profiler output into windows (#253) --- .../common/command/modules/SamplerModule.java | 14 +- .../spark/common/sampler/AbstractSampler.java | 42 ++-- .../me/lucko/spark/common/sampler/Sampler.java | 4 +- .../spark/common/sampler/ThreadNodeOrder.java | 52 ---- .../common/sampler/async/AsyncDataAggregator.java | 4 +- .../common/sampler/async/AsyncProfilerAccess.java | 4 +- .../common/sampler/async/AsyncProfilerJob.java | 264 ++++++++++++++++++++ .../spark/common/sampler/async/AsyncSampler.java | 255 ++++++-------------- .../common/sampler/async/JfrParsingException.java | 27 +++ .../spark/common/sampler/async/ProfileSegment.java | 50 ++++ .../spark/common/sampler/async/jfr/Dictionary.java | 4 + .../common/sampler/java/JavaDataAggregator.java | 7 +- .../spark/common/sampler/java/JavaSampler.java | 56 ++++- .../common/sampler/java/SimpleDataAggregator.java | 4 +- .../common/sampler/java/TickedDataAggregator.java | 41 ++-- .../spark/common/sampler/node/AbstractNode.java | 70 +++--- .../spark/common/sampler/node/StackTraceNode.java | 77 +----- .../spark/common/sampler/node/ThreadNode.java | 42 +++- .../sampler/window/ProfilingWindowUtils.java | 36 +++ .../common/sampler/window/ProtoTimeEncoder.java | 94 ++++++++ .../sampler/window/WindowStatisticsCollector.java | 267 +++++++++++++++++++++ spark-common/src/main/proto/spark/spark.proto | 9 + .../src/main/proto/spark/spark_sampler.proto | 15 +- 23 files changed, 1027 insertions(+), 411 deletions(-) delete mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java (limited to 'spark-common/src/main') diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java index 2afed64..c1e4981 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java @@ -35,7 +35,6 @@ import me.lucko.spark.common.sampler.Sampler; import me.lucko.spark.common.sampler.SamplerBuilder; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.ThreadNodeOrder; import me.lucko.spark.common.sampler.async.AsyncSampler; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; @@ -94,7 +93,6 @@ public class SamplerModule implements CommandModule { .argumentUsage("not-combined", null) .argumentUsage("force-java-sampler", null) .argumentUsage("stop --comment", "comment") - .argumentUsage("stop --order-by-time", null) .argumentUsage("stop --save-to-file", null) .executor(this::profiler) .tabCompleter((platform, sender, arguments) -> { @@ -103,7 +101,7 @@ public class SamplerModule implements CommandModule { } if (arguments.contains("--stop") || arguments.contains("--upload")) { - return TabCompleter.completeForOpts(arguments, "--order-by-time", "--comment", "--save-to-file"); + return TabCompleter.completeForOpts(arguments, "--comment", "--save-to-file"); } List opts = new ArrayList<>(Arrays.asList("--info", "--stop", "--cancel", @@ -249,14 +247,13 @@ public class SamplerModule implements CommandModule { // await the result if (timeoutSeconds != -1) { - ThreadNodeOrder threadOrder = arguments.boolFlag("order-by-time") ? ThreadNodeOrder.BY_TIME : ThreadNodeOrder.BY_NAME; String comment = Iterables.getFirst(arguments.stringFlag("comment"), null); MethodDisambiguator methodDisambiguator = new MethodDisambiguator(); MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator); boolean saveToFile = arguments.boolFlag("save-to-file"); future.thenAcceptAsync(s -> { resp.broadcastPrefixed(text("The active profiler has completed! Uploading results...")); - handleUpload(platform, resp, s, threadOrder, comment, mergeMode, saveToFile); + handleUpload(platform, resp, s, comment, mergeMode, saveToFile); }); } } @@ -293,18 +290,17 @@ public class SamplerModule implements CommandModule { } else { this.activeSampler.stop(); resp.broadcastPrefixed(text("The active profiler has been stopped! Uploading results...")); - ThreadNodeOrder threadOrder = arguments.boolFlag("order-by-time") ? ThreadNodeOrder.BY_TIME : ThreadNodeOrder.BY_NAME; String comment = Iterables.getFirst(arguments.stringFlag("comment"), null); MethodDisambiguator methodDisambiguator = new MethodDisambiguator(); MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator); boolean saveToFile = arguments.boolFlag("save-to-file"); - handleUpload(platform, resp, this.activeSampler, threadOrder, comment, mergeMode, saveToFile); + handleUpload(platform, resp, this.activeSampler, comment, mergeMode, saveToFile); this.activeSampler = null; } } - private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, ThreadNodeOrder threadOrder, String comment, MergeMode mergeMode, boolean saveToFileFlag) { - SparkSamplerProtos.SamplerData output = sampler.toProto(platform, resp.sender(), threadOrder, comment, mergeMode, ClassSourceLookup.create(platform)); + private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, String comment, MergeMode mergeMode, boolean saveToFileFlag) { + SparkSamplerProtos.SamplerData output = sampler.toProto(platform, resp.sender(), comment, mergeMode, ClassSourceLookup.create(platform)); boolean saveToFile = false; if (saveToFileFlag) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java index 6fc5a10..c650738 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java @@ -30,7 +30,10 @@ import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.source.SourceMetadata; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; +import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.proto.SparkProtos; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; @@ -58,12 +61,12 @@ public abstract class AbstractSampler implements Sampler { /** The time when sampling first began */ protected long startTime = -1; - /** The game tick when sampling first began */ - protected int startTick = -1; - /** The unix timestamp (in millis) when this sampler should automatically complete. */ protected final long autoEndTime; // -1 for nothing + /** Collects statistics for each window in the sample */ + protected final WindowStatisticsCollector windowStatisticsCollector; + /** A future to encapsulate the completion of this sampler instance */ protected final CompletableFuture future = new CompletableFuture<>(); @@ -75,6 +78,7 @@ public abstract class AbstractSampler implements Sampler { this.interval = interval; this.threadDumper = threadDumper; this.autoEndTime = autoEndTime; + this.windowStatisticsCollector = new WindowStatisticsCollector(platform); } @Override @@ -106,11 +110,11 @@ public abstract class AbstractSampler implements Sampler { @Override public void start() { this.startTime = System.currentTimeMillis(); + } - TickHook tickHook = this.platform.getTickHook(); - if (tickHook != null) { - this.startTick = tickHook.getCurrentTick(); - } + @Override + public void stop() { + this.windowStatisticsCollector.stop(); } protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) { @@ -127,12 +131,9 @@ public abstract class AbstractSampler implements Sampler { metadata.setComment(comment); } - if (this.startTick != -1) { - TickHook tickHook = this.platform.getTickHook(); - if (tickHook != null) { - int numberOfTicks = tickHook.getCurrentTick() - this.startTick; - metadata.setNumberOfTicks(numberOfTicks); - } + int totalTicks = this.windowStatisticsCollector.getTotalTicks(); + if (totalTicks != -1) { + metadata.setNumberOfTicks(totalTicks); } try { @@ -171,14 +172,23 @@ public abstract class AbstractSampler implements Sampler { proto.setMetadata(metadata); } - protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, Comparator outputOrder, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { List data = dataAggregator.exportData(); - data.sort(outputOrder); + data.sort(Comparator.comparing(ThreadNode::getThreadLabel)); ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup); + ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(data); + int[] timeWindows = timeEncoder.getKeys(); + for (int timeWindow : timeWindows) { + proto.addTimeWindows(timeWindow); + } + + this.windowStatisticsCollector.ensureHasStatisticsForAllWindows(timeWindows); + proto.putAllTimeWindowStatistics(this.windowStatisticsCollector.export()); + for (ThreadNode entry : data) { - proto.addThreads(entry.toProto(mergeMode)); + proto.addThreads(entry.toProto(mergeMode, timeEncoder)); classSourceVisitor.visit(entry); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index 98281de..e06cba6 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -23,11 +23,9 @@ package me.lucko.spark.common.sampler; import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; -import java.util.Comparator; import java.util.concurrent.CompletableFuture; /** @@ -67,6 +65,6 @@ public interface Sampler { CompletableFuture getFuture(); // Methods used to export the sampler data to the web viewer. - SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); + SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java deleted file mode 100644 index adcedcd..0000000 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (c) lucko (Luck) - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package me.lucko.spark.common.sampler; - -import me.lucko.spark.common.sampler.node.ThreadNode; - -import java.util.Comparator; - -/** - * Methods of ordering {@link ThreadNode}s in the output data. - */ -public enum ThreadNodeOrder implements Comparator { - - /** - * Order by the name of the thread (alphabetically) - */ - BY_NAME { - @Override - public int compare(ThreadNode o1, ThreadNode o2) { - return o1.getThreadLabel().compareTo(o2.getThreadLabel()); - } - }, - - /** - * Order by the time taken by the thread (most time taken first) - */ - BY_TIME { - @Override - public int compare(ThreadNode o1, ThreadNode o2) { - return -Double.compare(o1.getTotalTime(), o2.getTotalTime()); - } - } - -} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java index 3de3943..402330a 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java @@ -47,10 +47,10 @@ public class AsyncDataAggregator extends AbstractDataAggregator { .build(); } - public void insertData(ProfileSegment element) { + public void insertData(ProfileSegment element, int window) { try { ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName())); - node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime()); + node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime(), window); } catch (Exception e) { e.printStackTrace(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java index abde21d..1480650 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java @@ -87,11 +87,11 @@ public class AsyncProfilerAccess { this.setupException = setupException; } - public AsyncProfiler getProfiler() { + public AsyncProfilerJob startNewProfilerJob() { if (this.profiler == null) { throw new UnsupportedOperationException("async-profiler not supported", this.setupException); } - return this.profiler; + return AsyncProfilerJob.createNew(this, this.profiler); } public ProfilingEvent getProfilingEvent() { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java new file mode 100644 index 0000000..7b123a7 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java @@ -0,0 +1,264 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package me.lucko.spark.common.sampler.async; + +import me.lucko.spark.common.SparkPlatform; +import me.lucko.spark.common.sampler.ThreadDumper; +import me.lucko.spark.common.sampler.async.jfr.JfrReader; + +import one.profiler.AsyncProfiler; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; + +/** + * Represents a profiling job within async-profiler. + * + *

Only one job can be running at a time. This is guarded by + * {@link #createNew(AsyncProfilerAccess, AsyncProfiler)}.

+ */ +public class AsyncProfilerJob { + + /** + * The currently active job. + */ + private static final AtomicReference ACTIVE = new AtomicReference<>(); + + /** + * Creates a new {@link AsyncProfilerJob}. + * + *

Will throw an {@link IllegalStateException} if another job is already active.

+ * + * @param access the profiler access object + * @param profiler the profiler + * @return the job + */ + static AsyncProfilerJob createNew(AsyncProfilerAccess access, AsyncProfiler profiler) { + synchronized (ACTIVE) { + AsyncProfilerJob existing = ACTIVE.get(); + if (existing != null) { + throw new IllegalStateException("Another profiler is already active: " + existing); + } + + AsyncProfilerJob job = new AsyncProfilerJob(access, profiler); + ACTIVE.set(job); + return job; + } + } + + /** The async-profiler access object */ + private final AsyncProfilerAccess access; + /** The async-profiler instance */ + private final AsyncProfiler profiler; + + // Set on init + /** The platform */ + private SparkPlatform platform; + /** The sampling interval in microseconds */ + private int interval; + /** The thread dumper */ + private ThreadDumper threadDumper; + /** The profiling window */ + private int window; + + /** The file used by async-profiler to output data */ + private Path outputFile; + + private AsyncProfilerJob(AsyncProfilerAccess access, AsyncProfiler profiler) { + this.access = access; + this.profiler = profiler; + } + + /** + * Executes an async-profiler command. + * + * @param command the command + * @return the output + */ + private String execute(String command) { + try { + return this.profiler.execute(command); + } catch (IOException e) { + throw new RuntimeException("Exception whilst executing profiler command", e); + } + } + + /** + * Checks to ensure that this job is still active. + */ + private void checkActive() { + if (ACTIVE.get() != this) { + throw new IllegalStateException("Profiler job no longer active!"); + } + } + + // Initialise the job + public void init(SparkPlatform platform, int interval, ThreadDumper threadDumper, int window) { + this.platform = platform; + this.interval = interval; + this.threadDumper = threadDumper; + this.window = window; + } + + /** + * Starts the job. + */ + public void start() { + checkActive(); + + try { + // create a new temporary output file + try { + this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp"); + } catch (IOException e) { + throw new RuntimeException("Unable to create temporary output file", e); + } + + // construct a command to send to async-profiler + String command = "start,event=" + this.access.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); + if (this.threadDumper instanceof ThreadDumper.Specific) { + command += ",filter"; + } + + // start the profiler + String resp = execute(command).trim(); + + if (!resp.equalsIgnoreCase("profiling started")) { + throw new RuntimeException("Unexpected response: " + resp); + } + + // append threads to be profiled, if necessary + if (this.threadDumper instanceof ThreadDumper.Specific) { + ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; + for (Thread thread : threadDumper.getThreads()) { + this.profiler.addThread(thread); + } + } + + } catch (Exception e) { + try { + this.profiler.stop(); + } catch (Exception e2) { + // ignore + } + close(); + + throw e; + } + } + + /** + * Stops the job. + */ + public void stop() { + checkActive(); + + try { + this.profiler.stop(); + } catch (IllegalStateException e) { + if (!e.getMessage().equals("Profiler is not active")) { // ignore + throw e; + } + } finally { + close(); + } + } + + /** + * Aggregates the collected data. + */ + public void aggregate(AsyncDataAggregator dataAggregator) { + + Predicate threadFilter; + if (this.threadDumper instanceof ThreadDumper.Specific) { + ThreadDumper.Specific specificDumper = (ThreadDumper.Specific) this.threadDumper; + threadFilter = n -> specificDumper.getThreadNames().contains(n.toLowerCase()); + } else { + threadFilter = n -> true; + } + + // read the jfr file produced by async-profiler + try (JfrReader reader = new JfrReader(this.outputFile)) { + readSegments(reader, threadFilter, dataAggregator, this.window); + } catch (Exception e) { + boolean fileExists; + try { + fileExists = Files.exists(this.outputFile) && Files.size(this.outputFile) != 0; + } catch (IOException ex) { + fileExists = false; + } + + if (fileExists) { + throw new JfrParsingException("Error parsing JFR data from profiler output", e); + } else { + throw new JfrParsingException("Error parsing JFR data from profiler output - file " + this.outputFile + " does not exist!", e); + } + } + + // delete the output file after reading + try { + Files.deleteIfExists(this.outputFile); + } catch (IOException e) { + // ignore + } + + } + + private void readSegments(JfrReader reader, Predicate threadFilter, AsyncDataAggregator dataAggregator, int window) throws IOException { + List samples = reader.readAllEvents(JfrReader.ExecutionSample.class); + for (int i = 0; i < samples.size(); i++) { + JfrReader.ExecutionSample sample = samples.get(i); + + long duration; + if (i == 0) { + // we don't really know the duration of the first sample, so just use the sampling + // interval + duration = this.interval; + } else { + // calculate the duration of the sample by calculating the time elapsed since the + // previous sample + duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time); + } + + String threadName = reader.threads.get(sample.tid); + if (!threadFilter.test(threadName)) { + continue; + } + + // parse the segment and give it to the data aggregator + ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, duration); + dataAggregator.insertData(segment, window); + } + } + + public int getWindow() { + return this.window; + } + + private void close() { + ACTIVE.compareAndSet(this, null); + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 7d9cb81..2c9bb5f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -27,61 +27,41 @@ import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.AbstractSampler; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.async.jfr.JfrReader; import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; +import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; +import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; -import one.profiler.AsyncProfiler; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Comparator; -import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; /** * A sampler implementation using async-profiler. */ public class AsyncSampler extends AbstractSampler { - private final AsyncProfiler profiler; + private final AsyncProfilerAccess profilerAccess; /** Responsible for aggregating and then outputting collected sampling data */ private final AsyncDataAggregator dataAggregator; - /** Flag to mark if the output has been completed */ - private boolean outputComplete = false; + /** Mutex for the current profiler job */ + private final Object[] currentJobMutex = new Object[0]; - /** The temporary output file */ - private Path outputFile; + /** Current profiler job */ + private AsyncProfilerJob currentJob; - /** The executor used for timeouts */ - private ScheduledExecutorService timeoutExecutor; + /** The executor used for scheduling and management */ + private ScheduledExecutorService scheduler; public AsyncSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) { super(platform, interval, threadDumper, endTime); - this.profiler = AsyncProfilerAccess.getInstance(platform).getProfiler(); + this.profilerAccess = AsyncProfilerAccess.getInstance(platform); this.dataAggregator = new AsyncDataAggregator(threadGrouper); - } - - /** - * Executes a profiler command. - * - * @param command the command to execute - * @return the response - */ - private String execute(String command) { - try { - return this.profiler.execute(command); - } catch (IOException e) { - throw new RuntimeException("Exception whilst executing profiler command", e); - } + this.scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build() + ); } /** @@ -91,33 +71,58 @@ public class AsyncSampler extends AbstractSampler { public void start() { super.start(); - try { - this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp"); - } catch (IOException e) { - throw new RuntimeException("Unable to create temporary output file", e); + TickHook tickHook = this.platform.getTickHook(); + if (tickHook != null) { + this.windowStatisticsCollector.startCountingTicks(tickHook); } - String command = "start,event=" + AsyncProfilerAccess.getInstance(this.platform).getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); - if (this.threadDumper instanceof ThreadDumper.Specific) { - command += ",filter"; - } + int window = ProfilingWindowUtils.unixMillisToWindow(System.currentTimeMillis()); - String resp = execute(command).trim(); - if (!resp.equalsIgnoreCase("profiling started")) { - throw new RuntimeException("Unexpected response: " + resp); - } + AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob(); + job.init(this.platform, this.interval, this.threadDumper, window); + job.start(); + this.currentJob = job; - if (this.threadDumper instanceof ThreadDumper.Specific) { - ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; - for (Thread thread : threadDumper.getThreads()) { - this.profiler.addThread(thread); - } - } + // rotate the sampler job every minute to put data into a new window + this.scheduler.scheduleAtFixedRate(this::rotateProfilerJob, 1, 1, TimeUnit.MINUTES); recordInitialGcStats(); scheduleTimeout(); } + private void rotateProfilerJob() { + try { + synchronized (this.currentJobMutex) { + AsyncProfilerJob previousJob = this.currentJob; + if (previousJob == null) { + return; + } + + try { + // stop the previous job + previousJob.stop(); + + // collect statistics for the window + this.windowStatisticsCollector.measureNow(previousJob.getWindow()); + } catch (Exception e) { + e.printStackTrace(); + } + + // start a new job + int window = previousJob.getWindow() + 1; + AsyncProfilerJob newJob = this.profilerAccess.startNewProfilerJob(); + newJob.init(this.platform, this.interval, this.threadDumper, window); + newJob.start(); + this.currentJob = newJob; + + // aggregate the output of the previous job + previousJob.aggregate(this.dataAggregator); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + private void scheduleTimeout() { if (this.autoEndTime == -1) { return; @@ -128,11 +133,7 @@ public class AsyncSampler extends AbstractSampler { return; } - this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-timeout-thread").build() - ); - - this.timeoutExecutor.schedule(() -> { + this.scheduler.schedule(() -> { stop(); this.future.complete(this); }, delay, TimeUnit.MILLISECONDS); @@ -143,145 +144,27 @@ public class AsyncSampler extends AbstractSampler { */ @Override public void stop() { - try { - this.profiler.stop(); - } catch (IllegalStateException e) { - if (!e.getMessage().equals("Profiler is not active")) { // ignore - throw e; - } - } + super.stop(); + synchronized (this.currentJobMutex) { + this.currentJob.stop(); + this.windowStatisticsCollector.measureNow(this.currentJob.getWindow()); + this.currentJob.aggregate(this.dataAggregator); + this.currentJob = null; + } - if (this.timeoutExecutor != null) { - this.timeoutExecutor.shutdown(); - this.timeoutExecutor = null; + if (this.scheduler != null) { + this.scheduler.shutdown(); + this.scheduler = null; } } @Override - public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { SamplerData.Builder proto = SamplerData.newBuilder(); writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); - aggregateOutput(); - writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup); + writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup); return proto.build(); } - private void aggregateOutput() { - if (this.outputComplete) { - return; - } - this.outputComplete = true; - - Predicate threadFilter; - if (this.threadDumper instanceof ThreadDumper.Specific) { - ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; - threadFilter = n -> threadDumper.getThreadNames().contains(n.toLowerCase()); - } else { - threadFilter = n -> true; - } - - // read the jfr file produced by async-profiler - try (JfrReader reader = new JfrReader(this.outputFile)) { - readSegments(reader, threadFilter); - } catch (Exception e) { - boolean fileExists; - try { - fileExists = Files.exists(this.outputFile) && Files.size(this.outputFile) != 0; - } catch (IOException ex) { - fileExists = false; - } - - if (fileExists) { - throw new JfrParsingException("Error parsing JFR data from profiler output", e); - } else { - throw new JfrParsingException("Error parsing JFR data from profiler output - file " + this.outputFile + " does not exist!", e); - } - } - - // delete the output file after reading - try { - Files.deleteIfExists(this.outputFile); - } catch (IOException e) { - // ignore - } - } - - private void readSegments(JfrReader reader, Predicate threadFilter) throws IOException { - List samples = reader.readAllEvents(JfrReader.ExecutionSample.class); - for (int i = 0; i < samples.size(); i++) { - JfrReader.ExecutionSample sample = samples.get(i); - - long duration; - if (i == 0) { - // we don't really know the duration of the first sample, so just use the sampling - // interval - duration = this.interval; - } else { - // calculate the duration of the sample by calculating the time elapsed since the - // previous sample - duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time); - } - - String threadName = reader.threads.get(sample.tid); - if (!threadFilter.test(threadName)) { - continue; - } - - // parse the segment and give it to the data aggregator - ProfileSegment segment = parseSegment(reader, sample, threadName, duration); - this.dataAggregator.insertData(segment); - } - } - - private static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) { - JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId); - int len = stackTrace.methods.length; - - AsyncStackTraceElement[] stack = new AsyncStackTraceElement[len]; - for (int i = 0; i < len; i++) { - stack[i] = parseStackFrame(reader, stackTrace.methods[i]); - } - - return new ProfileSegment(sample.tid, threadName, stack, duration); - } - - private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) { - AsyncStackTraceElement result = reader.stackFrames.get(methodId); - if (result != null) { - return result; - } - - JfrReader.MethodRef methodRef = reader.methods.get(methodId); - JfrReader.ClassRef classRef = reader.classes.get(methodRef.cls); - - byte[] className = reader.symbols.get(classRef.name); - byte[] methodName = reader.symbols.get(methodRef.name); - - if (className == null || className.length == 0) { - // native call - result = new AsyncStackTraceElement( - AsyncStackTraceElement.NATIVE_CALL, - new String(methodName, StandardCharsets.UTF_8), - null - ); - } else { - // java method - byte[] methodDesc = reader.symbols.get(methodRef.sig); - result = new AsyncStackTraceElement( - new String(className, StandardCharsets.UTF_8).replace('/', '.'), - new String(methodName, StandardCharsets.UTF_8), - new String(methodDesc, StandardCharsets.UTF_8) - ); - } - - reader.stackFrames.put(methodId, result); - return result; - } - - private static final class JfrParsingException extends RuntimeException { - public JfrParsingException(String message, Throwable cause) { - super(message, cause); - } - } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java new file mode 100644 index 0000000..6dab359 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java @@ -0,0 +1,27 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package me.lucko.spark.common.sampler.async; + +public class JfrParsingException extends RuntimeException { + public JfrParsingException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java index 154e6fe..26debaf 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java @@ -20,6 +20,10 @@ package me.lucko.spark.common.sampler.async; +import me.lucko.spark.common.sampler.async.jfr.JfrReader; + +import java.nio.charset.StandardCharsets; + /** * Represents a profile "segment". * @@ -58,4 +62,50 @@ public class ProfileSegment { public long getTime() { return this.time; } + + public static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) { + JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId); + int len = stackTrace.methods.length; + + AsyncStackTraceElement[] stack = new AsyncStackTraceElement[len]; + for (int i = 0; i < len; i++) { + stack[i] = parseStackFrame(reader, stackTrace.methods[i]); + } + + return new ProfileSegment(sample.tid, threadName, stack, duration); + } + + private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) { + AsyncStackTraceElement result = reader.stackFrames.get(methodId); + if (result != null) { + return result; + } + + JfrReader.MethodRef methodRef = reader.methods.get(methodId); + JfrReader.ClassRef classRef = reader.classes.get(methodRef.cls); + + byte[] className = reader.symbols.get(classRef.name); + byte[] methodName = reader.symbols.get(methodRef.name); + + if (className == null || className.length == 0) { + // native call + result = new AsyncStackTraceElement( + AsyncStackTraceElement.NATIVE_CALL, + new String(methodName, StandardCharsets.UTF_8), + null + ); + } else { + // java method + byte[] methodDesc = reader.symbols.get(methodRef.sig); + result = new AsyncStackTraceElement( + new String(className, StandardCharsets.UTF_8).replace('/', '.'), + new String(methodName, StandardCharsets.UTF_8), + new String(methodDesc, StandardCharsets.UTF_8) + ); + } + + reader.stackFrames.put(methodId, result); + return result; + } + } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java index 23223a2..60f6543 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java @@ -37,6 +37,10 @@ public class Dictionary { size = 0; } + public int size() { + return this.size; + } + public void put(long key, T value) { if (key == 0) { throw new IllegalArgumentException("Zero key not allowed"); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java index cc530d6..c51ec05 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java @@ -66,10 +66,11 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator { * Inserts sampling data into this aggregator * * @param threadInfo the thread info + * @param window the window */ - public abstract void insertData(ThreadInfo threadInfo); + public abstract void insertData(ThreadInfo threadInfo, int window); - protected void writeData(ThreadInfo threadInfo) { + protected void writeData(ThreadInfo threadInfo, int window) { if (this.ignoreSleeping && isSleeping(threadInfo)) { return; } @@ -79,7 +80,7 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator { try { ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName())); - node.log(STACK_TRACE_DESCRIBER, threadInfo.getStackTrace(), this.interval); + node.log(STACK_TRACE_DESCRIBER, threadInfo.getStackTrace(), this.interval, window); } catch (Exception e) { e.printStackTrace(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index 0f73a9f..8c96fd3 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -28,15 +28,17 @@ import me.lucko.spark.common.sampler.AbstractSampler; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; +import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; +import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; +import org.checkerframework.checker.units.qual.A; + import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; -import java.util.Comparator; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -62,6 +64,9 @@ public class JavaSampler extends AbstractSampler implements Runnable { /** Responsible for aggregating and then outputting collected sampling data */ private final JavaDataAggregator dataAggregator; + + /** The last window that was profiled */ + private final AtomicInteger lastWindow = new AtomicInteger(); public JavaSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) { super(platform, interval, threadDumper, endTime); @@ -76,12 +81,28 @@ public class JavaSampler extends AbstractSampler implements Runnable { @Override public void start() { super.start(); + + TickHook tickHook = this.platform.getTickHook(); + if (tickHook != null) { + if (this.dataAggregator instanceof TickedDataAggregator) { + WindowStatisticsCollector.ExplicitTickCounter counter = this.windowStatisticsCollector.startCountingTicksExplicit(tickHook); + ((TickedDataAggregator) this.dataAggregator).setTickCounter(counter); + } else { + this.windowStatisticsCollector.startCountingTicks(tickHook); + } + } + this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS); } @Override public void stop() { + super.stop(); + this.task.cancel(false); + + // collect statistics for the final window + this.windowStatisticsCollector.measureNow(this.lastWindow.get()); } @Override @@ -89,27 +110,30 @@ public class JavaSampler extends AbstractSampler implements Runnable { // this is effectively synchronized, the worker pool will not allow this task // to concurrently execute. try { - if (this.autoEndTime != -1 && this.autoEndTime <= System.currentTimeMillis()) { - this.future.complete(this); + long time = System.currentTimeMillis(); + + if (this.autoEndTime != -1 && this.autoEndTime <= time) { stop(); + this.future.complete(this); return; } + int window = ProfilingWindowUtils.unixMillisToWindow(time); ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean); - this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps)); + this.workerPool.execute(new InsertDataTask(threadDumps, window)); } catch (Throwable t) { - this.future.completeExceptionally(t); stop(); + this.future.completeExceptionally(t); } } - private static final class InsertDataTask implements Runnable { - private final JavaDataAggregator dataAggregator; + private final class InsertDataTask implements Runnable { private final ThreadInfo[] threadDumps; + private final int window; - InsertDataTask(JavaDataAggregator dataAggregator, ThreadInfo[] threadDumps) { - this.dataAggregator = dataAggregator; + InsertDataTask(ThreadInfo[] threadDumps, int window) { this.threadDumps = threadDumps; + this.window = window; } @Override @@ -118,16 +142,22 @@ public class JavaSampler extends AbstractSampler implements Runnable { if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) { continue; } - this.dataAggregator.insertData(threadInfo); + JavaSampler.this.dataAggregator.insertData(threadInfo, this.window); + } + + // if we have just stepped over into a new window, collect statistics for the previous window + int previousWindow = JavaSampler.this.lastWindow.getAndSet(this.window); + if (previousWindow != 0 && previousWindow != this.window) { + JavaSampler.this.windowStatisticsCollector.measureNow(previousWindow); } } } @Override - public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { SamplerData.Builder proto = SamplerData.newBuilder(); writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); - writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup); + writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup); return proto.build(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java index 39e21aa..54173fe 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java @@ -44,8 +44,8 @@ public class SimpleDataAggregator extends JavaDataAggregator { } @Override - public void insertData(ThreadInfo threadInfo) { - writeData(threadInfo); + public void insertData(ThreadInfo threadInfo, int window) { + writeData(threadInfo, window); } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java index e062f31..d537b96 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java @@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler.java; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.aggregator.DataAggregator; import me.lucko.spark.common.sampler.node.ThreadNode; +import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; @@ -31,7 +32,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks" @@ -48,14 +48,15 @@ public class TickedDataAggregator extends JavaDataAggregator { /** The expected number of samples in each tick */ private final int expectedSize; - /** The number of ticks aggregated so far */ - private final AtomicInteger numberOfTicks = new AtomicInteger(); - - private final Object mutex = new Object(); + /** Counts the number of ticks aggregated */ + private WindowStatisticsCollector.ExplicitTickCounter tickCounter; // state private int currentTick = -1; - private TickList currentData = new TickList(0); + private TickList currentData = null; + + // guards currentData + private final Object mutex = new Object(); public TickedDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) { super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); @@ -66,29 +67,34 @@ public class TickedDataAggregator extends JavaDataAggregator { this.expectedSize = (int) ((50 / intervalMilliseconds) + 10); } + public void setTickCounter(WindowStatisticsCollector.ExplicitTickCounter tickCounter) { + this.tickCounter = tickCounter; + } + @Override public SamplerMetadata.DataAggregator getMetadata() { // push the current tick (so numberOfTicks is accurate) synchronized (this.mutex) { pushCurrentTick(); + this.currentData = null; } return SamplerMetadata.DataAggregator.newBuilder() .setType(SamplerMetadata.DataAggregator.Type.TICKED) .setThreadGrouper(this.threadGrouper.asProto()) .setTickLengthThreshold(this.tickLengthThreshold) - .setNumberOfIncludedTicks(this.numberOfTicks.get()) + .setNumberOfIncludedTicks(this.tickCounter.getTotalCountedTicks()) .build(); } @Override - public void insertData(ThreadInfo threadInfo) { + public void insertData(ThreadInfo threadInfo, int window) { synchronized (this.mutex) { int tick = this.tickHook.getCurrentTick(); - if (this.currentTick != tick) { + if (this.currentTick != tick || this.currentData == null) { pushCurrentTick(); this.currentTick = tick; - this.currentData = new TickList(this.expectedSize); + this.currentData = new TickList(this.expectedSize, window); } this.currentData.addData(threadInfo); @@ -98,6 +104,9 @@ public class TickedDataAggregator extends JavaDataAggregator { // guarded by 'mutex' private void pushCurrentTick() { TickList currentData = this.currentData; + if (currentData == null) { + return; + } // approximate how long the tick lasted int tickLengthMicros = currentData.getList().size() * this.interval; @@ -107,8 +116,8 @@ public class TickedDataAggregator extends JavaDataAggregator { return; } - this.numberOfTicks.incrementAndGet(); this.workerPool.submit(currentData); + this.tickCounter.increment(); } @Override @@ -121,21 +130,19 @@ public class TickedDataAggregator extends JavaDataAggregator { return super.exportData(); } - public int getNumberOfTicks() { - return this.numberOfTicks.get(); - } - private final class TickList implements Runnable { private final List list; + private final int window; - TickList(int expectedSize) { + TickList(int expectedSize, int window) { this.list = new ArrayList<>(expectedSize); + this.window = window; } @Override public void run() { for (ThreadInfo data : this.list) { - writeData(data); + writeData(data, this.window); } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java index fd2be8d..fe1afcd 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java @@ -20,6 +20,9 @@ package me.lucko.spark.common.sampler.node; +import me.lucko.spark.common.sampler.async.jfr.Dictionary; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -27,62 +30,63 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.LongAdder; +import java.util.stream.IntStream; /** * Encapsulates a timed node in the sampling stack. */ public abstract class AbstractNode { - private static final int MAX_STACK_DEPTH = 300; + protected static final int MAX_STACK_DEPTH = 300; /** A map of the nodes children */ private final Map children = new ConcurrentHashMap<>(); /** The accumulated sample time for this node, measured in microseconds */ - private final LongAdder totalTime = new LongAdder(); + // long key = the window (effectively System.currentTimeMillis() / 60_000) + // LongAdder value = accumulated time in microseconds + private final Dictionary times = new Dictionary<>(); /** - * Gets the total sample time logged for this node in milliseconds. + * Gets the time accumulator for a given window * - * @return the total time + * @param window the window + * @return the accumulator */ - public double getTotalTime() { - return this.totalTime.longValue() / 1000d; + protected LongAdder getTimeAccumulator(int window) { + LongAdder adder = this.times.get(window); + if (adder == null) { + adder = new LongAdder(); + this.times.put(window, adder); + } + return adder; } - public Collection getChildren() { - return this.children.values(); + /** + * Gets the time windows that have been logged for this node. + * + * @return the time windows + */ + public IntStream getTimeWindows() { + IntStream.Builder keys = IntStream.builder(); + this.times.forEach((key, value) -> keys.add((int) key)); + return keys.build(); } /** - * Logs the given stack trace against this node and its children. + * Gets the encoded total sample times logged for this node in milliseconds. * - * @param describer the function that describes the elements of the stack - * @param stack the stack - * @param time the total time to log - * @param the stack trace element type + * @return the total times */ - public void log(StackTraceNode.Describer describer, T[] stack, long time) { - if (stack.length == 0) { - return; - } - - this.totalTime.add(time); - - AbstractNode node = this; - T previousElement = null; - - for (int offset = 0; offset < Math.min(MAX_STACK_DEPTH, stack.length); offset++) { - T element = stack[(stack.length - 1) - offset]; - - node = node.resolveChild(describer.describe(element, previousElement)); - node.totalTime.add(time); + protected double[] encodeTimesForProto(ProtoTimeEncoder encoder) { + return encoder.encode(this.times); + } - previousElement = element; - } + public Collection getChildren() { + return this.children.values(); } - private StackTraceNode resolveChild(StackTraceNode.Description description) { + protected StackTraceNode resolveChild(StackTraceNode.Description description) { StackTraceNode result = this.children.get(description); // fast path if (result != null) { return result; @@ -96,7 +100,7 @@ public abstract class AbstractNode { * @param other the other node */ protected void merge(AbstractNode other) { - this.totalTime.add(other.totalTime.longValue()); + other.times.forEach((key, value) -> getTimeAccumulator((int) key).add(value.longValue())); for (Map.Entry child : other.children.entrySet()) { resolveChild(child.getKey()).merge(child.getValue()); } @@ -123,7 +127,7 @@ public abstract class AbstractNode { list.add(child); } - list.sort(null); + //list.sort(null); return list; } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java index b0d9237..ed938d5 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java @@ -20,6 +20,7 @@ package me.lucko.spark.common.sampler.node; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; import me.lucko.spark.common.util.MethodDisambiguator; import me.lucko.spark.proto.SparkSamplerProtos; @@ -30,7 +31,7 @@ import java.util.Objects; /** * Represents a stack trace element within the {@link AbstractNode node} structure. */ -public final class StackTraceNode extends AbstractNode implements Comparable { +public final class StackTraceNode extends AbstractNode { /** * Magic number to denote "no present" line number for a node. @@ -64,12 +65,16 @@ public final class StackTraceNode extends AbstractNode implements Comparable= 0) { proto.setLineNumber(this.description.lineNumber); } @@ -87,26 +92,12 @@ public final class StackTraceNode extends AbstractNode implements Comparable