diff options
Diffstat (limited to 'spark-common/src/main/java/me')
20 files changed, 979 insertions, 383 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java index 2afed64..c1e4981 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java @@ -35,7 +35,6 @@ import me.lucko.spark.common.sampler.Sampler; import me.lucko.spark.common.sampler.SamplerBuilder; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.ThreadNodeOrder; import me.lucko.spark.common.sampler.async.AsyncSampler; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; @@ -94,7 +93,6 @@ public class SamplerModule implements CommandModule { .argumentUsage("not-combined", null) .argumentUsage("force-java-sampler", null) .argumentUsage("stop --comment", "comment") - .argumentUsage("stop --order-by-time", null) .argumentUsage("stop --save-to-file", null) .executor(this::profiler) .tabCompleter((platform, sender, arguments) -> { @@ -103,7 +101,7 @@ public class SamplerModule implements CommandModule { } if (arguments.contains("--stop") || arguments.contains("--upload")) { - return TabCompleter.completeForOpts(arguments, "--order-by-time", "--comment", "--save-to-file"); + return TabCompleter.completeForOpts(arguments, "--comment", "--save-to-file"); } List<String> opts = new ArrayList<>(Arrays.asList("--info", "--stop", "--cancel", @@ -249,14 +247,13 @@ public class SamplerModule implements CommandModule { // await the result if (timeoutSeconds != -1) { - ThreadNodeOrder threadOrder = arguments.boolFlag("order-by-time") ? ThreadNodeOrder.BY_TIME : ThreadNodeOrder.BY_NAME; String comment = Iterables.getFirst(arguments.stringFlag("comment"), null); MethodDisambiguator methodDisambiguator = new MethodDisambiguator(); MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator); boolean saveToFile = arguments.boolFlag("save-to-file"); future.thenAcceptAsync(s -> { resp.broadcastPrefixed(text("The active profiler has completed! Uploading results...")); - handleUpload(platform, resp, s, threadOrder, comment, mergeMode, saveToFile); + handleUpload(platform, resp, s, comment, mergeMode, saveToFile); }); } } @@ -293,18 +290,17 @@ public class SamplerModule implements CommandModule { } else { this.activeSampler.stop(); resp.broadcastPrefixed(text("The active profiler has been stopped! Uploading results...")); - ThreadNodeOrder threadOrder = arguments.boolFlag("order-by-time") ? ThreadNodeOrder.BY_TIME : ThreadNodeOrder.BY_NAME; String comment = Iterables.getFirst(arguments.stringFlag("comment"), null); MethodDisambiguator methodDisambiguator = new MethodDisambiguator(); MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator); boolean saveToFile = arguments.boolFlag("save-to-file"); - handleUpload(platform, resp, this.activeSampler, threadOrder, comment, mergeMode, saveToFile); + handleUpload(platform, resp, this.activeSampler, comment, mergeMode, saveToFile); this.activeSampler = null; } } - private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, ThreadNodeOrder threadOrder, String comment, MergeMode mergeMode, boolean saveToFileFlag) { - SparkSamplerProtos.SamplerData output = sampler.toProto(platform, resp.sender(), threadOrder, comment, mergeMode, ClassSourceLookup.create(platform)); + private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, String comment, MergeMode mergeMode, boolean saveToFileFlag) { + SparkSamplerProtos.SamplerData output = sampler.toProto(platform, resp.sender(), comment, mergeMode, ClassSourceLookup.create(platform)); boolean saveToFile = false; if (saveToFileFlag) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java index 6fc5a10..c650738 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java @@ -30,7 +30,10 @@ import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.source.SourceMetadata; +import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; +import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.proto.SparkProtos; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; @@ -58,12 +61,12 @@ public abstract class AbstractSampler implements Sampler { /** The time when sampling first began */ protected long startTime = -1; - /** The game tick when sampling first began */ - protected int startTick = -1; - /** The unix timestamp (in millis) when this sampler should automatically complete. */ protected final long autoEndTime; // -1 for nothing + /** Collects statistics for each window in the sample */ + protected final WindowStatisticsCollector windowStatisticsCollector; + /** A future to encapsulate the completion of this sampler instance */ protected final CompletableFuture<Sampler> future = new CompletableFuture<>(); @@ -75,6 +78,7 @@ public abstract class AbstractSampler implements Sampler { this.interval = interval; this.threadDumper = threadDumper; this.autoEndTime = autoEndTime; + this.windowStatisticsCollector = new WindowStatisticsCollector(platform); } @Override @@ -106,11 +110,11 @@ public abstract class AbstractSampler implements Sampler { @Override public void start() { this.startTime = System.currentTimeMillis(); + } - TickHook tickHook = this.platform.getTickHook(); - if (tickHook != null) { - this.startTick = tickHook.getCurrentTick(); - } + @Override + public void stop() { + this.windowStatisticsCollector.stop(); } protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) { @@ -127,12 +131,9 @@ public abstract class AbstractSampler implements Sampler { metadata.setComment(comment); } - if (this.startTick != -1) { - TickHook tickHook = this.platform.getTickHook(); - if (tickHook != null) { - int numberOfTicks = tickHook.getCurrentTick() - this.startTick; - metadata.setNumberOfTicks(numberOfTicks); - } + int totalTicks = this.windowStatisticsCollector.getTotalTicks(); + if (totalTicks != -1) { + metadata.setNumberOfTicks(totalTicks); } try { @@ -171,14 +172,23 @@ public abstract class AbstractSampler implements Sampler { proto.setMetadata(metadata); } - protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, Comparator<ThreadNode> outputOrder, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { List<ThreadNode> data = dataAggregator.exportData(); - data.sort(outputOrder); + data.sort(Comparator.comparing(ThreadNode::getThreadLabel)); ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup); + ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(data); + int[] timeWindows = timeEncoder.getKeys(); + for (int timeWindow : timeWindows) { + proto.addTimeWindows(timeWindow); + } + + this.windowStatisticsCollector.ensureHasStatisticsForAllWindows(timeWindows); + proto.putAllTimeWindowStatistics(this.windowStatisticsCollector.export()); + for (ThreadNode entry : data) { - proto.addThreads(entry.toProto(mergeMode)); + proto.addThreads(entry.toProto(mergeMode, timeEncoder)); classSourceVisitor.visit(entry); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index 98281de..e06cba6 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -23,11 +23,9 @@ package me.lucko.spark.common.sampler; import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; -import java.util.Comparator; import java.util.concurrent.CompletableFuture; /** @@ -67,6 +65,6 @@ public interface Sampler { CompletableFuture<Sampler> getFuture(); // Methods used to export the sampler data to the web viewer. - SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); + SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java index 3de3943..402330a 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java @@ -47,10 +47,10 @@ public class AsyncDataAggregator extends AbstractDataAggregator { .build(); } - public void insertData(ProfileSegment element) { + public void insertData(ProfileSegment element, int window) { try { ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName())); - node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime()); + node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime(), window); } catch (Exception e) { e.printStackTrace(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java index abde21d..1480650 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java @@ -87,11 +87,11 @@ public class AsyncProfilerAccess { this.setupException = setupException; } - public AsyncProfiler getProfiler() { + public AsyncProfilerJob startNewProfilerJob() { if (this.profiler == null) { throw new UnsupportedOperationException("async-profiler not supported", this.setupException); } - return this.profiler; + return AsyncProfilerJob.createNew(this, this.profiler); } public ProfilingEvent getProfilingEvent() { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java new file mode 100644 index 0000000..7b123a7 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java @@ -0,0 +1,264 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.async; + +import me.lucko.spark.common.SparkPlatform; +import me.lucko.spark.common.sampler.ThreadDumper; +import me.lucko.spark.common.sampler.async.jfr.JfrReader; + +import one.profiler.AsyncProfiler; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; + +/** + * Represents a profiling job within async-profiler. + * + * <p>Only one job can be running at a time. This is guarded by + * {@link #createNew(AsyncProfilerAccess, AsyncProfiler)}.</p> + */ +public class AsyncProfilerJob { + + /** + * The currently active job. + */ + private static final AtomicReference<AsyncProfilerJob> ACTIVE = new AtomicReference<>(); + + /** + * Creates a new {@link AsyncProfilerJob}. + * + * <p>Will throw an {@link IllegalStateException} if another job is already active.</p> + * + * @param access the profiler access object + * @param profiler the profiler + * @return the job + */ + static AsyncProfilerJob createNew(AsyncProfilerAccess access, AsyncProfiler profiler) { + synchronized (ACTIVE) { + AsyncProfilerJob existing = ACTIVE.get(); + if (existing != null) { + throw new IllegalStateException("Another profiler is already active: " + existing); + } + + AsyncProfilerJob job = new AsyncProfilerJob(access, profiler); + ACTIVE.set(job); + return job; + } + } + + /** The async-profiler access object */ + private final AsyncProfilerAccess access; + /** The async-profiler instance */ + private final AsyncProfiler profiler; + + // Set on init + /** The platform */ + private SparkPlatform platform; + /** The sampling interval in microseconds */ + private int interval; + /** The thread dumper */ + private ThreadDumper threadDumper; + /** The profiling window */ + private int window; + + /** The file used by async-profiler to output data */ + private Path outputFile; + + private AsyncProfilerJob(AsyncProfilerAccess access, AsyncProfiler profiler) { + this.access = access; + this.profiler = profiler; + } + + /** + * Executes an async-profiler command. + * + * @param command the command + * @return the output + */ + private String execute(String command) { + try { + return this.profiler.execute(command); + } catch (IOException e) { + throw new RuntimeException("Exception whilst executing profiler command", e); + } + } + + /** + * Checks to ensure that this job is still active. + */ + private void checkActive() { + if (ACTIVE.get() != this) { + throw new IllegalStateException("Profiler job no longer active!"); + } + } + + // Initialise the job + public void init(SparkPlatform platform, int interval, ThreadDumper threadDumper, int window) { + this.platform = platform; + this.interval = interval; + this.threadDumper = threadDumper; + this.window = window; + } + + /** + * Starts the job. + */ + public void start() { + checkActive(); + + try { + // create a new temporary output file + try { + this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp"); + } catch (IOException e) { + throw new RuntimeException("Unable to create temporary output file", e); + } + + // construct a command to send to async-profiler + String command = "start,event=" + this.access.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); + if (this.threadDumper instanceof ThreadDumper.Specific) { + command += ",filter"; + } + + // start the profiler + String resp = execute(command).trim(); + + if (!resp.equalsIgnoreCase("profiling started")) { + throw new RuntimeException("Unexpected response: " + resp); + } + + // append threads to be profiled, if necessary + if (this.threadDumper instanceof ThreadDumper.Specific) { + ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; + for (Thread thread : threadDumper.getThreads()) { + this.profiler.addThread(thread); + } + } + + } catch (Exception e) { + try { + this.profiler.stop(); + } catch (Exception e2) { + // ignore + } + close(); + + throw e; + } + } + + /** + * Stops the job. + */ + public void stop() { + checkActive(); + + try { + this.profiler.stop(); + } catch (IllegalStateException e) { + if (!e.getMessage().equals("Profiler is not active")) { // ignore + throw e; + } + } finally { + close(); + } + } + + /** + * Aggregates the collected data. + */ + public void aggregate(AsyncDataAggregator dataAggregator) { + + Predicate<String> threadFilter; + if (this.threadDumper instanceof ThreadDumper.Specific) { + ThreadDumper.Specific specificDumper = (ThreadDumper.Specific) this.threadDumper; + threadFilter = n -> specificDumper.getThreadNames().contains(n.toLowerCase()); + } else { + threadFilter = n -> true; + } + + // read the jfr file produced by async-profiler + try (JfrReader reader = new JfrReader(this.outputFile)) { + readSegments(reader, threadFilter, dataAggregator, this.window); + } catch (Exception e) { + boolean fileExists; + try { + fileExists = Files.exists(this.outputFile) && Files.size(this.outputFile) != 0; + } catch (IOException ex) { + fileExists = false; + } + + if (fileExists) { + throw new JfrParsingException("Error parsing JFR data from profiler output", e); + } else { + throw new JfrParsingException("Error parsing JFR data from profiler output - file " + this.outputFile + " does not exist!", e); + } + } + + // delete the output file after reading + try { + Files.deleteIfExists(this.outputFile); + } catch (IOException e) { + // ignore + } + + } + + private void readSegments(JfrReader reader, Predicate<String> threadFilter, AsyncDataAggregator dataAggregator, int window) throws IOException { + List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class); + for (int i = 0; i < samples.size(); i++) { + JfrReader.ExecutionSample sample = samples.get(i); + + long duration; + if (i == 0) { + // we don't really know the duration of the first sample, so just use the sampling + // interval + duration = this.interval; + } else { + // calculate the duration of the sample by calculating the time elapsed since the + // previous sample + duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time); + } + + String threadName = reader.threads.get(sample.tid); + if (!threadFilter.test(threadName)) { + continue; + } + + // parse the segment and give it to the data aggregator + ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, duration); + dataAggregator.insertData(segment, window); + } + } + + public int getWindow() { + return this.window; + } + + private void close() { + ACTIVE.compareAndSet(this, null); + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 7d9cb81..2c9bb5f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -27,61 +27,41 @@ import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.AbstractSampler; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.async.jfr.JfrReader; import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; +import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; +import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; -import one.profiler.AsyncProfiler; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Comparator; -import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; /** * A sampler implementation using async-profiler. */ public class AsyncSampler extends AbstractSampler { - private final AsyncProfiler profiler; + private final AsyncProfilerAccess profilerAccess; /** Responsible for aggregating and then outputting collected sampling data */ private final AsyncDataAggregator dataAggregator; - /** Flag to mark if the output has been completed */ - private boolean outputComplete = false; + /** Mutex for the current profiler job */ + private final Object[] currentJobMutex = new Object[0]; - /** The temporary output file */ - private Path outputFile; + /** Current profiler job */ + private AsyncProfilerJob currentJob; - /** The executor used for timeouts */ - private ScheduledExecutorService timeoutExecutor; + /** The executor used for scheduling and management */ + private ScheduledExecutorService scheduler; public AsyncSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) { super(platform, interval, threadDumper, endTime); - this.profiler = AsyncProfilerAccess.getInstance(platform).getProfiler(); + this.profilerAccess = AsyncProfilerAccess.getInstance(platform); this.dataAggregator = new AsyncDataAggregator(threadGrouper); - } - - /** - * Executes a profiler command. - * - * @param command the command to execute - * @return the response - */ - private String execute(String command) { - try { - return this.profiler.execute(command); - } catch (IOException e) { - throw new RuntimeException("Exception whilst executing profiler command", e); - } + this.scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build() + ); } /** @@ -91,33 +71,58 @@ public class AsyncSampler extends AbstractSampler { public void start() { super.start(); - try { - this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp"); - } catch (IOException e) { - throw new RuntimeException("Unable to create temporary output file", e); + TickHook tickHook = this.platform.getTickHook(); + if (tickHook != null) { + this.windowStatisticsCollector.startCountingTicks(tickHook); } - String command = "start,event=" + AsyncProfilerAccess.getInstance(this.platform).getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); - if (this.threadDumper instanceof ThreadDumper.Specific) { - command += ",filter"; - } + int window = ProfilingWindowUtils.unixMillisToWindow(System.currentTimeMillis()); - String resp = execute(command).trim(); - if (!resp.equalsIgnoreCase("profiling started")) { - throw new RuntimeException("Unexpected response: " + resp); - } + AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob(); + job.init(this.platform, this.interval, this.threadDumper, window); + job.start(); + this.currentJob = job; - if (this.threadDumper instanceof ThreadDumper.Specific) { - ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; - for (Thread thread : threadDumper.getThreads()) { - this.profiler.addThread(thread); - } - } + // rotate the sampler job every minute to put data into a new window + this.scheduler.scheduleAtFixedRate(this::rotateProfilerJob, 1, 1, TimeUnit.MINUTES); recordInitialGcStats(); scheduleTimeout(); } + private void rotateProfilerJob() { + try { + synchronized (this.currentJobMutex) { + AsyncProfilerJob previousJob = this.currentJob; + if (previousJob == null) { + return; + } + + try { + // stop the previous job + previousJob.stop(); + + // collect statistics for the window + this.windowStatisticsCollector.measureNow(previousJob.getWindow()); + } catch (Exception e) { + e.printStackTrace(); + } + + // start a new job + int window = previousJob.getWindow() + 1; + AsyncProfilerJob newJob = this.profilerAccess.startNewProfilerJob(); + newJob.init(this.platform, this.interval, this.threadDumper, window); + newJob.start(); + this.currentJob = newJob; + + // aggregate the output of the previous job + previousJob.aggregate(this.dataAggregator); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + private void scheduleTimeout() { if (this.autoEndTime == -1) { return; @@ -128,11 +133,7 @@ public class AsyncSampler extends AbstractSampler { return; } - this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-timeout-thread").build() - ); - - this.timeoutExecutor.schedule(() -> { + this.scheduler.schedule(() -> { stop(); this.future.complete(this); }, delay, TimeUnit.MILLISECONDS); @@ -143,145 +144,27 @@ public class AsyncSampler extends AbstractSampler { */ @Override public void stop() { - try { - this.profiler.stop(); - } catch (IllegalStateException e) { - if (!e.getMessage().equals("Profiler is not active")) { // ignore - throw e; - } - } + super.stop(); + synchronized (this.currentJobMutex) { + this.currentJob.stop(); + this.windowStatisticsCollector.measureNow(this.currentJob.getWindow()); + this.currentJob.aggregate(this.dataAggregator); + this.currentJob = null; + } - if (this.timeoutExecutor != null) { - this.timeoutExecutor.shutdown(); - this.timeoutExecutor = null; + if (this.scheduler != null) { + this.scheduler.shutdown(); + this.scheduler = null; } } @Override - public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { SamplerData.Builder proto = SamplerData.newBuilder(); writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); - aggregateOutput(); - writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup); + writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup); return proto.build(); } - private void aggregateOutput() { - if (this.outputComplete) { - return; - } - this.outputComplete = true; - - Predicate<String> threadFilter; - if (this.threadDumper instanceof ThreadDumper.Specific) { - ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper; - threadFilter = n -> threadDumper.getThreadNames().contains(n.toLowerCase()); - } else { - threadFilter = n -> true; - } - - // read the jfr file produced by async-profiler - try (JfrReader reader = new JfrReader(this.outputFile)) { - readSegments(reader, threadFilter); - } catch (Exception e) { - boolean fileExists; - try { - fileExists = Files.exists(this.outputFile) && Files.size(this.outputFile) != 0; - } catch (IOException ex) { - fileExists = false; - } - - if (fileExists) { - throw new JfrParsingException("Error parsing JFR data from pro |
