aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me
diff options
context:
space:
mode:
Diffstat (limited to 'spark-common/src/main/java/me')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java14
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java42
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java4
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java4
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java4
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java264
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java255
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java27
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java50
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java4
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java7
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java56
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java4
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java41
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java70
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java77
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java42
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java (renamed from spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java)36
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java94
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java267
20 files changed, 979 insertions, 383 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
index 2afed64..c1e4981 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
@@ -35,7 +35,6 @@ import me.lucko.spark.common.sampler.Sampler;
import me.lucko.spark.common.sampler.SamplerBuilder;
import me.lucko.spark.common.sampler.ThreadDumper;
import me.lucko.spark.common.sampler.ThreadGrouper;
-import me.lucko.spark.common.sampler.ThreadNodeOrder;
import me.lucko.spark.common.sampler.async.AsyncSampler;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
@@ -94,7 +93,6 @@ public class SamplerModule implements CommandModule {
.argumentUsage("not-combined", null)
.argumentUsage("force-java-sampler", null)
.argumentUsage("stop --comment", "comment")
- .argumentUsage("stop --order-by-time", null)
.argumentUsage("stop --save-to-file", null)
.executor(this::profiler)
.tabCompleter((platform, sender, arguments) -> {
@@ -103,7 +101,7 @@ public class SamplerModule implements CommandModule {
}
if (arguments.contains("--stop") || arguments.contains("--upload")) {
- return TabCompleter.completeForOpts(arguments, "--order-by-time", "--comment", "--save-to-file");
+ return TabCompleter.completeForOpts(arguments, "--comment", "--save-to-file");
}
List<String> opts = new ArrayList<>(Arrays.asList("--info", "--stop", "--cancel",
@@ -249,14 +247,13 @@ public class SamplerModule implements CommandModule {
// await the result
if (timeoutSeconds != -1) {
- ThreadNodeOrder threadOrder = arguments.boolFlag("order-by-time") ? ThreadNodeOrder.BY_TIME : ThreadNodeOrder.BY_NAME;
String comment = Iterables.getFirst(arguments.stringFlag("comment"), null);
MethodDisambiguator methodDisambiguator = new MethodDisambiguator();
MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator);
boolean saveToFile = arguments.boolFlag("save-to-file");
future.thenAcceptAsync(s -> {
resp.broadcastPrefixed(text("The active profiler has completed! Uploading results..."));
- handleUpload(platform, resp, s, threadOrder, comment, mergeMode, saveToFile);
+ handleUpload(platform, resp, s, comment, mergeMode, saveToFile);
});
}
}
@@ -293,18 +290,17 @@ public class SamplerModule implements CommandModule {
} else {
this.activeSampler.stop();
resp.broadcastPrefixed(text("The active profiler has been stopped! Uploading results..."));
- ThreadNodeOrder threadOrder = arguments.boolFlag("order-by-time") ? ThreadNodeOrder.BY_TIME : ThreadNodeOrder.BY_NAME;
String comment = Iterables.getFirst(arguments.stringFlag("comment"), null);
MethodDisambiguator methodDisambiguator = new MethodDisambiguator();
MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator);
boolean saveToFile = arguments.boolFlag("save-to-file");
- handleUpload(platform, resp, this.activeSampler, threadOrder, comment, mergeMode, saveToFile);
+ handleUpload(platform, resp, this.activeSampler, comment, mergeMode, saveToFile);
this.activeSampler = null;
}
}
- private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, ThreadNodeOrder threadOrder, String comment, MergeMode mergeMode, boolean saveToFileFlag) {
- SparkSamplerProtos.SamplerData output = sampler.toProto(platform, resp.sender(), threadOrder, comment, mergeMode, ClassSourceLookup.create(platform));
+ private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, String comment, MergeMode mergeMode, boolean saveToFileFlag) {
+ SparkSamplerProtos.SamplerData output = sampler.toProto(platform, resp.sender(), comment, mergeMode, ClassSourceLookup.create(platform));
boolean saveToFile = false;
if (saveToFileFlag) {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
index 6fc5a10..c650738 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
@@ -30,7 +30,10 @@ import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.source.SourceMetadata;
+import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;
+import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.proto.SparkProtos;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
@@ -58,12 +61,12 @@ public abstract class AbstractSampler implements Sampler {
/** The time when sampling first began */
protected long startTime = -1;
- /** The game tick when sampling first began */
- protected int startTick = -1;
-
/** The unix timestamp (in millis) when this sampler should automatically complete. */
protected final long autoEndTime; // -1 for nothing
+ /** Collects statistics for each window in the sample */
+ protected final WindowStatisticsCollector windowStatisticsCollector;
+
/** A future to encapsulate the completion of this sampler instance */
protected final CompletableFuture<Sampler> future = new CompletableFuture<>();
@@ -75,6 +78,7 @@ public abstract class AbstractSampler implements Sampler {
this.interval = interval;
this.threadDumper = threadDumper;
this.autoEndTime = autoEndTime;
+ this.windowStatisticsCollector = new WindowStatisticsCollector(platform);
}
@Override
@@ -106,11 +110,11 @@ public abstract class AbstractSampler implements Sampler {
@Override
public void start() {
this.startTime = System.currentTimeMillis();
+ }
- TickHook tickHook = this.platform.getTickHook();
- if (tickHook != null) {
- this.startTick = tickHook.getCurrentTick();
- }
+ @Override
+ public void stop() {
+ this.windowStatisticsCollector.stop();
}
protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) {
@@ -127,12 +131,9 @@ public abstract class AbstractSampler implements Sampler {
metadata.setComment(comment);
}
- if (this.startTick != -1) {
- TickHook tickHook = this.platform.getTickHook();
- if (tickHook != null) {
- int numberOfTicks = tickHook.getCurrentTick() - this.startTick;
- metadata.setNumberOfTicks(numberOfTicks);
- }
+ int totalTicks = this.windowStatisticsCollector.getTotalTicks();
+ if (totalTicks != -1) {
+ metadata.setNumberOfTicks(totalTicks);
}
try {
@@ -171,14 +172,23 @@ public abstract class AbstractSampler implements Sampler {
proto.setMetadata(metadata);
}
- protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, Comparator<ThreadNode> outputOrder, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
List<ThreadNode> data = dataAggregator.exportData();
- data.sort(outputOrder);
+ data.sort(Comparator.comparing(ThreadNode::getThreadLabel));
ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup);
+ ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(data);
+ int[] timeWindows = timeEncoder.getKeys();
+ for (int timeWindow : timeWindows) {
+ proto.addTimeWindows(timeWindow);
+ }
+
+ this.windowStatisticsCollector.ensureHasStatisticsForAllWindows(timeWindows);
+ proto.putAllTimeWindowStatistics(this.windowStatisticsCollector.export());
+
for (ThreadNode entry : data) {
- proto.addThreads(entry.toProto(mergeMode));
+ proto.addThreads(entry.toProto(mergeMode, timeEncoder));
classSourceVisitor.visit(entry);
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index 98281de..e06cba6 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -23,11 +23,9 @@ package me.lucko.spark.common.sampler;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.node.MergeMode;
-import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
-import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
/**
@@ -67,6 +65,6 @@ public interface Sampler {
CompletableFuture<Sampler> getFuture();
// Methods used to export the sampler data to the web viewer.
- SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
+ SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
index 3de3943..402330a 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
@@ -47,10 +47,10 @@ public class AsyncDataAggregator extends AbstractDataAggregator {
.build();
}
- public void insertData(ProfileSegment element) {
+ public void insertData(ProfileSegment element, int window) {
try {
ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName()));
- node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime());
+ node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime(), window);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
index abde21d..1480650 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -87,11 +87,11 @@ public class AsyncProfilerAccess {
this.setupException = setupException;
}
- public AsyncProfiler getProfiler() {
+ public AsyncProfilerJob startNewProfilerJob() {
if (this.profiler == null) {
throw new UnsupportedOperationException("async-profiler not supported", this.setupException);
}
- return this.profiler;
+ return AsyncProfilerJob.createNew(this, this.profiler);
}
public ProfilingEvent getProfilingEvent() {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
new file mode 100644
index 0000000..7b123a7
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
@@ -0,0 +1,264 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+import me.lucko.spark.common.SparkPlatform;
+import me.lucko.spark.common.sampler.ThreadDumper;
+import me.lucko.spark.common.sampler.async.jfr.JfrReader;
+
+import one.profiler.AsyncProfiler;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+
+/**
+ * Represents a profiling job within async-profiler.
+ *
+ * <p>Only one job can be running at a time. This is guarded by
+ * {@link #createNew(AsyncProfilerAccess, AsyncProfiler)}.</p>
+ */
+public class AsyncProfilerJob {
+
+ /**
+ * The currently active job.
+ */
+ private static final AtomicReference<AsyncProfilerJob> ACTIVE = new AtomicReference<>();
+
+ /**
+ * Creates a new {@link AsyncProfilerJob}.
+ *
+ * <p>Will throw an {@link IllegalStateException} if another job is already active.</p>
+ *
+ * @param access the profiler access object
+ * @param profiler the profiler
+ * @return the job
+ */
+ static AsyncProfilerJob createNew(AsyncProfilerAccess access, AsyncProfiler profiler) {
+ synchronized (ACTIVE) {
+ AsyncProfilerJob existing = ACTIVE.get();
+ if (existing != null) {
+ throw new IllegalStateException("Another profiler is already active: " + existing);
+ }
+
+ AsyncProfilerJob job = new AsyncProfilerJob(access, profiler);
+ ACTIVE.set(job);
+ return job;
+ }
+ }
+
+ /** The async-profiler access object */
+ private final AsyncProfilerAccess access;
+ /** The async-profiler instance */
+ private final AsyncProfiler profiler;
+
+ // Set on init
+ /** The platform */
+ private SparkPlatform platform;
+ /** The sampling interval in microseconds */
+ private int interval;
+ /** The thread dumper */
+ private ThreadDumper threadDumper;
+ /** The profiling window */
+ private int window;
+
+ /** The file used by async-profiler to output data */
+ private Path outputFile;
+
+ private AsyncProfilerJob(AsyncProfilerAccess access, AsyncProfiler profiler) {
+ this.access = access;
+ this.profiler = profiler;
+ }
+
+ /**
+ * Executes an async-profiler command.
+ *
+ * @param command the command
+ * @return the output
+ */
+ private String execute(String command) {
+ try {
+ return this.profiler.execute(command);
+ } catch (IOException e) {
+ throw new RuntimeException("Exception whilst executing profiler command", e);
+ }
+ }
+
+ /**
+ * Checks to ensure that this job is still active.
+ */
+ private void checkActive() {
+ if (ACTIVE.get() != this) {
+ throw new IllegalStateException("Profiler job no longer active!");
+ }
+ }
+
+ // Initialise the job
+ public void init(SparkPlatform platform, int interval, ThreadDumper threadDumper, int window) {
+ this.platform = platform;
+ this.interval = interval;
+ this.threadDumper = threadDumper;
+ this.window = window;
+ }
+
+ /**
+ * Starts the job.
+ */
+ public void start() {
+ checkActive();
+
+ try {
+ // create a new temporary output file
+ try {
+ this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp");
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create temporary output file", e);
+ }
+
+ // construct a command to send to async-profiler
+ String command = "start,event=" + this.access.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
+ if (this.threadDumper instanceof ThreadDumper.Specific) {
+ command += ",filter";
+ }
+
+ // start the profiler
+ String resp = execute(command).trim();
+
+ if (!resp.equalsIgnoreCase("profiling started")) {
+ throw new RuntimeException("Unexpected response: " + resp);
+ }
+
+ // append threads to be profiled, if necessary
+ if (this.threadDumper instanceof ThreadDumper.Specific) {
+ ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper;
+ for (Thread thread : threadDumper.getThreads()) {
+ this.profiler.addThread(thread);
+ }
+ }
+
+ } catch (Exception e) {
+ try {
+ this.profiler.stop();
+ } catch (Exception e2) {
+ // ignore
+ }
+ close();
+
+ throw e;
+ }
+ }
+
+ /**
+ * Stops the job.
+ */
+ public void stop() {
+ checkActive();
+
+ try {
+ this.profiler.stop();
+ } catch (IllegalStateException e) {
+ if (!e.getMessage().equals("Profiler is not active")) { // ignore
+ throw e;
+ }
+ } finally {
+ close();
+ }
+ }
+
+ /**
+ * Aggregates the collected data.
+ */
+ public void aggregate(AsyncDataAggregator dataAggregator) {
+
+ Predicate<String> threadFilter;
+ if (this.threadDumper instanceof ThreadDumper.Specific) {
+ ThreadDumper.Specific specificDumper = (ThreadDumper.Specific) this.threadDumper;
+ threadFilter = n -> specificDumper.getThreadNames().contains(n.toLowerCase());
+ } else {
+ threadFilter = n -> true;
+ }
+
+ // read the jfr file produced by async-profiler
+ try (JfrReader reader = new JfrReader(this.outputFile)) {
+ readSegments(reader, threadFilter, dataAggregator, this.window);
+ } catch (Exception e) {
+ boolean fileExists;
+ try {
+ fileExists = Files.exists(this.outputFile) && Files.size(this.outputFile) != 0;
+ } catch (IOException ex) {
+ fileExists = false;
+ }
+
+ if (fileExists) {
+ throw new JfrParsingException("Error parsing JFR data from profiler output", e);
+ } else {
+ throw new JfrParsingException("Error parsing JFR data from profiler output - file " + this.outputFile + " does not exist!", e);
+ }
+ }
+
+ // delete the output file after reading
+ try {
+ Files.deleteIfExists(this.outputFile);
+ } catch (IOException e) {
+ // ignore
+ }
+
+ }
+
+ private void readSegments(JfrReader reader, Predicate<String> threadFilter, AsyncDataAggregator dataAggregator, int window) throws IOException {
+ List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class);
+ for (int i = 0; i < samples.size(); i++) {
+ JfrReader.ExecutionSample sample = samples.get(i);
+
+ long duration;
+ if (i == 0) {
+ // we don't really know the duration of the first sample, so just use the sampling
+ // interval
+ duration = this.interval;
+ } else {
+ // calculate the duration of the sample by calculating the time elapsed since the
+ // previous sample
+ duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time);
+ }
+
+ String threadName = reader.threads.get(sample.tid);
+ if (!threadFilter.test(threadName)) {
+ continue;
+ }
+
+ // parse the segment and give it to the data aggregator
+ ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, duration);
+ dataAggregator.insertData(segment, window);
+ }
+ }
+
+ public int getWindow() {
+ return this.window;
+ }
+
+ private void close() {
+ ACTIVE.compareAndSet(this, null);
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index 7d9cb81..2c9bb5f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -27,61 +27,41 @@ import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
import me.lucko.spark.common.sampler.ThreadDumper;
import me.lucko.spark.common.sampler.ThreadGrouper;
-import me.lucko.spark.common.sampler.async.jfr.JfrReader;
import me.lucko.spark.common.sampler.node.MergeMode;
-import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
+import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
+import me.lucko.spark.common.tick.TickHook;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
-import one.profiler.AsyncProfiler;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Comparator;
-import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
/**
* A sampler implementation using async-profiler.
*/
public class AsyncSampler extends AbstractSampler {
- private final AsyncProfiler profiler;
+ private final AsyncProfilerAccess profilerAccess;
/** Responsible for aggregating and then outputting collected sampling data */
private final AsyncDataAggregator dataAggregator;
- /** Flag to mark if the output has been completed */
- private boolean outputComplete = false;
+ /** Mutex for the current profiler job */
+ private final Object[] currentJobMutex = new Object[0];
- /** The temporary output file */
- private Path outputFile;
+ /** Current profiler job */
+ private AsyncProfilerJob currentJob;
- /** The executor used for timeouts */
- private ScheduledExecutorService timeoutExecutor;
+ /** The executor used for scheduling and management */
+ private ScheduledExecutorService scheduler;
public AsyncSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
super(platform, interval, threadDumper, endTime);
- this.profiler = AsyncProfilerAccess.getInstance(platform).getProfiler();
+ this.profilerAccess = AsyncProfilerAccess.getInstance(platform);
this.dataAggregator = new AsyncDataAggregator(threadGrouper);
- }
-
- /**
- * Executes a profiler command.
- *
- * @param command the command to execute
- * @return the response
- */
- private String execute(String command) {
- try {
- return this.profiler.execute(command);
- } catch (IOException e) {
- throw new RuntimeException("Exception whilst executing profiler command", e);
- }
+ this.scheduler = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build()
+ );
}
/**
@@ -91,33 +71,58 @@ public class AsyncSampler extends AbstractSampler {
public void start() {
super.start();
- try {
- this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp");
- } catch (IOException e) {
- throw new RuntimeException("Unable to create temporary output file", e);
+ TickHook tickHook = this.platform.getTickHook();
+ if (tickHook != null) {
+ this.windowStatisticsCollector.startCountingTicks(tickHook);
}
- String command = "start,event=" + AsyncProfilerAccess.getInstance(this.platform).getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
- if (this.threadDumper instanceof ThreadDumper.Specific) {
- command += ",filter";
- }
+ int window = ProfilingWindowUtils.unixMillisToWindow(System.currentTimeMillis());
- String resp = execute(command).trim();
- if (!resp.equalsIgnoreCase("profiling started")) {
- throw new RuntimeException("Unexpected response: " + resp);
- }
+ AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob();
+ job.init(this.platform, this.interval, this.threadDumper, window);
+ job.start();
+ this.currentJob = job;
- if (this.threadDumper instanceof ThreadDumper.Specific) {
- ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper;
- for (Thread thread : threadDumper.getThreads()) {
- this.profiler.addThread(thread);
- }
- }
+ // rotate the sampler job every minute to put data into a new window
+ this.scheduler.scheduleAtFixedRate(this::rotateProfilerJob, 1, 1, TimeUnit.MINUTES);
recordInitialGcStats();
scheduleTimeout();
}
+ private void rotateProfilerJob() {
+ try {
+ synchronized (this.currentJobMutex) {
+ AsyncProfilerJob previousJob = this.currentJob;
+ if (previousJob == null) {
+ return;
+ }
+
+ try {
+ // stop the previous job
+ previousJob.stop();
+
+ // collect statistics for the window
+ this.windowStatisticsCollector.measureNow(previousJob.getWindow());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ // start a new job
+ int window = previousJob.getWindow() + 1;
+ AsyncProfilerJob newJob = this.profilerAccess.startNewProfilerJob();
+ newJob.init(this.platform, this.interval, this.threadDumper, window);
+ newJob.start();
+ this.currentJob = newJob;
+
+ // aggregate the output of the previous job
+ previousJob.aggregate(this.dataAggregator);
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
private void scheduleTimeout() {
if (this.autoEndTime == -1) {
return;
@@ -128,11 +133,7 @@ public class AsyncSampler extends AbstractSampler {
return;
}
- this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-timeout-thread").build()
- );
-
- this.timeoutExecutor.schedule(() -> {
+ this.scheduler.schedule(() -> {
stop();
this.future.complete(this);
}, delay, TimeUnit.MILLISECONDS);
@@ -143,145 +144,27 @@ public class AsyncSampler extends AbstractSampler {
*/
@Override
public void stop() {
- try {
- this.profiler.stop();
- } catch (IllegalStateException e) {
- if (!e.getMessage().equals("Profiler is not active")) { // ignore
- throw e;
- }
- }
+ super.stop();
+ synchronized (this.currentJobMutex) {
+ this.currentJob.stop();
+ this.windowStatisticsCollector.measureNow(this.currentJob.getWindow());
+ this.currentJob.aggregate(this.dataAggregator);
+ this.currentJob = null;
+ }
- if (this.timeoutExecutor != null) {
- this.timeoutExecutor.shutdown();
- this.timeoutExecutor = null;
+ if (this.scheduler != null) {
+ this.scheduler.shutdown();
+ this.scheduler = null;
}
}
@Override
- public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
SamplerData.Builder proto = SamplerData.newBuilder();
writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
- aggregateOutput();
- writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup);
+ writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup);
return proto.build();
}
- private void aggregateOutput() {
- if (this.outputComplete) {
- return;
- }
- this.outputComplete = true;
-
- Predicate<String> threadFilter;
- if (this.threadDumper instanceof ThreadDumper.Specific) {
- ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper;
- threadFilter = n -> threadDumper.getThreadNames().contains(n.toLowerCase());
- } else {
- threadFilter = n -> true;
- }
-
- // read the jfr file produced by async-profiler
- try (JfrReader reader = new JfrReader(this.outputFile)) {
- readSegments(reader, threadFilter);
- } catch (Exception e) {
- boolean fileExists;
- try {
- fileExists = Files.exists(this.outputFile) && Files.size(this.outputFile) != 0;
- } catch (IOException ex) {
- fileExists = false;
- }
-
- if (fileExists) {
- throw new JfrParsingException("Error parsing JFR data from pro