diff options
author | lucko <git@lucko.me> | 2022-10-07 20:26:24 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-07 20:26:24 +0100 |
commit | d31f3c7bdf03c874ff9518d47d060adc18322d6b (patch) | |
tree | 94843af32b019568bda755cdf48441efed9abbc8 /spark-common/src/main/java/me/lucko/spark/common | |
parent | c4b1eccd9cd51e348983fab42ced78166f39cb0e (diff) | |
download | spark-d31f3c7bdf03c874ff9518d47d060adc18322d6b.tar.gz spark-d31f3c7bdf03c874ff9518d47d060adc18322d6b.tar.bz2 spark-d31f3c7bdf03c874ff9518d47d060adc18322d6b.zip |
Split profiler output into windows (#253)
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common')
20 files changed, 979 insertions, 383 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java index 2afed64..c1e4981 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java @@ -35,7 +35,6 @@ import me.lucko.spark.common.sampler.Sampler; import me.lucko.spark.common.sampler.SamplerBuilder; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.ThreadNodeOrder; import me.lucko.spark.common.sampler.async.AsyncSampler; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; @@ -94,7 +93,6 @@ public class SamplerModule implements CommandModule { .argumentUsage("not-combined", null) .argumentUsage("force-java-sampler", null) .argumentUsage("stop --comment", "comment") - .argumentUsage("stop --order-by-time", null) .argumentUsage("stop --save-to-file", null) .executor(this::profiler) .tabCompleter((platform, sender, arguments) -> { @@ -103,7 +101,7 @@ public class SamplerModule implements CommandModule { } if (arguments.contains("--stop") || arguments.contains("--upload")) { - return TabCompleter.completeForOpts(arguments, "--order-by-time", "--comment", "--save-to-file"); + return TabCompleter.completeForOpts(arguments, "--comment", "--save-to-file"); } List<String> opts = new ArrayList<>(Arrays.asList("--info", "--stop", "--cancel", @@ -249,14 +247,13 @@ public class SamplerModule implements CommandModule { // await the result if (timeoutSeconds != -1) { - ThreadNodeOrder threadOrder = arguments.boolFlag("order-by-time") ? ThreadNodeOrder.BY_TIME : ThreadNodeOrder.BY_NAME; String comment = Iterables.getFirst(arguments.stringFlag("comment"), null); MethodDisambiguator methodDisambiguator = new MethodDisambiguator(); MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator); boolean saveToFile = arguments.boolFlag("save-to-file"); future.thenAcceptAsync(s -> { resp.broadcastPrefixed(text("The active profiler has completed! Uploading results...")); - handleUpload(platform, resp, s, threadOrder, comment, mergeMode, saveToFile); + handleUpload(platform, resp, s, comment, mergeMode, saveToFile); }); } } @@ -293,18 +290,17 @@ public class SamplerModule implements CommandModule { } else { this.activeSampler.stop(); resp.broadcastPrefixed(text("The active profiler has been stopped! Uploading results...")); - ThreadNodeOrder threadOrder = arguments.boolFlag("order-by-time") ? ThreadNodeOrder.BY_TIME : ThreadNodeOrder.BY_NAME; String comment = Iterables.getFirst(arguments.stringFlag("comment"), null); MethodDisambiguator methodDisambiguator = new MethodDisambiguator(); MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator); boolean saveToFile = arguments.boolFlag("save-to-file"); - handleUpload(platform, resp, this.activeSampler, threadOrder, comment, mergeMode, saveToFile); + handleUpload(platform, resp, this.activeSampler, comment, mergeMode, saveToFile); this.activeSampler = null; } } - private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, ThreadNodeOrder threadOrder, String comment, MergeMode mergeMode, boolean saveToFileFlag) { - SparkSamplerProtos.SamplerData output = sampler.toProto(platform, resp.sender(), threadOrder, comment, mergeMode, ClassSourceLookup.create(platform)); + private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, String comment, MergeMode mergeMode, boolean saveToFileFlag) { + SparkSamplerProtos.SamplerData output = sampler.toProto(platform, resp.sender(), comment, mergeMode, ClassSourceLookup.create(platform)); boolean saveToFile = false; if (saveToFileFlag) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java index 6fc5a10..c650738 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java @@ -30,7 +30,10 @@ import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.source.SourceMetadata; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; +import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.proto.SparkProtos; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; @@ -58,12 +61,12 @@ public abstract class AbstractSampler implements Sampler { /** The time when sampling first began */ protected long startTime = -1; - /** The game tick when sampling first began */ - protected int startTick = -1; - /** The unix timestamp (in millis) when this sampler should automatically complete. */ protected final long autoEndTime; // -1 for nothing + /** Collects statistics for each window in the sample */ + protected final WindowStatisticsCollector windowStatisticsCollector; + /** A future to encapsulate the completion of this sampler instance */ protected final CompletableFuture<Sampler> future = new CompletableFuture<>(); @@ -75,6 +78,7 @@ public abstract class AbstractSampler implements Sampler { this.interval = interval; this.threadDumper = threadDumper; this.autoEndTime = autoEndTime; + this.windowStatisticsCollector = new WindowStatisticsCollector(platform); } @Override @@ -106,11 +110,11 @@ public abstract class AbstractSampler implements Sampler { @Override public void start() { this.startTime = System.currentTimeMillis(); + } - TickHook tickHook = this.platform.getTickHook(); - if (tickHook != null) { - this.startTick = tickHook.getCurrentTick(); - } + @Override + public void stop() { + this.windowStatisticsCollector.stop(); } protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) { @@ -127,12 +131,9 @@ public abstract class AbstractSampler implements Sampler { metadata.setComment(comment); } - if (this.startTick != -1) { - TickHook tickHook = this.platform.getTickHook(); - if (tickHook != null) { - int numberOfTicks = tickHook.getCurrentTick() - this.startTick; - metadata.setNumberOfTicks(numberOfTicks); - } + int totalTicks = this.windowStatisticsCollector.getTotalTicks(); + if (totalTicks != -1) { + metadata.setNumberOfTicks(totalTicks); } try { @@ -171,14 +172,23 @@ public abstract class AbstractSampler implements Sampler { proto.setMetadata(metadata); } - protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, Comparator<ThreadNode> outputOrder, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { List<ThreadNode> data = dataAggregator.exportData(); - data.sort(outputOrder); + data.sort(Comparator.comparing(ThreadNode::getThreadLabel)); ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup); + ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(data); + int[] timeWindows = timeEncoder.getKeys(); + for (int timeWindow : timeWindows) { + proto.addTimeWindows(timeWindow); + } + + this.windowStatisticsCollector.ensureHasStatisticsForAllWindows(timeWindows); + proto.putAllTimeWindowStatistics(this.windowStatisticsCollector.export()); + for (ThreadNode entry : data) { - proto.addThreads(entry.toProto(mergeMode)); + proto.addThreads(entry.toProto(mergeMode, timeEncoder)); classSourceVisitor.visit(entry); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index 98281de..e06cba6 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -23,11 +23,9 @@ package me.lucko.spark.common.sampler; import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; -import java.util.Comparator; import java.util.concurrent.CompletableFuture; /** @@ -67,6 +65,6 @@ public interface Sampler { CompletableFuture<Sampler> getFuture(); // Methods used to export the sampler data to the web viewer. - SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); + SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java index 3de3943..402330a 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java @@ -47,10 +47,10 @@ public class AsyncDataAggregator extends AbstractDataAggregator { .build(); } - public void insertData(ProfileSegment element) { + public void insertData(ProfileSegment element, int window) { try { ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName())); - node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime()); + node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime(), window); } catch (Exception e) { e.printStackTrace(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java index abde21d..1480650 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java @@ -87,11 +87,11 @@ public class AsyncProfilerAccess { this.setupException = setupException; } - public AsyncProfiler getProfiler() { + public AsyncProfilerJob startNewProfilerJob() { if (this.profiler == null) { throw new UnsupportedOperationException("async-profiler not supported", this.setupException); } - return this.profiler; + return AsyncProfilerJob.createNew(this, this.profiler); } public ProfilingEvent getProfilingEvent() { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java new file mode 100644 index 0000000..7b123a7 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java @@ -0,0 +1,264 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.async; + +import me.lucko.spark.common.SparkPlatform; +import me.lucko.spark.common.sampler.ThreadDumper; +import me.lucko.spark.common.sampler.async.jfr.JfrReader; + +import one.profiler.AsyncProfiler; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; + +/** + * Represents a profiling job within async-profiler. + * + * <p>Only one job can be running at a time. This is guarded by + * {@link #createNew(AsyncProfilerAccess, AsyncProfiler)}.</p> + */ +public class AsyncProfilerJob { + + /** + * The currently active job. + */ + private static final AtomicReference<AsyncProfilerJob> ACTIVE = new AtomicReference<>(); + + /** + * Creates a new {@link AsyncProfilerJob}. + * + * <p>Will throw an {@link IllegalStateException} if another job is already active.</p> + * + * @param access the profiler access object + * @param profiler the profiler + * @return the job + */ + static AsyncProfilerJob createNew(AsyncProfilerAccess access, AsyncProfiler profiler) { + synchronized (ACTIVE) { + AsyncProfilerJob existing = ACTIVE.get(); + if (existing != null) { + throw new IllegalStateException("Another profiler is already active: " + existing); + } + + AsyncProfilerJob job = new AsyncProfilerJob(access, profiler); + ACTIVE.set(job); + return job; + } + } + + /** The async-profiler access object */ + private final AsyncProfilerAccess access; + /** The async-profiler instance */ + private final AsyncProfiler profiler; + + // Set on init + /** The platform */ + private SparkPlatform platform; + /** The sampling interval in microseconds */ + private int interval; + /** The thread dumper */ + private ThreadDumper threadDumper; + /** The profiling window */ + private int window; + + /** The file used by async-profiler to output data */ + private Path outputFile; + + private AsyncProfilerJob(AsyncProfilerAccess access, AsyncProfiler profiler) { + this.access = access; + this.profiler = profiler; + } + + /** + * Executes an async-profiler command. + * + * @param command the command + * @return the output + */ + private String execute(String command) { + try { + return this.profiler.execute(command); + } catch (IOException e) { + throw new RuntimeException("Exception whilst executing profiler command", e); + } + } + + /** + * Checks to ensure that this job is still active. + */ + private void checkActive() { + if (ACTIVE.get() != this) { + throw new IllegalStateException("Profiler job no longer active!"); + } + } + + // Initialise the job + public void init(SparkPlatform platform, int interval, ThreadDumper threadDumper, int window) { + this.platform = platform; + this.interval = interval; + this.threadDumper = threadDumper; + this.window = window; + } + + /** + * Starts the job. + */ + public void start() { + checkActive(); + + try { + // create a new temporary output file + try { + this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp"); + } catch (IOException e) { + throw new RuntimeException("Unable to create temporary output file", e); + } + + // construct a command to send to async-profiler + String command = "start,event=" + this.access.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); + if (this.threadDumper instanceof ThreadDumper.Specific) { + command += ",filter"; + } + + // start the profiler + String resp = execute(command).trim(); + + if (!resp.equalsIgnoreCase("profiling started")) { + throw new RuntimeException("Unexpected response: " + resp); + } + + // append threads to be profiled, if necessary + if (this.threadDumper instanceof ThreadDumper.Specific) { + ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; + for (Thread thread : threadDumper.getThreads()) { + this.profiler.addThread(thread); + } + } + + } catch (Exception e) { + try { + this.profiler.stop(); + } catch (Exception e2) { + // ignore + } + close(); + + throw e; + } + } + + /** + * Stops the job. + */ + public void stop() { + checkActive(); + + try { + this.profiler.stop(); + } catch (IllegalStateException e) { + if (!e.getMessage().equals("Profiler is not active")) { // ignore + throw e; + } + } finally { + close(); + } + } + + /** + * Aggregates the collected data. + */ + public void aggregate(AsyncDataAggregator dataAggregator) { + + Predicate<String> threadFilter; + if (this.threadDumper instanceof ThreadDumper.Specific) { + ThreadDumper.Specific specificDumper = (ThreadDumper.Specific) this.threadDumper; + threadFilter = n -> specificDumper.getThreadNames().contains(n.toLowerCase()); + } else { + threadFilter = n -> true; + } + + // read the jfr file produced by async-profiler + try (JfrReader reader = new JfrReader(this.outputFile)) { + readSegments(reader, threadFilter, dataAggregator, this.window); + } catch (Exception e) { + boolean fileExists; + try { + fileExists = Files.exists(this.outputFile) && Files.size(this.outputFile) != 0; + } catch (IOException ex) { + fileExists = false; + } + + if (fileExists) { + throw new JfrParsingException("Error parsing JFR data from profiler output", e); + } else { + throw new JfrParsingException("Error parsing JFR data from profiler output - file " + this.outputFile + " does not exist!", e); + } + } + + // delete the output file after reading + try { + Files.deleteIfExists(this.outputFile); + } catch (IOException e) { + // ignore + } + + } + + private void readSegments(JfrReader reader, Predicate<String> threadFilter, AsyncDataAggregator dataAggregator, int window) throws IOException { + List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class); + for (int i = 0; i < samples.size(); i++) { + JfrReader.ExecutionSample sample = samples.get(i); + + long duration; + if (i == 0) { + // we don't really know the duration of the first sample, so just use the sampling + // interval + duration = this.interval; + } else { + // calculate the duration of the sample by calculating the time elapsed since the + // previous sample + duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time); + } + + String threadName = reader.threads.get(sample.tid); + if (!threadFilter.test(threadName)) { + continue; + } + + // parse the segment and give it to the data aggregator + ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, duration); + dataAggregator.insertData(segment, window); + } + } + + public int getWindow() { + return this.window; + } + + private void close() { + ACTIVE.compareAndSet(this, null); + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 7d9cb81..2c9bb5f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -27,61 +27,41 @@ import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.AbstractSampler; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.async.jfr.JfrReader; import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; +import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; +import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; -import one.profiler.AsyncProfiler; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Comparator; -import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; /** * A sampler implementation using async-profiler. */ public class AsyncSampler extends AbstractSampler { - private final AsyncProfiler profiler; + private final AsyncProfilerAccess profilerAccess; /** Responsible for aggregating and then outputting collected sampling data */ private final AsyncDataAggregator dataAggregator; - /** Flag to mark if the output has been completed */ - private boolean outputComplete = false; + /** Mutex for the current profiler job */ + private final Object[] currentJobMutex = new Object[0]; - /** The temporary output file */ - private Path outputFile; + /** Current profiler job */ + private AsyncProfilerJob currentJob; - /** The executor used for timeouts */ - private ScheduledExecutorService timeoutExecutor; + /** The executor used for scheduling and management */ + private ScheduledExecutorService scheduler; public AsyncSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) { super(platform, interval, threadDumper, endTime); - this.profiler = AsyncProfilerAccess.getInstance(platform).getProfiler(); + this.profilerAccess = AsyncProfilerAccess.getInstance(platform); this.dataAggregator = new AsyncDataAggregator(threadGrouper); - } - - /** - * Executes a profiler command. - * - * @param command the command to execute - * @return the response - */ - private String execute(String command) { - try { - return this.profiler.execute(command); - } catch (IOException e) { - throw new RuntimeException("Exception whilst executing profiler command", e); - } + this.scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build() + ); } /** @@ -91,33 +71,58 @@ public class AsyncSampler extends AbstractSampler { public void start() { super.start(); - try { - this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp"); - } catch (IOException e) { - throw new RuntimeException("Unable to create temporary output file", e); + TickHook tickHook = this.platform.getTickHook(); + if (tickHook != null) { + this.windowStatisticsCollector.startCountingTicks(tickHook); } - String command = "start,event=" + AsyncProfilerAccess.getInstance(this.platform).getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); - if (this.threadDumper instanceof ThreadDumper.Specific) { - command += ",filter"; - } + int window = ProfilingWindowUtils.unixMillisToWindow(System.currentTimeMillis()); - String resp = execute(command).trim(); - if (!resp.equalsIgnoreCase("profiling started")) { - throw new RuntimeException("Unexpected response: " + resp); - } + AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob(); + job.init(this.platform, this.interval, this.threadDumper, window); + job.start(); + this.currentJob = job; - if (this.threadDumper instanceof ThreadDumper.Specific) { - ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; - for (Thread thread : threadDumper.getThreads()) { - this.profiler.addThread(thread); - } - } + // rotate the sampler job every minute to put data into a new window + this.scheduler.scheduleAtFixedRate(this::rotateProfilerJob, 1, 1, TimeUnit.MINUTES); recordInitialGcStats(); scheduleTimeout(); } + private void rotateProfilerJob() { + try { + synchronized (this.currentJobMutex) { + AsyncProfilerJob previousJob = this.currentJob; + if (previousJob == null) { + return; + } + + try { + // stop the previous job + previousJob.stop(); + + // collect statistics for the window + this.windowStatisticsCollector.measureNow(previousJob.getWindow()); + } catch (Exception e) { + e.printStackTrace(); + } + + // start a new job + int window = previousJob.getWindow() + 1; + AsyncProfilerJob newJob = this.profilerAccess.startNewProfilerJob(); + newJob.init(this.platform, this.interval, this.threadDumper, window); + newJob.start(); + this.currentJob = newJob; + + // aggregate the output of the previous job + previousJob.aggregate(this.dataAggregator); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + private void scheduleTimeout() { if (this.autoEndTime == -1) { return; @@ -128,11 +133,7 @@ public class AsyncSampler extends AbstractSampler { return; } - this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-timeout-thread").build() - ); - - this.timeoutExecutor.schedule(() -> { + this.scheduler.schedule(() -> { stop(); this.future.complete(this); }, delay, TimeUnit.MILLISECONDS); @@ -143,145 +144,27 @@ public class AsyncSampler extends AbstractSampler { */ @Override public void stop() { - try { - this.profiler.stop(); - } catch (IllegalStateException e) { - if (!e.getMessage().equals("Profiler is not active")) { // ignore - throw e; - } - } + super.stop(); + synchronized (this.currentJobMutex) { + this.currentJob.stop(); + this.windowStatisticsCollector.measureNow(this.currentJob.getWindow()); + this.currentJob.aggregate(this.dataAggregator); + this.currentJob = null; + } - if (this.timeoutExecutor != null) { - this.timeoutExecutor.shutdown(); - this.timeoutExecutor = null; + if (this.scheduler != null) { + this.scheduler.shutdown(); + this.scheduler = null; } } @Override - public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { SamplerData.Builder proto = SamplerData.newBuilder(); writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); - aggregateOutput(); - writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup); + writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup); return proto.build(); } - private void aggregateOutput() { - if (this.outputComplete) { - return; - } - this.outputComplete = true; - - Predicate<String> threadFilter; - if (this.threadDumper instanceof ThreadDumper.Specific) { - ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; - threadFilter = n -> threadDumper.getThreadNames().contains(n.toLowerCase()); - } else { - threadFilter = n -> true; - } - - // read the jfr file produced by async-profiler - try (JfrReader reader = new JfrReader(this.outputFile)) { - readSegments(reader, threadFilter); - } catch (Exception e) { - boolean fileExists; - try { - fileExists = Files.exists(this.outputFile) && Files.size(this.outputFile) != 0; - } catch (IOException ex) { - fileExists = false; - } - - if (fileExists) { - throw new JfrParsingException("Error parsing JFR data from profiler output", e); - } else { - throw new JfrParsingException("Error parsing JFR data from profiler output - file " + this.outputFile + " does not exist!", e); - } - } - - // delete the output file after reading - try { - Files.deleteIfExists(this.outputFile); - } catch (IOException e) { - // ignore - } - } - - private void readSegments(JfrReader reader, Predicate<String> threadFilter) throws IOException { - List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class); - for (int i = 0; i < samples.size(); i++) { - JfrReader.ExecutionSample sample = samples.get(i); - - long duration; - if (i == 0) { - // we don't really know the duration of the first sample, so just use the sampling - // interval - duration = this.interval; - } else { - // calculate the duration of the sample by calculating the time elapsed since the - // previous sample - duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time); - } - - String threadName = reader.threads.get(sample.tid); - if (!threadFilter.test(threadName)) { - continue; - } - - // parse the segment and give it to the data aggregator - ProfileSegment segment = parseSegment(reader, sample, threadName, duration); - this.dataAggregator.insertData(segment); - } - } - - private static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) { - JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId); - int len = stackTrace.methods.length; - - AsyncStackTraceElement[] stack = new AsyncStackTraceElement[len]; - for (int i = 0; i < len; i++) { - stack[i] = parseStackFrame(reader, stackTrace.methods[i]); - } - - return new ProfileSegment(sample.tid, threadName, stack, duration); - } - - private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) { - AsyncStackTraceElement result = reader.stackFrames.get(methodId); - if (result != null) { - return result; - } - - JfrReader.MethodRef methodRef = reader.methods.get(methodId); - JfrReader.ClassRef classRef = reader.classes.get(methodRef.cls); - - byte[] className = reader.symbols.get(classRef.name); - byte[] methodName = reader.symbols.get(methodRef.name); - - if (className == null || className.length == 0) { - // native call - result = new AsyncStackTraceElement( - AsyncStackTraceElement.NATIVE_CALL, - new String(methodName, StandardCharsets.UTF_8), - null - ); - } else { - // java method - byte[] methodDesc = reader.symbols.get(methodRef.sig); - result = new AsyncStackTraceElement( - new String(className, StandardCharsets.UTF_8).replace('/', '.'), - new String(methodName, StandardCharsets.UTF_8), - new String(methodDesc, StandardCharsets.UTF_8) - ); - } - - reader.stackFrames.put(methodId, result); - return result; - } - - private static final class JfrParsingException extends RuntimeException { - public JfrParsingException(String message, Throwable cause) { - super(message, cause); - } - } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java new file mode 100644 index 0000000..6dab359 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java @@ -0,0 +1,27 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.async; + +public class JfrParsingException extends RuntimeException { + public JfrParsingException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java index 154e6fe..26debaf 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java @@ -20,6 +20,10 @@ package me.lucko.spark.common.sampler.async; +import me.lucko.spark.common.sampler.async.jfr.JfrReader; + +import java.nio.charset.StandardCharsets; + /** * Represents a profile "segment". * @@ -58,4 +62,50 @@ public class ProfileSegment { public long getTime() { return this.time; } + + public static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) { + JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId); + int len = stackTrace.methods.length; + + AsyncStackTraceElement[] stack = new AsyncStackTraceElement[len]; + for (int i = 0; i < len; i++) { + stack[i] = parseStackFrame(reader, stackTrace.methods[i]); + } + + return new ProfileSegment(sample.tid, threadName, stack, duration); + } + + private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) { + AsyncStackTraceElement result = reader.stackFrames.get(methodId); + if (result != null) { + return result; + } + + JfrReader.MethodRef methodRef = reader.methods.get(methodId); + JfrReader.ClassRef classRef = reader.classes.get(methodRef.cls); + + byte[] className = reader.symbols.get(classRef.name); + byte[] methodName = reader.symbols.get(methodRef.name); + + if (className == null || className.length == 0) { + // native call + result = new AsyncStackTraceElement( + AsyncStackTraceElement.NATIVE_CALL, + new String(methodName, StandardCharsets.UTF_8), + null + ); + } else { + // java method + byte[] methodDesc = reader.symbols.get(methodRef.sig); + result = new AsyncStackTraceElement( + new String(className, StandardCharsets.UTF_8).replace('/', '.'), + new String(methodName, StandardCharsets.UTF_8), + new String(methodDesc, StandardCharsets.UTF_8) + ); + } + + reader.stackFrames.put(methodId, result); + return result; + } + } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java index 23223a2..60f6543 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java @@ -37,6 +37,10 @@ public class Dictionary<T> { size = 0; } + public int size() { + return this.size; + } + public void put(long key, T value) { if (key == 0) { throw new IllegalArgumentException("Zero key not allowed"); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java index cc530d6..c51ec05 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java @@ -66,10 +66,11 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator { * Inserts sampling data into this aggregator * * @param threadInfo the thread info + * @param window the window */ - public abstract void insertData(ThreadInfo threadInfo); + public abstract void insertData(ThreadInfo threadInfo, int window); - protected void writeData(ThreadInfo threadInfo) { + protected void writeData(ThreadInfo threadInfo, int window) { if (this.ignoreSleeping && isSleeping(threadInfo)) { return; } @@ -79,7 +80,7 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator { try { ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName())); - node.log(STACK_TRACE_DESCRIBER, threadInfo.getStackTrace(), this.interval); + node.log(STACK_TRACE_DESCRIBER, threadInfo.getStackTrace(), this.interval, window); } catch (Exception e) { e.printStackTrace(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index 0f73a9f..8c96fd3 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -28,15 +28,17 @@ import me.lucko.spark.common.sampler.AbstractSampler; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; +import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; +import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; +import org.checkerframework.checker.units.qual.A; + import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; -import java.util.Comparator; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -62,6 +64,9 @@ public class JavaSampler extends AbstractSampler implements Runnable { /** Responsible for aggregating and then outputting collected sampling data */ private final JavaDataAggregator dataAggregator; + + /** The last window that was profiled */ + private final AtomicInteger lastWindow = new AtomicInteger(); public JavaSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) { super(platform, interval, threadDumper, endTime); @@ -76,12 +81,28 @@ public class JavaSampler extends AbstractSampler implements Runnable { @Override public void start() { super.start(); + + TickHook tickHook = this.platform.getTickHook(); + if (tickHook != null) { + if (this.dataAggregator instanceof TickedDataAggregator) { + WindowStatisticsCollector.ExplicitTickCounter counter = this.windowStatisticsCollector.startCountingTicksExplicit(tickHook); + ((TickedDataAggregator) this.dataAggregator).setTickCounter(counter); + } else { + this.windowStatisticsCollector.startCountingTicks(tickHook); + } + } + this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS); } @Override public void stop() { + super.stop(); + this.task.cancel(false); + + // collect statistics for the final window + this.windowStatisticsCollector.measureNow(this.lastWindow.get()); } @Override @@ -89,27 +110,30 @@ public class JavaSampler extends AbstractSampler implements Runnable { // this is effectively synchronized, the worker pool will not allow this task // to concurrently execute. try { - if (this.autoEndTime != -1 && this.autoEndTime <= System.currentTimeMillis()) { - this.future.complete(this); + long time = System.currentTimeMillis(); + + if (this.autoEndTime != -1 && this.autoEndTime <= time) { stop(); + this.future.complete(this); return; } + int window = ProfilingWindowUtils.unixMillisToWindow(time); ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean); - this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps)); + this.workerPool.execute(new InsertDataTask(threadDumps, window)); } catch (Throwable t) { - this.future.completeExceptionally(t); stop(); + this.future.completeExceptionally(t); } } - private static final class InsertDataTask implements Runnable { - private final JavaDataAggregator dataAggregator; + private final class InsertDataTask implements Runnable { private final ThreadInfo[] threadDumps; + private final int window; - InsertDataTask(JavaDataAggregator dataAggregator, ThreadInfo[] threadDumps) { - this.dataAggregator = dataAggregator; + InsertDataTask(ThreadInfo[] threadDumps, int window) { this.threadDumps = threadDumps; + this.window = window; } @Override @@ -118,16 +142,22 @@ public class JavaSampler extends AbstractSampler implements Runnable { if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) { continue; } - this.dataAggregator.insertData(threadInfo); + JavaSampler.this.dataAggregator.insertData(threadInfo, this.window); + } + + // if we have just stepped over into a new window, collect statistics for the previous window + int previousWindow = JavaSampler.this.lastWindow.getAndSet(this.window); + if (previousWindow != 0 && previousWindow != this.window) { + JavaSampler.this.windowStatisticsCollector.measureNow(previousWindow); } } } @Override - public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { SamplerData.Builder proto = SamplerData.newBuilder(); writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); - writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup); + writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup); return proto.build(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java index 39e21aa..54173fe 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java @@ -44,8 +44,8 @@ public class SimpleDataAggregator extends JavaDataAggregator { } @Override - public void insertData(ThreadInfo threadInfo) { - writeData(threadInfo); + public void insertData(ThreadInfo threadInfo, int window) { + writeData(threadInfo, window); } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java index e062f31..d537b96 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java @@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler.java; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.aggregator.DataAggregator; import me.lucko.spark.common.sampler.node.ThreadNode; +import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; @@ -31,7 +32,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks" @@ -48,14 +48,15 @@ public class TickedDataAggregator extends JavaDataAggregator { /** The expected number of samples in each tick */ private final int expectedSize; - /** The number of ticks aggregated so far */ - private final AtomicInteger numberOfTicks = new AtomicInteger(); - - private final Object mutex = new Object(); + /** Counts the number of ticks aggregated */ + private WindowStatisticsCollector.ExplicitTickCounter tickCounter; // state private int currentTick = -1; - private TickList currentData = new TickList(0); + private TickList currentData = null; + + // guards currentData + private final Object mutex = new Object(); public TickedDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) { super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); @@ -66,29 +67,34 @@ public class TickedDataAggregator extends JavaDataAggregator { this.expectedSize = (int) ((50 / intervalMilliseconds) + 10); } + public void setTickCounter(WindowStatisticsCollector.ExplicitTickCounter tickCounter) { + this.tickCounter = tickCounter; + } + @Override public SamplerMetadata.DataAggregator getMetadata() { // push the current tick (so numberOfTicks is accurate) synchronized (this.mutex) { pushCurrentTick(); + this.currentData = null; } return SamplerMetadata.DataAggregator.newBuilder() .setType(SamplerMetadata.DataAggregator.Type.TICKED) .setThreadGrouper(this.threadGrouper.asProto()) .setTickLengthThreshold(this.tickLengthThreshold) - .setNumberOfIncludedTicks(this.numberOfTicks.get()) + .setNumberOfIncludedTicks(this.tickCounter.getTotalCountedTicks()) .build(); } @Override - public void insertData(ThreadInfo threadInfo) { + public void insertData(ThreadInfo threadInfo, int window) { synchronized (this.mutex) { int tick = this.tickHook.getCurrentTick(); - if (this.currentTick != tick) { + if (this.currentTick != tick || this.currentData == null) { pushCurrentTick(); this.currentTick = tick; - this.currentData = new TickList(this.expectedSize); + this.currentData = new TickList(this.expectedSize, window); } this.currentData.addData(threadInfo); @@ -98,6 +104,9 @@ public class TickedDataAggregator extends JavaDataAggregator { // guarded by 'mutex' private void pushCurrentTick() { TickList currentData = this.currentData; + if (currentData == null) { + return; + } // approximate how long the tick lasted int tickLengthMicros = currentData.getList().size() * this.interval; @@ -107,8 +116,8 @@ public class TickedDataAggregator extends JavaDataAggregator { return; } - this.numberOfTicks.incrementAndGet(); this.workerPool.submit(currentData); + this.tickCounter.increment(); } @Override @@ -121,21 +130,19 @@ public class TickedDataAggregator extends JavaDataAggregator { return super.exportData(); } - public int getNumberOfTicks() { - return this.numberOfTicks.get(); - } - private final class TickList implements Runnable { private final List<ThreadInfo> list; + private final int window; - TickList(int expectedSize) { + TickList(int expectedSize, int window) { this.list = new ArrayList<>(expectedSize); + this.window = window; } @Override public void run() { for (ThreadInfo data : this.list) { - writeData(data); + writeData(data, this.window); } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java index fd2be8d..fe1afcd 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java @@ -20,6 +20,9 @@ package me.lucko.spark.common.sampler.node; +import me.lucko.spark.common.sampler.async.jfr.Dictionary; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -27,62 +30,63 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.LongAdder; +import java.util.stream.IntStream; /** * Encapsulates a timed node in the sampling stack. */ public abstract class AbstractNode { - private static final int MAX_STACK_DEPTH = 300; + protected static final int MAX_STACK_DEPTH = 300; /** A map of the nodes children */ private final Map<StackTraceNode.Description, StackTraceNode> children = new ConcurrentHashMap<>(); /** The accumulated sample time for this node, measured in microseconds */ - private final LongAdder totalTime = new LongAdder(); + // long key = the window (effectively System.currentTimeMillis() / 60_000) + // LongAdder value = accumulated time in microseconds + private final Dictionary<LongAdder> times = new Dictionary<>(); /** - * Gets the total sample time logged for this node in milliseconds. + * Gets the time accumulator for a given window * - * @return the total time + * @param window the window + * @return the accumulator */ - public double getTotalTime() { - return this.totalTime.longValue() / 1000d; + protected LongAdder getTimeAccumulator(int window) { + LongAdder adder = this.times.get(window); + if (adder == null) { + adder = new LongAdder(); + this.times.put(window, adder); + } + return adder; } - public Collection<StackTraceNode> getChildren() { - return this.children.values(); + /** + * Gets the time windows that have been logged for this node. + * + * @return the time windows + */ + public IntStream getTimeWindows() { + IntStream.Builder keys = IntStream.builder(); + this.times.forEach((key, value) -> keys.add((int) key)); + return keys.build(); } /** - * Logs the given stack trace against this node and its children. + * Gets the encoded total sample times logged for this node in milliseconds. * - * @param describer the function that describes the elements of the stack - * @param stack the stack - * @param time the total time to log - * @param <T> the stack trace element type + * @return the total times */ - public <T> void log(StackTraceNode.Describer<T> describer, T[] stack, long time) { - if (stack.length == 0) { - return; - } - - this.totalTime.add(time); - - AbstractNode node = this; - T previousElement = null; - - for (int offset = 0; offset < Math.min(MAX_STACK_DEPTH, stack.length); offset++) { - T element = stack[(stack.length - 1) - offset]; - - node = node.resolveChild(describer.describe(element, previousElement)); - node.totalTime.add(time); + protected double[] encodeTimesForProto(ProtoTimeEncoder encoder) { + return encoder.encode(this.times); + } - previousElement = element; - } + public Collection<StackTraceNode> getChildren() { + return this.children.values(); } - private StackTraceNode resolveChild(StackTraceNode.Description description) { + protected StackTraceNode resolveChild(StackTraceNode.Description description) { StackTraceNode result = this.children.get(description); // fast path if (result != null) { return result; @@ -96,7 +100,7 @@ public abstract class AbstractNode { * @param other the other node */ protected void merge(AbstractNode other) { - this.totalTime.add(other.totalTime.longValue()); + other.times.forEach((key, value) -> getTimeAccumulator((int) key).add(value.longValue())); for (Map.Entry<StackTraceNode.Description, StackTraceNode> child : other.children.entrySet()) { resolveChild(child.getKey()).merge(child.getValue()); } @@ -123,7 +127,7 @@ public abstract class AbstractNode { list.add(child); } - list.sort(null); + //list.sort(null); return list; } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java index b0d9237..ed938d5 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java @@ -20,6 +20,7 @@ package me.lucko.spark.common.sampler.node; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; import me.lucko.spark.common.util.MethodDisambiguator; import me.lucko.spark.proto.SparkSamplerProtos; @@ -30,7 +31,7 @@ import java.util.Objects; /** * Represents a stack trace element within the {@link AbstractNode node} structure. */ -public final class StackTraceNode extends AbstractNode implements Comparable<StackTraceNode> { +public final class StackTraceNode extends AbstractNode { /** * Magic number to denote "no present" line number for a node. @@ -64,12 +65,16 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta return this.description.parentLineNumber; } - public SparkSamplerProtos.StackTraceNode toProto(MergeMode mergeMode) { + public SparkSamplerProtos.StackTraceNode toProto(MergeMode mergeMode, ProtoTimeEncoder timeEncoder) { SparkSamplerProtos.StackTraceNode.Builder proto = SparkSamplerProtos.StackTraceNode.newBuilder() - .setTime(getTotalTime()) .setClassName(this.description.className) .setMethodName(this.description.methodName); + double[] times = encodeTimesForProto(timeEncoder); + for (double time : times) { + proto.addTimes(time); + } + if (this.description.lineNumber >= 0) { proto.setLineNumber(this.description.lineNumber); } @@ -87,26 +92,12 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta } for (StackTraceNode child : exportChildren(mergeMode)) { - proto.addChildren(child.toProto(mergeMode)); + proto.addChildren(child.toProto(mergeMode, timeEncoder)); } return proto.build(); } - @Override - public int compareTo(StackTraceNode that) { - if (this == that) { - return 0; - } - - int i = -Double.compare(this.getTotalTime(), that.getTotalTime()); - if (i != 0) { - return i; - } - - return this.description.compareTo(that.description); - } - /** * Function to construct a {@link StackTraceNode.Description} from a stack trace element * of type {@code T}. @@ -129,7 +120,7 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta /** * Encapsulates the attributes of a {@link StackTraceNode}. */ - public static final class Description implements Comparable<Description> { + public static final class Description { private final String className; private final String methodName; @@ -162,54 +153,6 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta this.hash = Objects.hash(this.className, this.methodName, this.methodDescription); } - private static <T extends Comparable<T>> int nullCompare(T a, T b) { - if (a == null && b == null) { - return 0; - } else if (a == null) { - return -1; - } else if (b == null) { - return 1; - } else { - return a.compareTo(b); - } - } - - @Override - public int compareTo(Description that) { - if (this == that) { - return 0; - } - - int i = this.className.compareTo(that.className); - if (i != 0) { - return i; - } - - i = this.methodName.compareTo(that.methodName); - if (i != 0) { - return i; - } - - i = nullCompare(this.methodDescription, that.methodDescription); - if (i != 0) { - return i; - } - - if (this.methodDescription != null && that.methodDescription != null) { - i = this.methodDescription.compareTo(that.methodDescription); - if (i != 0) { - return i; - } - } - - i = Integer.compare(this.lineNumber, that.lineNumber); - if (i != 0) { - return i; - } - - return Integer.compare(this.parentLineNumber, that.parentLineNumber); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java index ed97443..1dce523 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java @@ -20,6 +20,7 @@ package me.lucko.spark.common.sampler.node; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; import me.lucko.spark.proto.SparkSamplerProtos; /** @@ -53,13 +54,46 @@ public final class ThreadNode extends AbstractNode { this.label = label; } - public SparkSamplerProtos.ThreadNode toProto(MergeMode mergeMode) { + /** + * Logs the given stack trace against this node and its children. + * + * @param describer the function that describes the elements of the stack + * @param stack the stack + * @param time the total time to log + * @param window the window + * @param <T> the stack trace element type + */ + public <T> void log(StackTraceNode.Describer<T> describer, T[] stack, long time, int window) { + if (stack.length == 0) { + return; + } + + getTimeAccumulator(window).add(time); + + AbstractNode node = this; + T previousElement = null; + + for (int offset = 0; offset < Math.min(MAX_STACK_DEPTH, stack.length); offset++) { + T element = stack[(stack.length - 1) - offset]; + + node = node.resolveChild(describer.describe(element, previousElement)); + node.getTimeAccumulator(window).add(time); + + previousElement = element; + } + } + + public SparkSamplerProtos.ThreadNode toProto(MergeMode mergeMode, ProtoTimeEncoder timeEncoder) { SparkSamplerProtos.ThreadNode.Builder proto = SparkSamplerProtos.ThreadNode.newBuilder() - .setName(getThreadLabel()) - .setTime(getTotalTime()); + .setName(getThreadLabel()); + + double[] times = encodeTimesForProto(timeEncoder); + for (double time : times) { + proto.addTimes(time); + } for (StackTraceNode child : exportChildren(mergeMode)) { - proto.addChildren(child.toProto(mergeMode)); + proto.addChildren(child.toProto(mergeMode, timeEncoder)); } return proto.build(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java index adcedcd..109adb3 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java @@ -18,35 +18,19 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -package me.lucko.spark.common.sampler; +package me.lucko.spark.common.sampler.window; -import me.lucko.spark.common.sampler.node.ThreadNode; - -import java.util.Comparator; - -/** - * Methods of ordering {@link ThreadNode}s in the output data. - */ -public enum ThreadNodeOrder implements Comparator<ThreadNode> { +public enum ProfilingWindowUtils { + ; /** - * Order by the name of the thread (alphabetically) + * Gets the profiling window for the given time in unix-millis. + * + * @param time the time in milliseconds + * @return the window */ - BY_NAME { - @Override - public int compare(ThreadNode o1, ThreadNode o2) { - return o1.getThreadLabel().compareTo(o2.getThreadLabel()); - } - }, - - /** - * Order by the time taken by the thread (most time taken first) - */ - BY_TIME { - @Override - public int compare(ThreadNode o1, ThreadNode o2) { - return -Double.compare(o1.getTotalTime(), o2.getTotalTime()); - } + public static int unixMillisToWindow(long time) { + // one window per minute + return (int) (time / 60_000); } - } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java new file mode 100644 index 0000000..edb2309 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java @@ -0,0 +1,94 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.window; + +import me.lucko.spark.common.sampler.async.jfr.Dictionary; +import me.lucko.spark.common.sampler.node.AbstractNode; +import me.lucko.spark.common.sampler.node.ThreadNode; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.IntStream; + +/** + * Encodes a map of int->double into a double array. + */ +public class ProtoTimeEncoder { + /** A sorted array of all possible keys to encode */ + private final int[] keys; + /** A map of key value -> index in the keys array */ + private final Map<Integer, Integer> keysToIndex; + + public ProtoTimeEncoder(List<ThreadNode> sourceData) { + // get an array of all keys that show up in the source data + this.keys = sourceData.stream() + .map(AbstractNode::getTimeWindows) + .reduce(IntStream.empty(), IntStream::concat) + .distinct() + .sorted() + .toArray(); + + // construct a reverse index lookup + this.keysToIndex = new HashMap<>(this.keys.length); + for (int i = 0; i < this.keys.length; i++) { + this.keysToIndex.put(this.keys[i], i); + } + } + + /** + * Gets an array of the keys that could be encoded by this encoder. + * + * @return an array of keys + */ + public int[] getKeys() { + return this.keys; + } + + /** + * Encode a {@link Dictionary} (map) of times/durations into a double array. + * + * @param times a dictionary of times (unix-time millis -> duration in microseconds) + * @return the times encoded as a double array + */ + public double[] encode(Dictionary<LongAdder> times) { + // construct an array of values - length needs to exactly match the + // number of keys, even if some values are zero. + double[] array = new double[this.keys.length]; + + times.forEach((key, value) -> { + // get the index for the given key + Integer idx = this.keysToIndex.get((int) key); + if (idx == null) { + throw new RuntimeException("No index for key " + key + " in " + this.keysToIndex.keySet()); + } + + // convert the duration from microseconds -> milliseconds + double durationInMilliseconds = value.longValue() / 1000d; + + // store in the array + array[idx] = durationInMilliseconds; + }); + + return array; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java new file mode 100644 index 0000000..47f739d --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java @@ -0,0 +1,267 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.window; + +import me.lucko.spark.common.SparkPlatform; +import me.lucko.spark.common.monitor.cpu.CpuMonitor; +import me.lucko.spark.common.monitor.tick.TickStatistics; +import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.common.util.RollingAverage; +import me.lucko.spark.proto.SparkProtos; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Collects statistics for each profiling window. + */ +public class WindowStatisticsCollector { + private static final SparkProtos.WindowStatistics ZERO = SparkProtos.WindowStatistics.newBuilder().build(); + + /** The platform */ + private final SparkPlatform platform; + + /** Map of profiling window -> statistics */ + private final Map<Integer, SparkProtos.WindowStatistics> stats; + + private TickCounter tickCounter; + + public WindowStatisticsCollector(SparkPlatform platform) { + this.platform = platform; + this.stats = new ConcurrentHashMap<>(); + } + + /** + * Indicates to the statistics collector that it should count the number + * of ticks in each window using the provided {@link TickHook}. + * + * @param hook the tick hook + */ + public void startCountingTicks(TickHook hook) { + this.tickCounter = new NormalTickCounter(this.platform, hook); + } + + /** + * Indicates to the statistics collector that it should count the number + * of ticks in each window, according to how many times the + * {@link ExplicitTickCounter#increment()} method is called. + * + * @param hook the tick hook + * @return the counter + */ + public ExplicitTickCounter startCountingTicksExplicit(TickHook hook) { + ExplicitTickCounter counter = new ExplicitTickCounter(this.platform, hook); + this.tickCounter = counter; + return counter; + } + + public void stop() { + if (this.tickCounter != null) { + this.tickCounter.stop(); + } + } + + /** + * Gets the total number of ticks that have passed between the time + * when the profiler started and stopped. + * + * <p>Importantly, note that this metric is different to the total number of ticks in a window + * (which is recorded by {@link SparkProtos.WindowStatistics#getTicks()}) or the total number + * of observed ticks if the 'only-ticks-over' aggregator is being used + * (which is recorded by {@link SparkProtos.WindowStatistics#getTicks()} + * and {@link ExplicitTickCounter#getTotalCountedTicks()}.</p> + * + * @return the total number of ticks in the profile + */ + public int getTotalTicks() { + return this.tickCounter == null ? -1 : this.tickCounter.getTotalTicks(); + } + + /** + * Measures statistics for the given window if none have been recorded yet. + * + * @param window the window + */ + public void measureNow(int window) { + this.stats.computeIfAbsent(window, w -> measure()); + } + + /** + * Ensures that the exported map has statistics (even if they are zeroed) for all windows. + * + * @param windows the expected windows + */ + public void ensureHasStatisticsForAllWindows(int[] windows) { + for (int window : windows) { + this.stats.computeIfAbsent(window, w -> ZERO); + } + } + + public Map<Integer, SparkProtos.WindowStatistics> export() { + return this.stats; + } + + /** + * Measures current statistics, where possible averaging over the last minute. (1 min = 1 window) + * + * @return the current statistics + */ + private SparkProtos.WindowStatistics measure() { + SparkProtos.WindowStatistics.Builder builder = SparkProtos.WindowStatistics.newBuilder(); + + TickStatistics tickStatistics = this.platform.getTickStatistics(); + if (tickStatistics != null) { + builder.setTps(tickStatistics.tps1Min()); + + RollingAverage mspt = tickStatistics.duration1Min(); + if (mspt != null) { + builder.setMsptMedian(mspt.median()); + builder.setMsptMax(mspt.max()); + } + } + + if (this.tickCounter != null) { + int ticks = this.tickCounter.getCountedTicksThisWindowAndReset(); + builder.setTicks(ticks); + } + + builder.setCpuProcess(CpuMonitor.processLoad1MinAvg()); + builder.setCpuSystem(CpuMonitor.systemLoad1MinAvg()); + + return builder.build(); + } + + /** + * Responsible for counting the number of ticks in a profile/window. + */ + public interface TickCounter { + + /** + * Stop the counter. + */ + void stop(); + + /** + * Get the total number of ticks. + * + * <p>See {@link WindowStatisticsCollector#getTotalTicks()} for a longer explanation + * of what this means exactly.</p> + * + * @return the total ticks + */ + int getTotalTicks(); + + /** + * Gets the total number of ticks counted in the last window, + * and resets the counter to zero. + * + * @return the number of ticks counted since the last time this method was called + */ + int getCountedTicksThisWindowAndReset(); + } + + private static abstract class BaseTickCounter implements TickCounter { + protected final SparkPlatform platform; + protected final TickHook tickHook; + + /** The game tick when sampling first began */ + private final int startTick; + + /** The game tick when sampling stopped */ + private int stopTick = -1; + + BaseTickCounter(SparkPlatform platform, TickHook tickHook) { + this.platform = platform; + this.tickHook = tickHook; + this.startTick = this.tickHook.getCurrentTick(); + } + + @Override + public void stop() { + this.stopTick = this.tickHook.getCurrentTick(); + } + + @Override + public int getTotalTicks() { + if (this.startTick == -1) { + throw new IllegalStateException("start tick not recorded"); + } + if (this.stopTick == -1) { + throw new IllegalStateException("stop tick not recorded"); + } + + return this.stopTick - this.startTick; + } + } + + /** + * Counts the number of ticks in a window using a {@link TickHook}. + */ + public static final class NormalTickCounter extends BaseTickCounter { + private int last; + + NormalTickCounter(SparkPlatform platform, TickHook tickHook) { + super(platform, tickHook); + this.last = this.tickHook.getCurrentTick(); + } + + @Override + public int getCountedTicksThisWindowAndReset() { + synchronized (this) { + int now = this.tickHook.getCurrentTick(); + int ticks = now - this.last; + this.last = now; + return ticks; + } + } + } + + /** + * Counts the number of ticks in a window according to the number of times + * {@link #increment()} is called. + * + * Used by the {@link me.lucko.spark.common.sampler.java.TickedDataAggregator}. + */ + public static final class ExplicitTickCounter extends BaseTickCounter { + private final AtomicInteger counted = new AtomicInteger(); + private final AtomicInteger total = new AtomicInteger(); + + ExplicitTickCounter(SparkPlatform platform, TickHook tickHook) { + super(platform, tickHook); + } + + public void increment() { + this.counted.incrementAndGet(); + this.total.incrementAndGet(); + } + + public int getTotalCountedTicks() { + return this.total.get(); + } + + @Override + public int getCountedTicksThisWindowAndReset() { + return this.counted.getAndSet(0); + } + } + +} |