path: root/spark-common/src/main/java/me/lucko/spark/common/sampler
author     embeddedt <42941056+embeddedt@users.noreply.github.com>  2022-12-26 18:58:46 -0500
committer  embeddedt <42941056+embeddedt@users.noreply.github.com>  2022-12-26 18:58:46 -0500
commit     1075665def4a41cf0064255a6da1d1a652f5d473 (patch)
tree       11bba64e8f28ce8b83adc05252b75f17e2ccbf6a /spark-common/src/main/java/me/lucko/spark/common/sampler
parent     d9550259c1995d21fc345c58f2e531fdecf75acd (diff)
parent     d9655c40c02aef137c7a6a00a1cc90a1e6fb08d1 (diff)
download   spark-1075665def4a41cf0064255a6da1d1a652f5d473.tar.gz
           spark-1075665def4a41cf0064255a6da1d1a652f5d473.tar.bz2
           spark-1075665def4a41cf0064255a6da1d1a652f5d473.zip
Merge remote-tracking branch 'lucko/master' into forge-1.7.10
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java  95
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java  115
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java  15
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java  29
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java  76
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java  61
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java  6
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java  8
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java  4
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java  34
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java  276
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java  267
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java (renamed from spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java)  33
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java  50
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java  4
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java  4
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java  7
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java  83
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java  4
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java  41
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java  75
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java  79
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java  166
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/source/ClassSourceLookup.java  462
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/source/SourceMetadata.java  81
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java  70
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java  93
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java  287
28 files changed, 2116 insertions, 409 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
index 1c217db..e324fd3 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
@@ -23,17 +23,22 @@ package me.lucko.spark.common.sampler;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.monitor.memory.GarbageCollectorStatistics;
+import me.lucko.spark.common.platform.MetadataProvider;
import me.lucko.spark.common.platform.serverconfig.ServerConfigProvider;
import me.lucko.spark.common.sampler.aggregator.DataAggregator;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.node.ThreadNode;
-import me.lucko.spark.common.tick.TickHook;
-import me.lucko.spark.common.util.ClassSourceLookup;
+import me.lucko.spark.common.sampler.source.ClassSourceLookup;
+import me.lucko.spark.common.sampler.source.SourceMetadata;
+import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;
+import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -54,23 +59,28 @@ public abstract class AbstractSampler implements Sampler {
/** The time when sampling first began */
protected long startTime = -1;
- /** The game tick when sampling first began */
- protected int startTick = -1;
-
/** The unix timestamp (in millis) when this sampler should automatically complete. */
protected final long autoEndTime; // -1 for nothing
+ /** If the sampler is running in the background */
+ protected boolean background;
+
+ /** Collects statistics for each window in the sample */
+ protected final WindowStatisticsCollector windowStatisticsCollector;
+
/** A future to encapsulate the completion of this sampler instance */
protected final CompletableFuture<Sampler> future = new CompletableFuture<>();
/** The garbage collector statistics when profiling started */
protected Map<String, GarbageCollectorStatistics> initialGcStats;
- protected AbstractSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, long autoEndTime) {
+ protected AbstractSampler(SparkPlatform platform, SamplerSettings settings) {
this.platform = platform;
- this.interval = interval;
- this.threadDumper = threadDumper;
- this.autoEndTime = autoEndTime;
+ this.interval = settings.interval();
+ this.threadDumper = settings.threadDumper();
+ this.autoEndTime = settings.autoEndTime();
+ this.background = settings.runningInBackground();
+ this.windowStatisticsCollector = new WindowStatisticsCollector(platform);
}
@Override
@@ -87,6 +97,11 @@ public abstract class AbstractSampler implements Sampler {
}
@Override
+ public boolean isRunningInBackground() {
+ return this.background;
+ }
+
+ @Override
public CompletableFuture<Sampler> getFuture() {
return this.future;
}
@@ -102,11 +117,11 @@ public abstract class AbstractSampler implements Sampler {
@Override
public void start() {
this.startTime = System.currentTimeMillis();
+ }
- TickHook tickHook = this.platform.getTickHook();
- if (tickHook != null) {
- this.startTick = tickHook.getCurrentTick();
- }
+ @Override
+ public void stop(boolean cancelled) {
+ this.windowStatisticsCollector.stop();
}
protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) {
@@ -123,12 +138,9 @@ public abstract class AbstractSampler implements Sampler {
metadata.setComment(comment);
}
- if (this.startTick != -1) {
- TickHook tickHook = this.platform.getTickHook();
- if (tickHook != null) {
- int numberOfTicks = tickHook.getCurrentTick() - this.startTick;
- metadata.setNumberOfTicks(numberOfTicks);
- }
+ int totalTicks = this.windowStatisticsCollector.getTotalTicks();
+ if (totalTicks != -1) {
+ metadata.setNumberOfTicks(totalTicks);
}
try {
@@ -145,27 +157,60 @@ public abstract class AbstractSampler implements Sampler {
try {
ServerConfigProvider serverConfigProvider = platform.getPlugin().createServerConfigProvider();
- metadata.putAllServerConfigurations(serverConfigProvider.exportServerConfigurations());
+ if (serverConfigProvider != null) {
+ metadata.putAllServerConfigurations(serverConfigProvider.export());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ try {
+ MetadataProvider extraMetadataProvider = platform.getPlugin().createExtraMetadataProvider();
+ if (extraMetadataProvider != null) {
+ metadata.putAllExtraPlatformMetadata(extraMetadataProvider.export());
+ }
} catch (Exception e) {
e.printStackTrace();
}
+ Collection<SourceMetadata> knownSources = platform.getPlugin().getKnownSources();
+ for (SourceMetadata source : knownSources) {
+ metadata.putSources(source.getName().toLowerCase(Locale.ROOT), source.toProto());
+ }
+
proto.setMetadata(metadata);
}
- protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, Comparator<ThreadNode> outputOrder, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
List<ThreadNode> data = dataAggregator.exportData();
- data.sort(outputOrder);
+ data.sort(Comparator.comparing(ThreadNode::getThreadLabel));
ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup);
+ ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(data);
+ int[] timeWindows = timeEncoder.getKeys();
+ for (int timeWindow : timeWindows) {
+ proto.addTimeWindows(timeWindow);
+ }
+
+ this.windowStatisticsCollector.ensureHasStatisticsForAllWindows(timeWindows);
+ proto.putAllTimeWindowStatistics(this.windowStatisticsCollector.export());
+
for (ThreadNode entry : data) {
- proto.addThreads(entry.toProto(mergeMode));
+ proto.addThreads(entry.toProto(mergeMode, timeEncoder));
classSourceVisitor.visit(entry);
}
- if (classSourceVisitor.hasMappings()) {
- proto.putAllClassSources(classSourceVisitor.getMapping());
+ if (classSourceVisitor.hasClassSourceMappings()) {
+ proto.putAllClassSources(classSourceVisitor.getClassSourceMapping());
+ }
+
+ if (classSourceVisitor.hasMethodSourceMappings()) {
+ proto.putAllMethodSources(classSourceVisitor.getMethodSourceMapping());
+ }
+
+ if (classSourceVisitor.hasLineSourceMappings()) {
+ proto.putAllLineSources(classSourceVisitor.getLineSourceMapping());
}
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java
new file mode 100644
index 0000000..7e3b6b4
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java
@@ -0,0 +1,115 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+import me.lucko.spark.common.SparkPlatform;
+import me.lucko.spark.common.platform.PlatformInfo;
+import me.lucko.spark.common.util.Configuration;
+
+import java.util.logging.Level;
+
+public class BackgroundSamplerManager {
+
+ private static final String OPTION_ENABLED = "backgroundProfiler";
+ private static final String OPTION_ENGINE = "backgroundProfilerEngine";
+ private static final String OPTION_INTERVAL = "backgroundProfilerInterval";
+
+ private static final String MARKER_FAILED = "_marker_background_profiler_failed";
+
+ private final SparkPlatform platform;
+ private final Configuration configuration;
+ private final boolean enabled;
+
+ public BackgroundSamplerManager(SparkPlatform platform, Configuration configuration) {
+ this.platform = platform;
+ this.configuration = configuration;
+
+ PlatformInfo.Type type = this.platform.getPlugin().getPlatformInfo().getType();
+ this.enabled = type != PlatformInfo.Type.CLIENT && this.configuration.getBoolean(OPTION_ENABLED, type == PlatformInfo.Type.SERVER);
+ }
+
+ public void initialise() {
+ if (!this.enabled) {
+ return;
+ }
+
+ // are we enabling the background profiler by default for the first time?
+ boolean didEnableByDefault = false;
+ if (!this.configuration.contains(OPTION_ENABLED)) {
+ this.configuration.setBoolean(OPTION_ENABLED, true);
+ didEnableByDefault = true;
+ }
+
+ // did the background profiler fail to start on the previous attempt?
+ if (this.configuration.getBoolean(MARKER_FAILED, false)) {
+ this.platform.getPlugin().log(Level.WARNING, "It seems the background profiler failed to start when spark was last enabled. Sorry about that!");
+ this.platform.getPlugin().log(Level.WARNING, "In the future, spark will try to use the built-in Java profiling engine instead.");
+
+ this.configuration.remove(MARKER_FAILED);
+ this.configuration.setString(OPTION_ENGINE, "java");
+ this.configuration.save();
+ }
+
+ this.platform.getPlugin().log(Level.INFO, "Starting background profiler...");
+
+ if (didEnableByDefault) {
+ // set the failed marker and save before we try to start the profiler,
+ // then remove the marker afterwards if everything goes ok!
+ this.configuration.setBoolean(MARKER_FAILED, true);
+ this.configuration.save();
+ }
+
+ try {
+ startSampler();
+
+ if (didEnableByDefault) {
+ this.configuration.remove(MARKER_FAILED);
+ this.configuration.save();
+ }
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+ public boolean restartBackgroundSampler() {
+ if (this.enabled) {
+ startSampler();
+ return true;
+ }
+ return false;
+ }
+
+ private void startSampler() {
+ boolean forceJavaEngine = this.configuration.getString(OPTION_ENGINE, "async").equals("java");
+
+ Sampler sampler = new SamplerBuilder()
+ .background(true)
+ .threadDumper(this.platform.getPlugin().getDefaultThreadDumper())
+ .threadGrouper(ThreadGrouper.BY_POOL)
+ .samplingInterval(this.configuration.getInteger(OPTION_INTERVAL, 10))
+ .forceJavaSampler(forceJavaEngine)
+ .start(this.platform);
+
+ this.platform.getSamplerContainer().setActiveSampler(sampler);
+ }
+
+}
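
For reference, the manager is designed to be constructed once and initialised during platform startup. A minimal usage sketch, assuming a SparkPlatform and Configuration are already in scope (their construction is outside this commit):

    // initialise the background profiler on startup (no-op if disabled or on a client platform)
    BackgroundSamplerManager backgroundSamplerManager = new BackgroundSamplerManager(platform, configuration);
    backgroundSamplerManager.initialise();

    // after a user-triggered profiler completes, the background profiler can be resumed
    boolean restarted = backgroundSamplerManager.restartBackgroundSampler();
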
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index 84f2da1..36a63f1 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -23,11 +23,9 @@ package me.lucko.spark.common.sampler;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.node.MergeMode;
-import me.lucko.spark.common.sampler.node.ThreadNode;
-import me.lucko.spark.common.util.ClassSourceLookup;
+import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
-import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
/**
@@ -43,7 +41,7 @@ public interface Sampler {
/**
* Stops the sampler.
*/
- void stop();
+ void stop(boolean cancelled);
/**
* Gets the time when the sampler started (unix timestamp in millis)
@@ -60,6 +58,13 @@ public interface Sampler {
long getAutoEndTime();
/**
+ * If this sampler is running in the background. (wasn't started by a specific user)
+ *
+ * @return true if the sampler is running in the background
+ */
+ boolean isRunningInBackground();
+
+ /**
* Gets a future to encapsulate the completion of the sampler
*
* @return a future
@@ -67,6 +72,6 @@ public interface Sampler {
CompletableFuture<Sampler> getFuture();
// Methods used to export the sampler data to the web viewer.
- SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
+ SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
}
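
Call sites change accordingly: stop(...) now takes a cancelled flag, and toProto(...) no longer accepts a Comparator because threads are always sorted by label. A hedged sketch of an updated caller, with mergeMode and classSourceLookup assumed to be in scope:

    sampler.stop(false); // false = ran to completion; true = cancelled, data to be discarded
    SamplerData data = sampler.toProto(platform, creator, "optional comment", mergeMode, classSourceLookup);
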
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 88b9d91..ec635ef 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -38,7 +38,8 @@ public class SamplerBuilder {
private boolean ignoreSleeping = false;
private boolean ignoreNative = false;
private boolean useAsyncProfiler = true;
- private long timeout = -1;
+ private long autoEndTime = -1;
+ private boolean background = false;
private ThreadDumper threadDumper = ThreadDumper.ALL;
private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME;
@@ -57,7 +58,12 @@ public class SamplerBuilder {
if (timeout <= 0) {
throw new IllegalArgumentException("timeout > 0");
}
- this.timeout = System.currentTimeMillis() + unit.toMillis(timeout);
+ this.autoEndTime = System.currentTimeMillis() + unit.toMillis(timeout);
+ return this;
+ }
+
+ public SamplerBuilder background(boolean background) {
+ this.background = background;
return this;
}
@@ -93,15 +99,24 @@ public class SamplerBuilder {
}
public Sampler start(SparkPlatform platform) {
+ boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null;
+ boolean canUseAsyncProfiler = this.useAsyncProfiler &&
+ !onlyTicksOverMode &&
+ !(this.ignoreSleeping || this.ignoreNative) &&
+ !(this.threadDumper instanceof ThreadDumper.Regex) &&
+ AsyncProfilerAccess.getInstance(platform).checkSupported(platform);
+
+
int intervalMicros = (int) (this.samplingInterval * 1000d);
+ SamplerSettings settings = new SamplerSettings(intervalMicros, this.threadDumper, this.threadGrouper, this.autoEndTime, this.background);
Sampler sampler;
- if (this.ticksOver != -1 && this.tickHook != null) {
- sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
- } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.checkSupported(platform)) {
- sampler = new AsyncSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout);
+ if (canUseAsyncProfiler) {
+ sampler = new AsyncSampler(platform, settings);
+ } else if (onlyTicksOverMode) {
+ sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
} else {
- sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
+ sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative);
}
sampler.start();
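
Putting the builder changes together, a user-triggered (foreground) profiler might be started as sketched below; the methods shown all appear in this commit, apart from the timeout setter (completeAfter upstream) whose body is the autoEndTime change above:

    Sampler sampler = new SamplerBuilder()
            .threadDumper(ThreadDumper.ALL)
            .threadGrouper(ThreadGrouper.BY_NAME)
            .samplingInterval(4)                  // milliseconds; converted to microseconds internally
            .completeAfter(60, TimeUnit.SECONDS)  // sets autoEndTime
            .start(platform);
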
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java
new file mode 100644
index 0000000..15b1029
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java
@@ -0,0 +1,76 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Container for the active sampler.
+ */
+public class SamplerContainer implements AutoCloseable {
+
+ private final AtomicReference<Sampler> activeSampler = new AtomicReference<>();
+
+ /**
+ * Gets the active sampler, or null if a sampler is not active.
+ *
+ * @return the active sampler
+ */
+ public Sampler getActiveSampler() {
+ return this.activeSampler.get();
+ }
+
+ /**
+ * Sets the active sampler, throwing an exception if another sampler is already active.
+ *
+ * @param sampler the sampler
+ */
+ public void setActiveSampler(Sampler sampler) {
+ if (!this.activeSampler.compareAndSet(null, sampler)) {
+ throw new IllegalStateException("Attempted to set active sampler when another was already active!");
+ }
+ }
+
+ /**
+ * Unsets the active sampler, if the provided sampler is active.
+ *
+ * @param sampler the sampler
+ */
+ public void unsetActiveSampler(Sampler sampler) {
+ this.activeSampler.compareAndSet(sampler, null);
+ }
+
+ /**
+ * Stops the active sampler, if there is one.
+ */
+ public void stopActiveSampler(boolean cancelled) {
+ Sampler sampler = this.activeSampler.getAndSet(null);
+ if (sampler != null) {
+ sampler.stop(cancelled);
+ }
+ }
+
+ @Override
+ public void close() {
+ stopActiveSampler(true);
+ }
+
+}
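
The container is the new single owner of the active sampler; a sketch of the intended lifecycle (the getSamplerContainer() accessor is used by BackgroundSamplerManager earlier in this diff):

    SamplerContainer container = platform.getSamplerContainer();
    container.setActiveSampler(sampler);   // throws IllegalStateException if one is already active
    // ... sampling runs ...
    container.stopActiveSampler(false);    // stops and clears; true would mark it cancelled
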
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java
new file mode 100644
index 0000000..6e55a43
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java
@@ -0,0 +1,61 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+/**
+ * Base settings for all samplers
+ */
+public class SamplerSettings {
+
+ private final int interval;
+ private final ThreadDumper threadDumper;
+ private final ThreadGrouper threadGrouper;
+ private final long autoEndTime;
+ private final boolean runningInBackground;
+
+ public SamplerSettings(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long autoEndTime, boolean runningInBackground) {
+ this.interval = interval;
+ this.threadDumper = threadDumper;
+ this.threadGrouper = threadGrouper;
+ this.autoEndTime = autoEndTime;
+ this.runningInBackground = runningInBackground;
+ }
+
+ public int interval() {
+ return this.interval;
+ }
+
+ public ThreadDumper threadDumper() {
+ return this.threadDumper;
+ }
+
+ public ThreadGrouper threadGrouper() {
+ return this.threadGrouper;
+ }
+
+ public long autoEndTime() {
+ return this.autoEndTime;
+ }
+
+ public boolean runningInBackground() {
+ return this.runningInBackground;
+ }
+}
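
As a concrete example, the settings object that SamplerBuilder assembles above could be built by hand like so (values are illustrative only):

    SamplerSettings settings = new SamplerSettings(
            10_000,                 // interval in microseconds (10ms)
            ThreadDumper.ALL,
            ThreadGrouper.BY_NAME,
            -1L,                    // autoEndTime: -1 = run until stopped manually
            false                   // runningInBackground
    );
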
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
index ad9dee4..2c003e5 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.IntPredicate;
/**
* Abstract implementation of {@link DataAggregator}.
@@ -52,6 +53,11 @@ public abstract class AbstractDataAggregator implements DataAggregator {
}
@Override
+ public void pruneData(IntPredicate timeWindowPredicate) {
+ this.threadData.values().removeIf(node -> node.removeTimeWindowsRecursively(timeWindowPredicate));
+ }
+
+ @Override
public List<ThreadNode> exportData() {
List<ThreadNode> data = new ArrayList<>(this.threadData.values());
for (ThreadNode node : data) {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
index 5590a96..ed33204 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
@@ -24,6 +24,7 @@ import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
import java.util.List;
+import java.util.function.IntPredicate;
/**
* Aggregates sampling data.
@@ -38,6 +39,13 @@ public interface DataAggregator {
List<ThreadNode> exportData();
/**
+ * Prunes windows of data from this aggregator if the given {@code timeWindowPredicate} returns true.
+ *
+ * @param timeWindowPredicate the predicate
+ */
+ void pruneData(IntPredicate timeWindowPredicate);
+
+ /**
* Gets metadata about the data aggregator instance.
*/
SamplerMetadata.DataAggregator getMetadata();
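
A sketch of how a caller prunes old windows; keepHistoryBefore(...) is the helper this commit uses in AsyncSampler.rotateProfilerJob below, and currentWindow is assumed to be in scope:

    IntPredicate shouldPrune = ProfilingWindowUtils.keepHistoryBefore(currentWindow);
    dataAggregator.pruneData(shouldPrune); // drops matching windows from every thread node
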
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
index 3de3943..402330a 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
@@ -47,10 +47,10 @@ public class AsyncDataAggregator extends AbstractDataAggregator {
.build();
}
- public void insertData(ProfileSegment element) {
+ public void insertData(ProfileSegment element, int window) {
try {
ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName()));
- node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime());
+ node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime(), window);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
index ef2c035..1480650 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -22,9 +22,9 @@ package me.lucko.spark.common.sampler.async;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Table;
+import com.google.common.io.ByteStreams;
import me.lucko.spark.common.SparkPlatform;
-import me.lucko.spark.common.util.TemporaryFiles;
import one.profiler.AsyncProfiler;
import one.profiler.Events;
@@ -32,19 +32,29 @@ import one.profiler.Events;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
import java.util.Locale;
+import java.util.Objects;
import java.util.logging.Level;
import java.util.stream.Collectors;
/**
* Provides a bridge between spark and async-profiler.
*/
-public enum AsyncProfilerAccess {
- INSTANCE;
+public class AsyncProfilerAccess {
+ private static AsyncProfilerAccess instance;
+
+ // singleton, needs a SparkPlatform for first init
+ public static synchronized AsyncProfilerAccess getInstance(SparkPlatform platform) {
+ if (instance == null) {
+ Objects.requireNonNull(platform, "platform");
+ instance = new AsyncProfilerAccess(platform);
+ }
+ return instance;
+ }
/** An instance of the async-profiler Java API. */
private final AsyncProfiler profiler;
@@ -55,13 +65,13 @@ public enum AsyncProfilerAccess {
/** If profiler is null, contains the reason why setup failed */
private final Exception setupException;
- AsyncProfilerAccess() {
+ AsyncProfilerAccess(SparkPlatform platform) {
AsyncProfiler profiler;
ProfilingEvent profilingEvent = null;
Exception setupException = null;
try {
- profiler = load();
+ profiler = load(platform);
if (isEventSupported(profiler, ProfilingEvent.CPU, false)) {
profilingEvent = ProfilingEvent.CPU;
} else if (isEventSupported(profiler, ProfilingEvent.WALL, true)) {
@@ -77,11 +87,11 @@ public enum AsyncProfilerAccess {
this.setupException = setupException;
}
- public AsyncProfiler getProfiler() {
+ public AsyncProfilerJob startNewProfilerJob() {
if (this.profiler == null) {
throw new UnsupportedOperationException("async-profiler not supported", this.setupException);
}
- return this.profiler;
+ return AsyncProfilerJob.createNew(this, this.profiler);
}
public ProfilingEvent getProfilingEvent() {
@@ -106,7 +116,7 @@ public enum AsyncProfilerAccess {
return this.profiler != null;
}
- private static AsyncProfiler load() throws Exception {
+ private static AsyncProfiler load(SparkPlatform platform) throws Exception {
// check compatibility
String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", "");
String arch = System.getProperty("os.arch").toLowerCase(Locale.ROOT);
@@ -135,10 +145,10 @@ public enum AsyncProfilerAccess {
throw new IllegalStateException("Could not find " + resource + " in spark jar file");
}
- Path extractPath = TemporaryFiles.create("spark-", "-libasyncProfiler.so.tmp");
+ Path extractPath = platform.getTemporaryFiles().create("spark-", "-libasyncProfiler.so.tmp");
- try (InputStream in = profilerResource.openStream()) {
- Files.copy(in, extractPath, StandardCopyOption.REPLACE_EXISTING);
+ try (InputStream in = profilerResource.openStream(); OutputStream out = Files.newOutputStream(extractPath)) {
+ ByteStreams.copy(in, out);
}
// get an instance of async-profiler
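
With the enum singleton gone, callers obtain the access object lazily and supply a platform on first use. A sketch matching the call sites in this commit:

    AsyncProfilerAccess access = AsyncProfilerAccess.getInstance(platform);
    if (access.checkSupported(platform)) {
        AsyncProfilerJob job = access.startNewProfilerJob(); // see AsyncProfilerJob below
    }
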
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
new file mode 100644
index 0000000..d74b75f
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
@@ -0,0 +1,276 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+import me.lucko.spark.common.SparkPlatform;
+import me.lucko.spark.common.sampler.ThreadDumper;
+import me.lucko.spark.common.sampler.async.jfr.JfrReader;
+
+import one.profiler.AsyncProfiler;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+
+/**
+ * Represents a profiling job within async-profiler.
+ *
+ * <p>Only one job can be running at a time. This is guarded by
+ * {@link #createNew(AsyncProfilerAccess, AsyncProfiler)}.</p>
+ */
+public class AsyncProfilerJob {
+
+ /**
+ * The currently active job.
+ */
+ private static final AtomicReference<AsyncProfilerJob> ACTIVE = new AtomicReference<>();
+
+ /**
+ * Creates a new {@link AsyncProfilerJob}.
+ *
+ * <p>Will throw an {@link IllegalStateException} if another job is already active.</p>
+ *
+ * @param access the profiler access object
+ * @param profiler the profiler
+ * @return the job
+ */
+ static AsyncProfilerJob createNew(AsyncProfilerAccess access, AsyncProfiler profiler) {
+ synchronized (ACTIVE) {
+ AsyncProfilerJob existing = ACTIVE.get();
+ if (existing != null) {
+ throw new IllegalStateException("Another profiler is already active: " + existing);
+ }
+
+ AsyncProfilerJob job = new AsyncProfilerJob(access, profiler);
+ ACTIVE.set(job);
+ return job;
+ }
+ }
+
+ /** The async-profiler access object */
+ private final AsyncProfilerAccess access;
+ /** The async-profiler instance */
+ private final AsyncProfiler profiler;
+
+ // Set on init
+ /** The platform */
+ private SparkPlatform platform;
+ /** The sampling interval in microseconds */
+ private int interval;
+ /** The thread dumper */
+ private ThreadDumper threadDumper;
+ /** The profiling window */
+ private int window;
+ /** If the profiler should run in quiet mode */
+ private boolean quiet;
+
+ /** The file used by async-profiler to output data */
+ private Path outputFile;
+
+ private AsyncProfilerJob(AsyncProfilerAccess access, AsyncProfiler profiler) {
+ this.access = access;
+ this.profiler = profiler;
+ }
+
+ /**
+ * Executes an async-profiler command.
+ *
+ * @param command the command
+ * @return the output
+ */
+ private String execute(String command) {
+ try {
+ return this.profiler.execute(command);
+ } catch (IOException e) {
+ throw new RuntimeException("Exception whilst executing profiler command", e);
+ }
+ }
+
+ /**
+ * Checks to ensure that this job is still active.
+ */
+ private void checkActive() {
+ if (ACTIVE.get() != this) {
+ throw new IllegalStateException("Profiler job no longer active!");
+ }
+ }
+
+ // Initialise the job
+ public void init(SparkPlatform platform, int interval, ThreadDumper threadDumper, int window, boolean quiet) {
+ this.platform = platform;
+ this.interval = interval;
+ this.threadDumper = threadDumper;
+ this.window = window;
+ this.quiet = quiet;
+ }
+
+ /**
+ * Starts the job.
+ */
+ public void start() {
+ checkActive();
+
+ try {
+ // create a new temporary output file
+ try {
+ this.outputFile = this.platform.getTemporaryFiles().create("spark-", "-profile-data.jfr.tmp");
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create temporary output file", e);
+ }
+
+ // construct a command to send to async-profiler
+ String command = "start,event=" + this.access.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
+ if (this.quiet) {
+ command += ",loglevel=NONE";
+ }
+ if (this.threadDumper instanceof ThreadDumper.Specific) {
+ command += ",filter";
+ }
+
+ // start the profiler
+ String resp = execute(command).trim();
+
+ if (!resp.equalsIgnoreCase("profiling started")) {
+ throw new RuntimeException("Unexpected response: " + resp);
+ }
+
+ // append threads to be profiled, if necessary
+ if (this.threadDumper instanceof ThreadDumper.Specific) {
+ ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper;
+ for (Thread thread : threadDumper.getThreads()) {
+ this.profiler.addThread(thread);
+ }
+ }
+
+ } catch (Exception e) {
+ try {
+ this.profiler.stop();
+ } catch (Exception e2) {
+ // ignore
+ }
+ close();
+
+ throw e;
+ }
+ }
+
+ /**
+ * Stops the job.
+ */
+ public void stop() {
+ checkActive();
+
+ try {
+ this.profiler.stop();
+ } catch (IllegalStateException e) {
+ if (!e.getMessage().equals("Profiler is not active")) { // ignore
+ throw e;
+ }
+ } finally {
+ close();
+ }
+ }
+
+ /**
+ * Aggregates the collected data.
+ */
+ public void aggregate(AsyncDataAggregator dataAggregator) {
+
+ Predicate<String> threadFilter;
+ if (this.threadDumper instanceof ThreadDumper.Specific) {
+ ThreadDumper.Specific specificDumper = (ThreadDumper.Specific) this.threadDumper;
+ threadFilter = n -> specificDumper.getThreadNames().contains(n.toLowerCase());
+ } else {
+ threadFilter = n -> true;
+ }
+
+ // read the jfr file produced by async-profiler
+ try (JfrReader reader = new JfrReader(this.outputFile)) {
+ readSegments(reader, threadFilter, dataAggregator, this.window);
+ } catch (Exception e) {
+ boolean fileExists;
+ try {
+ fileExists = Files.exists(this.outputFile) && Files.size(this.outputFile) != 0;
+ } catch (IOException ex) {
+ fileExists = false;
+ }
+
+ if (fileExists) {
+ throw new JfrParsingException("Error parsing JFR data from profiler output", e);
+ } else {
+ throw new JfrParsingException("Error parsing JFR data from profiler output - file " + this.outputFile + " does not exist!", e);
+ }
+ }
+
+ deleteOutputFile();
+ }
+
+ public void deleteOutputFile() {
+ try {
+ Files.deleteIfExists(this.outputFile);
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ private void readSegments(JfrReader reader, Predicate<String> threadFilter, AsyncDataAggregator dataAggregator, int window) throws IOException {
+ List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class);
+ for (int i = 0; i < samples.size(); i++) {
+ JfrReader.ExecutionSample sample = samples.get(i);
+
+ long duration;
+ if (i == 0) {
+ // we don't really know the duration of the first sample, so just use the sampling
+ // interval
+ duration = this.interval;
+ } else {
+ // calculate the duration of the sample by calculating the time elapsed since the
+ // previous sample
+ duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time);
+ }
+
+ String threadName = reader.threads.get((long) sample.tid);
+ if (threadName == null) {
+ continue;
+ }
+
+ if (!threadFilter.test(threadName)) {
+ continue;
+ }
+
+ // parse the segment and give it to the data aggregator
+ ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, duration);
+ dataAggregator.insertData(segment, window);
+ }
+ }
+
+ public int getWindow() {
+ return this.window;
+ }
+
+ private void close() {
+ ACTIVE.compareAndSet(this, null);
+ }
+}
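
Tying the pieces together, one profiling window drives a job through the following lifecycle (this mirrors how AsyncSampler uses it below; intervalMicros, threadDumper, window and dataAggregator are assumed to be in scope):

    AsyncProfilerJob job = profilerAccess.startNewProfilerJob(); // throws if another job is active
    job.init(platform, intervalMicros, threadDumper, window, /* quiet */ true);
    job.start();
    // ... the window elapses ...
    job.stop();
    job.aggregate(dataAggregator); // reads the JFR output into the aggregator, then deletes the file
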
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index d8288da..178f055 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -25,64 +25,43 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
-import me.lucko.spark.common.sampler.ThreadDumper;
-import me.lucko.spark.common.sampler.ThreadGrouper;
-import me.lucko.spark.common.sampler.async.jfr.JfrReader;
+import me.lucko.spark.common.sampler.SamplerSettings;
import me.lucko.spark.common.sampler.node.MergeMode;
-import me.lucko.spark.common.sampler.node.ThreadNode;
-import me.lucko.spark.common.util.ClassSourceLookup;
-import me.lucko.spark.common.util.TemporaryFiles;
+import me.lucko.spark.common.sampler.source.ClassSourceLookup;
+import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
+import me.lucko.spark.common.tick.TickHook;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
-import one.profiler.AsyncProfiler;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Comparator;
-import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
+import java.util.function.IntPredicate;
/**
* A sampler implementation using async-profiler.
*/
public class AsyncSampler extends AbstractSampler {
- private final AsyncProfiler profiler;
+ private final AsyncProfilerAccess profilerAccess;
/** Responsible for aggregating and then outputting collected sampling data */
private final AsyncDataAggregator dataAggregator;
- /** Flag to mark if the output has been completed */
- private boolean outputComplete = false;
-
- /** The temporary output file */
- private Path outputFile;
+ /** Mutex for the current profiler job */
+ private final Object[] currentJobMutex = new Object[0];
- /** The executor used for timeouts */
- private ScheduledExecutorService timeoutExecutor;
+ /** Current profiler job */
+ private AsyncProfilerJob currentJob;
- public AsyncSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
- super(platform, interval, threadDumper, endTime);
- this.profiler = AsyncProfilerAccess.INSTANCE.getProfiler();
- this.dataAggregator = new AsyncDataAggregator(threadGrouper);
- }
+ /** The executor used for scheduling and management */
+ private ScheduledExecutorService scheduler;
- /**
- * Executes a profiler command.
- *
- * @param command the command to execute
- * @return the response
- */
- private String execute(String command) {
- try {
- return this.profiler.execute(command);
- } catch (IOException e) {
- throw new RuntimeException("Exception whilst executing profiler command", e);
- }
+ public AsyncSampler(SparkPlatform platform, SamplerSettings settings) {
+ super(platform, settings);
+ this.profilerAccess = AsyncProfilerAccess.getInstance(platform);
+ this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper());
+ this.scheduler = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build()
+ );
}
/**
@@ -92,33 +71,68 @@ public class AsyncSampler extends AbstractSampler {
public void start() {
super.start();
- try {
- this.outputFile = TemporaryFiles.create("spark-profile-", ".jfr.tmp");
- } catch (IOException e) {
- throw new RuntimeException("Unable to create temporary output file", e);
+ TickHook tickHook = this.platform.getTickHook();
+ if (tickHook != null) {
+ this.windowStatisticsCollector.startCountingTicks(tickHook);
}
- String command = "start,event=" + AsyncProfilerAccess.INSTANCE.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
- if (this.threadDumper instanceof ThreadDumper.Specific) {
- command += ",filter";
- }
+ int window = ProfilingWindowUtils.windowNow();
- String resp = execute(command).trim();
- if (!resp.equalsIgnoreCase("profiling started")) {
- throw new RuntimeException("Unexpected response: " + resp);
- }
+ AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob();
+ job.init(this.platform, this.interval, this.threadDumper, window, this.background);
+ job.start();
+ this.currentJob = job;
- if (this.threadDumper instanceof ThreadDumper.Specific) {
- ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper;
- for (Thread thread : threadDumper.getThreads()) {
- this.profiler.addThread(thread);
- }
- }
+ // rotate the sampler job to put data into a new window
+ this.scheduler.scheduleAtFixedRate(
+ this::rotateProfilerJob,
+ ProfilingWindowUtils.WINDOW_SIZE_SECONDS,
+ ProfilingWindowUtils.WINDOW_SIZE_SECONDS,
+ TimeUnit.SECONDS
+ );
recordInitialGcStats();
scheduleTimeout();
}
+ private void rotateProfilerJob() {
+ try {
+ synchronized (this.currentJobMutex) {
+ AsyncProfilerJob previousJob = this.currentJob;
+ if (previousJob == null) {
+ return;
+ }
+
+ try {
+ // stop the previous job
+ previousJob.stop();
+
+ // collect statistics for the window
+ this.windowStatisticsCollector.measureNow(previousJob.getWindow());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ // start a new job
+ int window = previousJob.getWindow() + 1;
+ AsyncProfilerJob newJob = this.profilerAccess.startNewProfilerJob();
+ newJob.init(this.platform, this.interval, this.threadDumper, window, this.background);
+ newJob.start();
+ this.currentJob = newJob;
+
+ // aggregate the output of the previous job
+ previousJob.aggregate(this.dataAggregator);
+
+ // prune data older than the history size
+ IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(window);
+ this.dataAggregator.pruneData(predicate);
+ this.windowStatisticsCollector.pruneStatistics(predicate);
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
private void scheduleTimeout() {
if (this.autoEndTime == -1) {
return;
@@ -129,12 +143,8 @@ public class AsyncSampler extends AbstractSampler {
return;
}
- this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-timeout-thread").build()
- );
-
- this.timeoutExecutor.schedule(() -> {
- stop();
+ this.scheduler.schedule(() -> {
+ stop(false);
this.future.complete(this);
}, delay, TimeUnit.MILLISECONDS);
}
@@ -143,129 +153,32 @@ public class AsyncSampler extends AbstractSampler {
* Stops the profiler.
*/
@Override
- public void stop() {
- try {
- this.profiler.stop();
- } catch (IllegalStateException e) {
- if (!e.getMessage().equals("Profiler is not active")) { // ignore
- throw e;
+ public void stop(boolean cancelled) {
+ super.stop(cancelled);
+
+ synchronized (this.currentJobMutex) {
+ this.currentJob.stop();
+ if (!cancelled) {
+ this.windowStatisticsCollector.measureNow(this.currentJob.getWindow());
+ this.currentJob.aggregate(this.dataAggregator);
+ } else {
+ this.currentJob.deleteOutputFile();
}
+ this.currentJob = null;
}
-
- if (this.timeoutExecutor != null) {
- this.timeoutExecutor.shutdown();
- this.timeoutExecutor = null;
+ if (this.scheduler != null) {
+ this.scheduler.shutdown();
+ this.scheduler = null;
}
}
@Override
- public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
SamplerData.Builder proto = SamplerData.newBuilder();
writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
- aggregateOutput();
- writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup);
+ writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup);
return proto.build();
}
- private void aggregateOutput() {
- if (this.outputComplete) {
- return;
- }
- this.outputComplete = true;
-
- Predicate<String> threadFilter;
- if (this.threadDumper instanceof ThreadDumper.Specific) {
- ThreadDumper.Specific threadDumper = (ThreadDumper.Specific) this.threadDumper;
- threadFilter = n -> threadDumper.getThreadNames().contains(n.toLowerCase());
- } else {
- threadFilter = n -> true;
- }
-
- // read the jfr file produced by async-profiler
- try (JfrReader reader = new JfrReader(this.outputFile)) {
- readSegments(reader, threadFilter);
- } catch (IOException e) {
- throw new RuntimeException("Read error", e);
- }
-
- // delete the output file after reading
- try {
- Files.deleteIfExists(this.outputFile);
- } catch (IOException e) {
- // ignore
- }
- }
-
- private void readSegments(JfrReader reader, Predicate<String> threadFilter) throws IOException {
- List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class);
- for (int i = 0; i < samples.size(); i++) {
- JfrReader.ExecutionSample sample = samples.get(i);
-
- long duration;
- if (i == 0) {
- // we don't really know the duration of the first sample, so just use the sampling
- // interval
- duration = this.interval;
- } else {
- // calculate the duration of the sample by calculating the time elapsed since the
- // previous sample
- duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time);
- }
-
- String threadName = reader.threads.get(sample.tid);
- if (!threadFilter.test(threadName)) {
- continue;
- }
-
- // parse the segment and give it to the data aggregator
- ProfileSegment segment = parseSegment(reader, sample, threadName, duration);
- this.dataAggregator.insertData(segment);
- }
- }
-
- private static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) {
- JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId);
- int len = stackTrace.methods.length;
-
- AsyncStackTraceElement[] stack = new AsyncStackTraceElement[len];
- for (int i = 0; i < len; i++) {
- stack[i] = parseStackFrame(reader, stackTrace.methods[i]);
- }
-
- return new ProfileSegment(sample.tid, threadName, stack, duration);
- }
-
- private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) {
- AsyncStackTraceElement result = reader.stackFrames.get(methodId);
- if (result != null) {
- return result;
- }
-
- JfrReader.MethodRef methodRef = reader.methods.get(methodId);
- JfrReader.ClassRef classRef = reader.classes.get(methodRef.cls);
-
- byte[] className = reader.symbols.get(classRef.name);
- byte[] methodName = reader.symbols.get(methodRef.name);
-
- if (className == null || className.length == 0) {
- // native call
- result = new AsyncStackTraceElement(
- AsyncStackTraceElement.NATIVE_CALL,
- new String(methodName, StandardCharsets.UTF_8),
- null
- );
- } else {
- // java method
- byte[] methodDesc = reader.symbols.get(methodRef.sig);
- result = new AsyncStackTraceElement(
- new String(className, StandardCharsets.UTF_8).replace('/', '.'),
- new String(methodName, StandardCharsets.UTF_8),
- new String(methodDesc, StandardCharsets.UTF_8)
- );
- }
-
- reader.stackFrames.put(methodId, result);
- return result;
- }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java
index adcedcd..6dab359 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/JfrParsingException.java
@@ -18,35 +18,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-package me.lucko.spark.common.sampler;
+package me.lucko.spark.common.sampler.async;
-import me.lucko.spark.common.sampler.node.ThreadNode;
-
-import java.util.Comparator;
-
-/**
- * Methods of ordering {@link ThreadNode}s in the output data.
- */
-public enum ThreadNodeOrder implements Comparator<ThreadNode> {
-
- /**
- * Order by the name of the thread (alphabetically)
- */
- BY_NAME {
- @Override
- public int compare(ThreadNode o1, ThreadNode o2) {
- return o1.getThreadLabel().compareTo(o2.getThreadLabel());
- }
- },
-
- /**
- * Order by the time taken by the thread (most time taken first)
- */
- BY_TIME {
- @Override
- public int compare(ThreadNode o1, ThreadNode o2) {
- return -Double.compare(o1.getTotalTime(), o2.getTotalTime());
- }
+public class JfrParsingException extends RuntimeException {
+ public JfrParsingException(String message, Throwable cause) {
+ super(message, cause);
}
-
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java
index 154e6fe..26debaf 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java
@@ -20,6 +20,10 @@
package me.lucko.spark.common.sampler.async;
+import me.lucko.spark.common.sampler.async.jfr.JfrReader;
+
+import java.nio.charset.StandardCharsets;
+
/**
* Represents a profile "segment".
*
@@ -58,4 +62,50 @@ public class ProfileSegment {
public long getTime() {
return this.time;
}
+
+ public static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) {
+ JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId);
+ int len = stackTrace.methods.length;
+
+ AsyncStackTraceElement[] stack = new AsyncStackTraceElement[len];
+ for (int i = 0; i < len; i++) {
+ stack[i] = parseStackFrame(reader, stackTrace.methods[i]);
+ }
+
+ return new ProfileSegment(sample.tid, threadName, stack, duration);
+ }
+
+ private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) {
+ AsyncStackTraceElement result = reader.stackFrames.get(methodId);
+ if (result != null) {
+ return result;
+ }
+
+ JfrReader.MethodRef methodRef = reader.methods.get(methodId);
+ JfrReader.ClassRef classRef = reader.classes.get(methodRef.cls);
+
+ byte[] className = reader.symbols.get(classRef.name);
+ byte[] methodName = reader.symbols.get(methodRef.name);
+
+ if (className == null || className.length == 0) {
+ // native call
+ result = new AsyncStackTraceElement(
+ AsyncStackTraceElement.NATIVE_CALL,
+ new String(methodName, StandardCharsets.UTF_8),
+ null
+ );
+ } else {
+ // java method
+ byte[] methodDesc = reader.symbols.get(methodRef.sig);
+ result = new AsyncStackTraceElement(
+ new String(className, StandardCharsets.UTF_8).replace('/', '.'),
+ new String(methodName, StandardCharsets.UTF_8),
+ new String(methodDesc, StandardCharsets.UTF_8)
+ );
+ }
+
+ reader.stackFrames.put(methodId, result);
+ return result;
+ }
+
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
index 23223a2..60f6543 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
@@ -37,6 +37,10 @@ public class Dictionary<T> {
size = 0;
}
+ public int size() {
+ return this.size;
+ }
+
public void put(long key, T value) {
if (key == 0) {
throw new IllegalArgumentException("Zero key not allowed");
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
index e0cc4e9..ea4985e 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/JfrReader.java
@@ -51,7 +51,7 @@ public class JfrReader implements Closeable {
public final Dictionary<JfrClass> types = new Dictionary<>();
public final Map<String, JfrClass> typesByName = new HashMap<>();
- public final Dictionary<String> threads = new Dictionary<>();
+ public final Map<Long, String> threads = new HashMap<>(); // spark
public final Dictionary<ClassRef> classes = new Dictionary<>();
public final Dictionary<byte[]> symbols = new Dictionary<>();
public final Dictionary<MethodRef> methods = new Dictionary<>();
@@ -324,7 +324,7 @@ public class JfrReader implements Closeable {
}
private void readThreads(boolean hasGroup) {
- int count = threads.preallocate(getVarint());
+ int count = getVarint(); //threads.preallocate(getVarint());
for (int i = 0; i < count; i++) {
long id = getVarlong();
String osName = getString();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java
index cc530d6..c51ec05 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java
@@ -66,10 +66,11 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator {
* Inserts sampling data into this aggregator
*
* @param threadInfo the thread info
+ * @param window the window
*/
- public abstract void insertData(ThreadInfo threadInfo);
+ public abstract void insertData(ThreadInfo threadInfo, int window);
- protected void writeData(ThreadInfo threadInfo) {
+ protected void writeData(ThreadInfo threadInfo, int window) {
if (this.ignoreSleeping && isSleeping(threadInfo)) {
return;
}
@@ -79,7 +80,7 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator {
try {
ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName()));
- node.log(STACK_TRACE_DESCRIBER, threadInfo.getStackTrace(), this.interval);
+ node.log(STACK_TRACE_DESCRIBER, threadInfo.getStackTrace(), this.interval, window);
} catch (Exception e) {
e.printStackTrace();
}
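
For context, a minimal sketch of how a caller derives the new window argument before feeding thread dumps into the aggregator (the sampler loop and the aggregator variable are simplified stand-ins, not part of this change):

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;

    long now = System.currentTimeMillis();
    // one window per minute of unix time, see ProfilingWindowUtils below
    int window = ProfilingWindowUtils.unixMillisToWindow(now);

    ThreadInfo[] dumps = ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
    for (ThreadInfo info : dumps) {
        aggregator.insertData(info, window); // every sample is tagged with its window
    }
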
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
index 913faee..72a37e8 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -25,23 +25,23 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
-import me.lucko.spark.common.sampler.ThreadDumper;
-import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.SamplerSettings;
import me.lucko.spark.common.sampler.node.MergeMode;
-import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.common.sampler.source.ClassSourceLookup;
+import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
+import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
import me.lucko.spark.common.tick.TickHook;
-import me.lucko.spark.common.util.ClassSourceLookup;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
-import java.util.Comparator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntPredicate;
/**
* A sampler implementation using Java (WarmRoast).
@@ -62,26 +62,47 @@ public class JavaSampler extends AbstractSampler implements Runnable {
/** Responsible for aggregating and then outputting collected sampling data */
private final JavaDataAggregator dataAggregator;
+
+ /** The last window that was profiled */
+ private final AtomicInteger lastWindow = new AtomicInteger();
- public JavaSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) {
- super(platform, interval, threadDumper, endTime);
- this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative);
+ public JavaSampler(SparkPlatform platform, SamplerSettings settings, boolean ignoreSleeping, boolean ignoreNative) {
+ super(platform, settings);
+ this.dataAggregator = new SimpleDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), ignoreSleeping, ignoreNative);
}
- public JavaSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {
- super(platform, interval, threadDumper, endTime);
- this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold);
+ public JavaSampler(SparkPlatform platform, SamplerSettings settings, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {
+ super(platform, settings);
+ this.dataAggregator = new TickedDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold);
}
@Override
public void start() {
super.start();
+
+ TickHook tickHook = this.platform.getTickHook();
+ if (tickHook != null) {
+ if (this.dataAggregator instanceof TickedDataAggregator) {
+ WindowStatisticsCollector.ExplicitTickCounter counter = this.windowStatisticsCollector.startCountingTicksExplicit(tickHook);
+ ((TickedDataAggregator) this.dataAggregator).setTickCounter(counter);
+ } else {
+ this.windowStatisticsCollector.startCountingTicks(tickHook);
+ }
+ }
+
this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS);
}
@Override
- public void stop() {
+ public void stop(boolean cancelled) {
+ super.stop(cancelled);
+
this.task.cancel(false);
+
+ if (!cancelled) {
+ // collect statistics for the final window
+ this.windowStatisticsCollector.measureNow(this.lastWindow.get());
+ }
}
@Override
@@ -89,27 +110,30 @@ public class JavaSampler extends AbstractSampler implements Runnable {
// this is effectively synchronized, the worker pool will not allow this task
// to concurrently execute.
try {
- if (this.autoEndTime != -1 && this.autoEndTime <= System.currentTimeMillis()) {
+ long time = System.currentTimeMillis();
+
+ if (this.autoEndTime != -1 && this.autoEndTime <= time) {
+ stop(false);
this.future.complete(this);
- stop();
return;
}
+ int window = ProfilingWindowUtils.unixMillisToWindow(time);
ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean);
- this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps));
+ this.workerPool.execute(new InsertDataTask(threadDumps, window));
} catch (Throwable t) {
+ stop(false);
this.future.completeExceptionally(t);
- stop();
}
}
- private static final class InsertDataTask implements Runnable {
- private final JavaDataAggregator dataAggregator;
+ private final class InsertDataTask implements Runnable {
private final ThreadInfo[] threadDumps;
+ private final int window;
- InsertDataTask(JavaDataAggregator dataAggregator, ThreadInfo[] threadDumps) {
- this.dataAggregator = dataAggregator;
+ InsertDataTask(ThreadInfo[] threadDumps, int window) {
this.threadDumps = threadDumps;
+ this.window = window;
}
@Override
@@ -118,16 +142,29 @@ public class JavaSampler extends AbstractSampler implements Runnable {
if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) {
continue;
}
- this.dataAggregator.insertData(threadInfo);
+ JavaSampler.this.dataAggregator.insertData(threadInfo, this.window);
+ }
+
+ // if we have just stepped over into a new window...
+ int previousWindow = JavaSampler.this.lastWindow.getAndUpdate(previous -> Math.max(this.window, previous));
+ if (previousWindow != 0 && previousWindow != this.window) {
+
+ // collect statistics for the previous window
+ JavaSampler.this.windowStatisticsCollector.measureNow(previousWindow);
+
+ // prune data older than the history size
+ IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(this.window);
+ JavaSampler.this.dataAggregator.pruneData(predicate);
+ JavaSampler.this.windowStatisticsCollector.pruneStatistics(predicate);
}
}
}
@Override
- public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
SamplerData.Builder proto = SamplerData.newBuilder();
writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
- writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup);
+ writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup);
return proto.build();
}
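
The rollover handling in InsertDataTask above hinges on the atomic max update; a sketch of the pattern in isolation (the collector and aggregator calls are illustrative placeholders):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.IntPredicate;

    AtomicInteger lastWindow = new AtomicInteger();

    void afterInsert(int window) {
        // getAndUpdate returns the previous value; a task that races in after the
        // transition observes previous == window and skips the rollover work
        int previous = lastWindow.getAndUpdate(prev -> Math.max(window, prev));
        if (previous != 0 && previous != window) {
            // windowStatisticsCollector.measureNow(previous);
            IntPredicate prune = ProfilingWindowUtils.keepHistoryBefore(window);
            // dataAggregator.pruneData(prune);
        }
    }
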
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java
index 39e21aa..54173fe 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java
@@ -44,8 +44,8 @@ public class SimpleDataAggregator extends JavaDataAggregator {
}
@Override
- public void insertData(ThreadInfo threadInfo) {
- writeData(threadInfo);
+ public void insertData(ThreadInfo threadInfo, int window) {
+ writeData(threadInfo, window);
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
index e062f31..d537b96 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
@@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler.java;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.aggregator.DataAggregator;
import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
import me.lucko.spark.common.tick.TickHook;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
@@ -31,7 +32,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
@@ -48,14 +48,15 @@ public class TickedDataAggregator extends JavaDataAggregator {
/** The expected number of samples in each tick */
private final int expectedSize;
- /** The number of ticks aggregated so far */
- private final AtomicInteger numberOfTicks = new AtomicInteger();
-
- private final Object mutex = new Object();
+ /** Counts the number of ticks aggregated */
+ private WindowStatisticsCollector.ExplicitTickCounter tickCounter;
// state
private int currentTick = -1;
- private TickList currentData = new TickList(0);
+ private TickList currentData = null;
+
+ // guards currentData
+ private final Object mutex = new Object();
public TickedDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {
super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative);
@@ -66,29 +67,34 @@ public class TickedDataAggregator extends JavaDataAggregator {
this.expectedSize = (int) ((50 / intervalMilliseconds) + 10);
}
+ public void setTickCounter(WindowStatisticsCollector.ExplicitTickCounter tickCounter) {
+ this.tickCounter = tickCounter;
+ }
+
@Override
public SamplerMetadata.DataAggregator getMetadata() {
// push the current tick (so the included-ticks count is accurate)
synchronized (this.mutex) {
pushCurrentTick();
+ this.currentData = null;
}
return SamplerMetadata.DataAggregator.newBuilder()
.setType(SamplerMetadata.DataAggregator.Type.TICKED)
.setThreadGrouper(this.threadGrouper.asProto())
.setTickLengthThreshold(this.tickLengthThreshold)
- .setNumberOfIncludedTicks(this.numberOfTicks.get())
+ .setNumberOfIncludedTicks(this.tickCounter.getTotalCountedTicks())
.build();
}
@Override
- public void insertData(ThreadInfo threadInfo) {
+ public void insertData(ThreadInfo threadInfo, int window) {
synchronized (this.mutex) {
int tick = this.tickHook.getCurrentTick();
- if (this.currentTick != tick) {
+ if (this.currentTick != tick || this.currentData == null) {
pushCurrentTick();
this.currentTick = tick;
- this.currentData = new TickList(this.expectedSize);
+ this.currentData = new TickList(this.expectedSize, window);
}
this.currentData.addData(threadInfo);
@@ -98,6 +104,9 @@ public class TickedDataAggregator extends JavaDataAggregator {
// guarded by 'mutex'
private void pushCurrentTick() {
TickList currentData = this.currentData;
+ if (currentData == null) {
+ return;
+ }
// approximate how long the tick lasted
int tickLengthMicros = currentData.getList().size() * this.interval;
@@ -107,8 +116,8 @@ public class TickedDataAggregator extends JavaDataAggregator {
return;
}
- this.numberOfTicks.incrementAndGet();
this.workerPool.submit(currentData);
+ this.tickCounter.increment();
}
@Override
@@ -121,21 +130,19 @@ public class TickedDataAggregator extends JavaDataAggregator {
return super.exportData();
}
- public int getNumberOfTicks() {
- return this.numberOfTicks.get();
- }
-
private final class TickList implements Runnable {
private final List<ThreadInfo> list;
+ private final int window;
- TickList(int expectedSize) {
+ TickList(int expectedSize, int window) {
this.list = new ArrayList<>(expectedSize);
+ this.window = window;
}
@Override
public void run() {
for (ThreadInfo data : this.list) {
- writeData(data);
+ writeData(data, this.window);
}
}
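
A short sketch of the 'only-ticks-over' gate that pushCurrentTick() applies, using illustrative values (a 4000 microsecond interval and a 50ms threshold, expressed in microseconds):

    int interval = 4_000;                      // sampling interval, microseconds
    int tickLengthThreshold = 50_000;          // 50ms threshold, microseconds
    int samples = 13;                          // samples observed in this tick

    // tick duration is approximated as samples * interval
    int tickLengthMicros = samples * interval; // 52_000us, roughly 52ms
    if (tickLengthMicros >= tickLengthThreshold) {
        // only ticks at least this long are submitted for aggregation, and only
        // those ticks count towards numberOfIncludedTicks in the metadata
    }
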
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
index fd2be8d..163365c 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
@@ -20,69 +20,81 @@
package me.lucko.spark.common.sampler.node;
+import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
+import java.util.function.IntPredicate;
/**
* Encapsulates a timed node in the sampling stack.
*/
public abstract class AbstractNode {
- private static final int MAX_STACK_DEPTH = 300;
+ protected static final int MAX_STACK_DEPTH = Integer.getInteger("spark.maxStackDepth", 300);
/** A map of the nodes children */
private final Map<StackTraceNode.Description, StackTraceNode> children = new ConcurrentHashMap<>();
/** The accumulated sample time for this node, measured in microseconds */
- private final LongAdder totalTime = new LongAdder();
+ // Integer key = the window (effectively System.currentTimeMillis() / 60_000)
+ // LongAdder value = accumulated time in microseconds
+ private final Map<Integer, LongAdder> times = new ConcurrentHashMap<>();
/**
- * Gets the total sample time logged for this node in milliseconds.
+ * Gets the time accumulator for a given window
*
- * @return the total time
+ * @param window the window
+ * @return the accumulator
*/
- public double getTotalTime() {
- return this.totalTime.longValue() / 1000d;
+ protected LongAdder getTimeAccumulator(int window) {
+ // computeIfAbsent avoids a check-then-put race: concurrent writers to the
+ // same window always share a single LongAdder
+ return this.times.computeIfAbsent(window, w -> new LongAdder());
}
- public Collection<StackTraceNode> getChildren() {
- return this.children.values();
+ /**
+ * Gets the time windows that have been logged for this node.
+ *
+ * @return the time windows
+ */
+ public Set<Integer> getTimeWindows() {
+ return this.times.keySet();
}
/**
- * Logs the given stack trace against this node and its children.
+ * Removes time windows from this node if they pass the given {@code predicate} test.
*
- * @param describer the function that describes the elements of the stack
- * @param stack the stack
- * @param time the total time to log
- * @param <T> the stack trace element type
+ * @param predicate the predicate
+ * @return true if any time windows were removed
*/
- public <T> void log(StackTraceNode.Describer<T> describer, T[] stack, long time) {
- if (stack.length == 0) {
- return;
- }
-
- this.totalTime.add(time);
-
- AbstractNode node = this;
- T previousElement = null;
-
- for (int offset = 0; offset < Math.min(MAX_STACK_DEPTH, stack.length); offset++) {
- T element = stack[(stack.length - 1) - offset];
+ public boolean removeTimeWindows(IntPredicate predicate) {
+ return this.times.keySet().removeIf(predicate::test);
+ }
- node = node.resolveChild(describer.describe(element, previousElement));
- node.totalTime.add(time);
+ /**
+ * Gets the encoded total sample times logged for this node in milliseconds.
+ *
+ * @return the total times
+ */
+ protected double[] encodeTimesForProto(ProtoTimeEncoder encoder) {
+ return encoder.encode(this.times);
+ }
- previousElement = element;
- }
+ public Collection<StackTraceNode> getChildren() {
+ return this.children.values();
}
- private StackTraceNode resolveChild(StackTraceNode.Description description) {
+ protected StackTraceNode resolveChild(StackTraceNode.Description description) {
StackTraceNode result = this.children.get(description); // fast path
if (result != null) {
return result;
@@ -96,7 +108,7 @@ public abstract class AbstractNode {
* @param other the other node
*/
protected void merge(AbstractNode other) {
- this.totalTime.add(other.totalTime.longValue());
+ other.times.forEach((key, value) -> getTimeAccumulator(key).add(value.longValue()));
for (Map.Entry<StackTraceNode.Description, StackTraceNode> child : other.children.entrySet()) {
resolveChild(child.getKey()).merge(child.getValue());
}
@@ -123,7 +135,6 @@ public abstract class AbstractNode {
list.add(child);
}
- list.sort(null);
return list;
}
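
To illustrate the new times map: samples accumulate into one LongAdder per window, and pruning removes whole windows at once (the window indices below are arbitrary minute numbers):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    Map<Integer, LongAdder> times = new ConcurrentHashMap<>();
    times.computeIfAbsent(100, w -> new LongAdder()).add(4_000); // 4ms in window 100
    times.computeIfAbsent(101, w -> new LongAdder()).add(4_000); // 4ms in window 101

    // pruning drops entire windows rather than decrementing a running total
    times.keySet().removeIf(window -> window < 101 - 60);
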
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
index b0d9237..c0dcc5b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
@@ -20,6 +20,7 @@
package me.lucko.spark.common.sampler.node;
+import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;
import me.lucko.spark.common.util.MethodDisambiguator;
import me.lucko.spark.proto.SparkSamplerProtos;
@@ -30,7 +31,7 @@ import java.util.Objects;
/**
* Represents a stack trace element within the {@link AbstractNode node} structure.
*/
-public final class StackTraceNode extends AbstractNode implements Comparable<StackTraceNode> {
+public final class StackTraceNode extends AbstractNode {
/**
* Magic number to denote a "not present" line number for a node.
@@ -64,12 +65,16 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta
return this.description.parentLineNumber;
}
- public SparkSamplerProtos.StackTraceNode toProto(MergeMode mergeMode) {
+ public SparkSamplerProtos.StackTraceNode toProto(MergeMode mergeMode, ProtoTimeEncoder timeEncoder, Iterable<Integer> childrenRefs) {
SparkSamplerProtos.StackTraceNode.Builder proto = SparkSamplerProtos.StackTraceNode.newBuilder()
- .setTime(getTotalTime())
.setClassName(this.description.className)
.setMethodName(this.description.methodName);
+ double[] times = encodeTimesForProto(timeEncoder);
+ for (double time : times) {
+ proto.addTimes(time);
+ }
+
if (this.description.lineNumber >= 0) {
proto.setLineNumber(this.description.lineNumber);
}
@@ -86,27 +91,11 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta
.ifPresent(proto::setMethodDesc);
}
- for (StackTraceNode child : exportChildren(mergeMode)) {
- proto.addChildren(child.toProto(mergeMode));
- }
+ proto.addAllChildrenRefs(childrenRefs);
return proto.build();
}
- @Override
- public int compareTo(StackTraceNode that) {
- if (this == that) {
- return 0;
- }
-
- int i = -Double.compare(this.getTotalTime(), that.getTotalTime());
- if (i != 0) {
- return i;
- }
-
- return this.description.compareTo(that.description);
- }
-
/**
* Function to construct a {@link StackTraceNode.Description} from a stack trace element
* of type {@code T}.
@@ -129,7 +118,7 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta
/**
* Encapsulates the attributes of a {@link StackTraceNode}.
*/
- public static final class Description implements Comparable<Description> {
+ public static final class Description {
private final String className;
private final String methodName;
@@ -162,54 +151,6 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta
this.hash = Objects.hash(this.className, this.methodName, this.methodDescription);
}
- private static <T extends Comparable<T>> int nullCompare(T a, T b) {
- if (a == null && b == null) {
- return 0;
- } else if (a == null) {
- return -1;
- } else if (b == null) {
- return 1;
- } else {
- return a.compareTo(b);
- }
- }
-
- @Override
- public int compareTo(Description that) {
- if (this == that) {
- return 0;
- }
-
- int i = this.className.compareTo(that.className);
- if (i != 0) {
- return i;
- }
-
- i = this.methodName.compareTo(that.methodName);
- if (i != 0) {
- return i;
- }
-
- i = nullCompare(this.methodDescription, that.methodDescription);
- if (i != 0) {
- return i;
- }
-
- if (this.methodDescription != null && that.methodDescription != null) {
- i = this.methodDescription.compareTo(that.methodDescription);
- if (i != 0) {
- return i;
- }
- }
-
- i = Integer.compare(this.lineNumber, that.lineNumber);
- if (i != 0) {
- return i;
- }
-
- return Integer.compare(this.parentLineNumber, that.parentLineNumber);
- }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
index ed97443..37ff359 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
@@ -20,8 +20,19 @@
package me.lucko.spark.common.sampler.node;
+import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;
+import me.lucko.spark.common.util.IndexedListBuilder;
import me.lucko.spark.proto.SparkSamplerProtos;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.IntPredicate;
+
/**
* The root of a sampling stack for a given thread / thread group.
*/
@@ -53,15 +64,162 @@ public final class ThreadNode extends AbstractNode {
this.label = label;
}
- public SparkSamplerProtos.ThreadNode toProto(MergeMode mergeMode) {
+ /**
+ * Logs the given stack trace against this node and its children.
+ *
+ * @param describer the function that describes the elements of the stack
+ * @param stack the stack
+ * @param time the total time to log
+ * @param window the window
+ * @param <T> the stack trace element type
+ */
+ public <T> void log(StackTraceNode.Describer<T> describer, T[] stack, long time, int window) {
+ if (stack.length == 0) {
+ return;
+ }
+
+ getTimeAccumulator(window).add(time);
+
+ AbstractNode node = this;
+ T previousElement = null;
+
+ for (int offset = 0; offset < Math.min(MAX_STACK_DEPTH, stack.length); offset++) {
+ T element = stack[(stack.length - 1) - offset];
+
+ node = node.resolveChild(describer.describe(element, previousElement));
+ node.getTimeAccumulator(window).add(time);
+
+ previousElement = element;
+ }
+ }
+
+ /**
+ * Removes time windows that match the given {@code predicate}.
+ *
+ * @param predicate the predicate to use to test the time windows
+ * @return true if this node is now empty
+ */
+ public boolean removeTimeWindowsRecursively(IntPredicate predicate) {
+ Queue<AbstractNode> queue = new ArrayDeque<>();
+ queue.add(this);
+
+ while (!queue.isEmpty()) {
+ AbstractNode node = queue.remove();
+ Collection<StackTraceNode> children = node.getChildren();
+
+ boolean needToProcessChildren = false;
+
+ for (Iterator<StackTraceNode> it = children.iterator(); it.hasNext(); ) {
+ StackTraceNode child = it.next();
+
+ boolean windowsWereRemoved = child.removeTimeWindows(predicate);
+ boolean childIsNowEmpty = child.getTimeWindows().isEmpty();
+
+ if (childIsNowEmpty) {
+ it.remove();
+ continue;
+ }
+
+ if (windowsWereRemoved) {
+ needToProcessChildren = true;
+ }
+ }
+
+ if (needToProcessChildren) {
+ queue.addAll(children);
+ }
+ }
+
+ removeTimeWindows(predicate);
+ return getTimeWindows().isEmpty();
+ }
+
+ public SparkSamplerProtos.ThreadNode toProto(MergeMode mergeMode, ProtoTimeEncoder timeEncoder) {
SparkSamplerProtos.ThreadNode.Builder proto = SparkSamplerProtos.ThreadNode.newBuilder()
- .setName(getThreadLabel())
- .setTime(getTotalTime());
+ .setName(getThreadLabel());
+
+ double[] times = encodeTimesForProto(timeEncoder);
+ for (double time : times) {
+ proto.addTimes(time);
+ }
+
+ // When converting to a proto, we change the data structure from a recursive tree to an array.
+ // Effectively, instead of:
+ //
+ // {
+ // data: 'one',
+ // children: [
+ // {
+ // data: 'two',
+ // children: [{ data: 'four' }]
+ // },
+ // { data: 'three' }
+ // ]
+ // }
+ //
+ // we transmit:
+ //
+ // [
+ // { data: 'one', children: [1, 2] },
+ // { data: 'two', children: [3] },
+ // { data: 'three', children: [] },
+ // { data: 'four', children: [] }
+ // ]
+ //
+
+ // the flattened array of nodes
+ IndexedListBuilder<SparkSamplerProtos.StackTraceNode> nodesArray = new IndexedListBuilder<>();
+ // Perform a depth-first post order traversal of the tree
+ Deque<Node> stack = new ArrayDeque<>();
+
+ // push the thread node's children to the stack
+ List<Integer> childrenRefs = new LinkedList<>();
for (StackTraceNode child : exportChildren(mergeMode)) {
- proto.addChildren(child.toProto(mergeMode));
+ stack.push(new Node(child, childrenRefs));
+ }
+
+ Node node;
+ while (!stack.isEmpty()) {
+ node = stack.peek();
+
+ // on the first visit, just push this node's children and leave it on the stack
+ if (node.firstVisit) {
+ for (StackTraceNode child : node.stackTraceNode.exportChildren(mergeMode)) {
+ stack.push(new Node(child, node.childrenRefs));
+ }
+ node.firstVisit = false;
+ continue;
+ }
+
+ // convert StackTraceNode to a proto
+ // - at this stage, we have already visited this node's children
+ // - the refs for each child are stored in node.childrenRefs
+ SparkSamplerProtos.StackTraceNode childProto = node.stackTraceNode.toProto(mergeMode, timeEncoder, node.childrenRefs);
+
+ // add the child proto to the nodes array, and record the ref in the parent
+ int childIndex = nodesArray.add(childProto);
+ node.parentChildrenRefs.add(childIndex);
+
+ // pop from the stack
+ stack.pop();
}
+ proto.addAllChildrenRefs(childrenRefs);
+ proto.addAllChildren(nodesArray.build());
+
return proto.build();
}
+
+ private static final class Node {
+ private final StackTraceNode stackTraceNode;
+ private boolean firstVisit = true;
+ private final List<Integer> childrenRefs = new LinkedList<>();
+ private final List<Integer> parentChildrenRefs;
+
+ private Node(StackTraceNode node, List<Integer> parentChildrenRefs) {
+ this.stackTraceNode = node;
+ this.parentChildrenRefs = parentChildrenRefs;
+ }
+ }
}
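
The inverse of the flattening above, for clarity: a viewer rebuilds the tree by treating each ref as an index into the flat children array (FlatNode is a simplified stand-in for the proto message, not part of this change):

    import java.util.List;

    final class FlatNode {
        final String data;
        final List<Integer> childrenRefs;
        FlatNode(String data, List<Integer> childrenRefs) {
            this.data = data;
            this.childrenRefs = childrenRefs;
        }
    }

    static void printTree(List<FlatNode> nodes, List<Integer> refs, String indent) {
        for (int ref : refs) {
            FlatNode node = nodes.get(ref); // a ref is an index into the flat array
            System.out.println(indent + node.data);
            printTree(nodes, node.childrenRefs, indent + "  ");
        }
    }
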
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/source/ClassSourceLookup.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/source/ClassSourceLookup.java
new file mode 100644
index 0000000..ab63c00
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/source/ClassSourceLookup.java
@@ -0,0 +1,462 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.source;
+
+import me.lucko.spark.common.SparkPlatform;
+import me.lucko.spark.common.sampler.node.StackTraceNode;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+import me.lucko.spark.common.util.ClassFinder;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.CodeSource;
+import java.security.ProtectionDomain;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A function which defines the source of given {@link Class}es or (Mixin) method calls.
+ */
+public interface ClassSourceLookup {
+
+ /**
+ * Identify the given class.
+ *
+ * @param clazz the class
+ * @return the source of the class
+ */
+ @Nullable String identify(Class<?> clazz) throws Exception;
+
+ /**
+ * Identify the given method call.
+ *
+ * @param methodCall the method call info
+ * @return the source of the method call
+ */
+ default @Nullable String identify(MethodCall methodCall) throws Exception {
+ return null;
+ }
+
+ /**
+ * Identify the given method call.
+ *
+ * @param methodCall the method call info
+ * @return the source of the method call
+ */
+ default @Nullable String identify(MethodCallByLine methodCall) throws Exception {
+ return null;
+ }
+
+ /**
+ * A no-operation {@link ClassSourceLookup}.
+ */
+ ClassSourceLookup NO_OP = new ClassSourceLookup() {
+ @Override
+ public @Nullable String identify(Class<?> clazz) {
+ return null;
+ }
+ };
+
+ static ClassSourceLookup create(SparkPlatform platform) {
+ try {
+ return platform.createClassSourceLookup();
+ } catch (Exception e) {
+ e.printStackTrace();
+ return NO_OP;
+ }
+ }
+
+ /**
+ * A {@link ClassSourceLookup} which identifies classes based on their {@link ClassLoader}.
+ */
+ abstract class ByClassLoader implements ClassSourceLookup {
+
+ public abstract @Nullable String identify(ClassLoader loader) throws Exception;
+
+ @Override
+ public final @Nullable String identify(Class<?> clazz) throws Exception {
+ ClassLoader loader = clazz.getClassLoader();
+ while (loader != null) {
+ String source = identify(loader);
+ if (source != null) {
+ return source;
+ }
+ loader = loader.getParent();
+ }
+ return null;
+ }
+ }
+
+ /**
+ * A {@link ClassSourceLookup} which identifies classes based on URL.
+ */
+ interface ByUrl extends ClassSourceLookup {
+
+ default String identifyUrl(URL url) throws URISyntaxException, MalformedURLException {
+ Path path = null;
+
+ String protocol = url.getProtocol();
+ if (protocol.equals("file")) {
+ path = Paths.get(url.toURI());
+ } else if (protocol.equals("jar")) {
+ URL innerUrl = new URL(url.getPath());
+ path = Paths.get(innerUrl.getPath().split("!")[0]);
+ }
+
+ if (path != null) {
+ return identifyFile(path.toAbsolutePath().normalize());
+ }
+
+ return null;
+ }
+
+ default String identifyFile(Path path) {
+ return identifyFileName(path.getFileName().toString());
+ }
+
+ default String identifyFileName(String fileName) {
+ return fileName.endsWith(".jar") ? fileName.substring(0, fileName.length() - 4) : null;
+ }
+ }
+
+ /**
+ * A {@link ClassSourceLookup} which identifies classes based on the first URL in a {@link URLClassLoader}.
+ */
+ class ByFirstUrlSource extends ClassSourceLookup.ByClassLoader implements ClassSourceLookup.ByUrl {
+ @Override
+ public @Nullable String identify(ClassLoader loader) throws IOException, URISyntaxException {
+ if (loader instanceof URLClassLoader) {
+ URLClassLoader urlClassLoader = (URLClassLoader) loader;
+ URL[] urls = urlClassLoader.getURLs();
+ if (urls.length == 0) {
+ return null;
+ }
+ return identifyUrl(urls[0]);
+ }
+ return null;
+ }
+ }
+
+ /**
+ * A {@link ClassSourceLookup} which identifies classes based on their {@link ProtectionDomain#getCodeSource()}.
+ */
+ class ByCodeSource implements ClassSourceLookup, ClassSourceLookup.ByUrl {
+ @Override
+ public @Nullable String identify(Class<?> clazz) throws URISyntaxException, MalformedURLException {
+ ProtectionDomain protectionDomain = clazz.getProtectionDomain();
+ if (protectionDomain == null) {
+ return null;
+ }
+ CodeSource codeSource = protectionDomain.getCodeSource();
+ if (codeSource == null) {
+ return null;
+ }
+
+ URL url = codeSource.getLocation();
+ return url == null ? null : identifyUrl(url);
+ }
+ }
+
+ interface Visitor {
+ void visit(ThreadNode node);
+
+ boolean hasClassSourceMappings();
+
+ Map<String, String> getClassSourceMapping();
+
+ boolean hasMethodSourceMappings();
+
+ Map<String, String> getMethodSourceMapping();
+
+ boolean hasLineSourceMappings();
+
+ Map<String, String> getLineSourceMapping();
+ }
+
+ static Visitor createVisitor(ClassSourceLookup lookup) {
+ if (lookup == ClassSourceLookup.NO_OP) {
+ return NoOpVisitor.INSTANCE; // don't bother!
+ }
+ return new VisitorImpl(lookup);
+ }
+
+ enum NoOpVisitor implements Visitor {
+ INSTANCE;
+
+ @Override
+ public void visit(ThreadNode node) {
+
+ }
+
+ @Override
+ public boolean hasClassSourceMappings() {
+ return false;
+ }
+
+ @Override
+ public Map<String, String> getClassSourceMapping() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public boolean hasMethodSourceMappings() {
+ return false;
+ }
+
+ @Override
+ public Map<String, String> getMethodSourceMapping() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public boolean hasLineSourceMappings() {
+ return false;
+ }
+
+ @Override
+ public Map<String, String> getLineSourceMapping() {
+ return Collections.emptyMap();
+ }
+ }
+
+ /**
+ * Visitor which scans {@link StackTraceNode}s and accumulates class/method call identities.
+ */
+ class VisitorImpl implements Visitor {
+ private final ClassSourceLookup lookup;
+ private final ClassFinder classFinder = new ClassFinder();
+
+ private final SourcesMap<String> classSources = new SourcesMap<>(Function.identity());
+ private final SourcesMap<MethodCall> methodSources = new SourcesMap<>(MethodCall::toString);
+ private final SourcesMap<MethodCallByLine> lineSources = new SourcesMap<>(MethodCallByLine::toString);
+
+ VisitorImpl(ClassSourceLookup lookup) {
+ this.lookup = lookup;
+ }
+
+ @Override
+ public void visit(ThreadNode node) {
+ Queue<StackTraceNode> queue = new ArrayDeque<>(node.getChildren());
+ for (StackTraceNode n = queue.poll(); n != null; n = queue.poll()) {
+ visitStackNode(n);
+ queue.addAll(n.getChildren());
+ }
+ }
+
+ private void visitStackNode(StackTraceNode node) {
+ this.classSources.computeIfAbsent(
+ node.getClassName(),
+ className -> {
+ Class<?> clazz = this.classFinder.findClass(className);
+ if (clazz == null) {
+ return null;
+ }
+ return this.lookup.identify(clazz);
+ });
+
+ if (node.getMethodDescription() != null) {
+ MethodCall methodCall = new MethodCall(node.getClassName(), node.getMethodName(), node.getMethodDescription());
+ this.methodSources.computeIfAbsent(methodCall, this.lookup::identify);
+ } else {
+ MethodCallByLine methodCall = new MethodCallByLine(node.getClassName(), node.getMethodName(), node.getLineNumber());
+ this.lineSources.computeIfAbsent(methodCall, this.lookup::identify);
+ }
+ }
+
+ @Override
+ public boolean hasClassSourceMappings() {
+ return this.classSources.hasMappings();
+ }
+
+ @Override
+ public Map<String, String> getClassSourceMapping() {
+ return this.classSources.export();
+ }
+
+ @Override
+ public boolean hasMethodSourceMappings() {
+ return this.methodSources.hasMappings();
+ }
+
+ @Override
+ public Map<String, String> getMethodSourceMapping() {
+ return this.methodSources.export();
+ }
+
+ @Override
+ public boolean hasLineSourceMappings() {
+ return this.lineSources.hasMappings();
+ }
+
+ @Override
+ public Map<String, String> getLineSourceMapping() {
+ return this.lineSources.export();
+ }
+ }
+
+ final class SourcesMap<T> {
+ // <key> --> identifier (plugin name)
+ private final Map<T, String> map = new HashMap<>();
+ private final Function<? super T, String> keyToStringFunction;
+
+ private SourcesMap(Function<? super T, String> keyToStringFunction) {
+ this.keyToStringFunction = keyToStringFunction;
+ }
+
+ public void computeIfAbsent(T key, ComputeSourceFunction<T> function) {
+ if (!this.map.containsKey(key)) {
+ try {
+ this.map.put(key, function.compute(key));
+ } catch (Throwable e) {
+ this.map.put(key, null);
+ }
+ }
+ }
+
+ public boolean hasMappings() {
+ this.map.values().removeIf(Objects::isNull);
+ return !this.map.isEmpty();
+ }
+
+ public Map<String, String> export() {
+ this.map.values().removeIf(Objects::isNull);
+ if (this.keyToStringFunction.equals(Function.identity())) {
+ //noinspection unchecked
+ return (Map<String, String>) this.map;
+ } else {
+ return this.map.entrySet().stream().collect(Collectors.toMap(
+ e -> this.keyToStringFunction.apply(e.getKey()),
+ Map.Entry::getValue
+ ));
+ }
+ }
+
+ private interface ComputeSourceFunction<T> {
+ String compute(T key) throws Exception;
+ }
+ }
+
+ /**
+ * Encapsulates information about a given method call using the name + method description.
+ */
+ final class MethodCall {
+ private final String className;
+ private final String methodName;
+ private final String methodDescriptor;
+
+ public MethodCall(String className, String methodName, String methodDescriptor) {
+ this.className = className;
+ this.methodName = methodName;
+ this.methodDescriptor = methodDescriptor;
+ }
+
+ public String getClassName() {
+ return this.className;
+ }
+
+ public String getMethodName() {
+ return this.methodName;
+ }
+
+ public String getMethodDescriptor() {
+ return this.methodDescriptor;
+ }
+
+ @Override
+ public String toString() {
+ return this.className + ";" + this.methodName + ";" + this.methodDescriptor;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof MethodCall)) return false;
+ MethodCall that = (MethodCall) o;
+ return this.className.equals(that.className) &&
+ this.methodName.equals(that.methodName) &&
+ this.methodDescriptor.equals(that.methodDescriptor);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.className, this.methodName, this.methodDescriptor);
+ }
+ }
+
+ /**
+ * Encapsulates information about a given method call using the name + line number.
+ */
+ final class MethodCallByLine {
+ private final String className;
+ private final String methodName;
+ private final int lineNumber;
+
+ public MethodCallByLine(String className, String methodName, int lineNumber) {
+ this.className = className;
+ this.methodName = methodName;
+ this.lineNumber = lineNumber;
+ }
+
+ public String getClassName() {
+ return this.className;
+ }
+
+ public String getMethodName() {
+ return this.methodName;
+ }
+
+ public int getLineNumber() {
+ return this.lineNumber;
+ }
+
+ @Override
+ public String toString() {
+ return this.className + ";" + this.lineNumber;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof MethodCallByLine)) return false;
+ MethodCallByLine that = (MethodCallByLine) o;
+ return this.lineNumber == that.lineNumber && this.className.equals(that.className);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.className, this.lineNumber);
+ }
+ }
+
+}
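
A sketch of the kind of platform implementation this interface expects; PluginClassLoader and getPlugin() are hypothetical stand-ins for a platform's real plugin loader API:

    public class ExampleClassSourceLookup extends ClassSourceLookup.ByClassLoader {
        @Override
        public String identify(ClassLoader loader) {
            // ByClassLoader walks up the loader hierarchy and stops at the
            // first loader for which this method returns a non-null source
            if (loader instanceof PluginClassLoader) {
                return ((PluginClassLoader) loader).getPlugin().getName();
            }
            return null;
        }
    }
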
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/source/SourceMetadata.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/source/SourceMetadata.java
new file mode 100644
index 0000000..0808d66
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/source/SourceMetadata.java
@@ -0,0 +1,81 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.source;
+
+import com.google.common.collect.ImmutableList;
+
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * A "source" is a plugin or mod on the platform that may be identified
+ * as a source of a method call in a profile.
+ */
+public class SourceMetadata {
+
+ public static <T> List<SourceMetadata> gather(Collection<T> sources, Function<? super T, String> nameFunction, Function<? super T, String> versionFunction, Function<? super T, String> authorFunction) {
+ ImmutableList.Builder<SourceMetadata> builder = ImmutableList.builder();
+
+ for (T source : sources) {
+ String name = nameFunction.apply(source);
+ String version = versionFunction.apply(source);
+ String author = authorFunction.apply(source);
+
+ SourceMetadata metadata = new SourceMetadata(name, version, author);
+ builder.add(metadata);
+ }
+
+ return builder.build();
+ }
+
+ private final String name;
+ private final String version;
+ private final String author;
+
+ public SourceMetadata(String name, String version, String author) {
+ this.name = name;
+ this.version = version;
+ this.author = author;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public String getVersion() {
+ return this.version;
+ }
+
+ public String getAuthor() {
+ return this.author;
+ }
+
+ public SamplerMetadata.SourceMetadata toProto() {
+ return SamplerMetadata.SourceMetadata.newBuilder()
+ .setName(this.name)
+ .setVersion(this.version)
+ .build();
+ }
+
+}
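
Example usage of gather(), with a throwaway String[] standing in for a platform's plugin descriptor type:

    import java.util.Arrays;
    import java.util.List;

    List<String[]> plugins = Arrays.asList(
            new String[]{"spark", "1.10.0", "lucko"},
            new String[]{"SomeMod", "2.3", "someone"}
    );
    List<SourceMetadata> sources = SourceMetadata.gather(
            plugins,
            p -> p[0], // name
            p -> p[1], // version
            p -> p[2]  // author
    );
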
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java
new file mode 100644
index 0000000..be6f08a
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java
@@ -0,0 +1,70 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.window;
+
+import me.lucko.spark.common.sampler.aggregator.DataAggregator;
+
+import java.util.function.IntPredicate;
+
+public enum ProfilingWindowUtils {
+ ;
+
+ /**
+ * The size/duration of a profiling window in seconds.
+ * (1 window = 1 minute)
+ */
+ public static final int WINDOW_SIZE_SECONDS = 60;
+
+ /**
+ * The number of windows to record in continuous profiling before data is dropped.
+ * (60 windows * 1 minute = 1 hour of profiling data)
+ */
+ public static final int HISTORY_SIZE = Integer.getInteger("spark.continuousProfilingHistorySize", 60);
+
+ /**
+ * Gets the profiling window for the given time in unix-millis.
+ *
+ * @param time the time in milliseconds
+ * @return the window
+ */
+ public static int unixMillisToWindow(long time) {
+ return (int) (time / (WINDOW_SIZE_SECONDS * 1000L));
+ }
+
+ /**
+ * Gets the window at the current time.
+ *
+ * @return the window
+ */
+ public static int windowNow() {
+ return unixMillisToWindow(System.currentTimeMillis());
+ }
+
+ /**
+ * Gets a prune predicate that can be passed to {@link DataAggregator#pruneData(IntPredicate)}.
+ *
+ * @param currentWindow the current profiling window
+ * @return the prune predicate
+ */
+ public static IntPredicate keepHistoryBefore(int currentWindow) {
+ // windows that were earlier than (currentWindow minus history size) should be pruned
+ return window -> window < (currentWindow - HISTORY_SIZE);
+ }
+}
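
A worked example of the window arithmetic, using a timestamp close to this commit's date:

    import java.util.function.IntPredicate;

    long time = 1672099126000L;                                 // 2022-12-26 23:58:46 UTC
    int window = ProfilingWindowUtils.unixMillisToWindow(time); // 1672099126000 / 60000 = 27868318

    // with the default HISTORY_SIZE of 60, anything more than an hour old is pruned
    IntPredicate prune = ProfilingWindowUtils.keepHistoryBefore(window);
    prune.test(window - 61); // true  -> pruned
    prune.test(window - 60); // false -> kept
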
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java
new file mode 100644
index 0000000..03da075
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java
@@ -0,0 +1,93 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.window;
+
+import me.lucko.spark.common.sampler.async.jfr.Dictionary;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.IntStream;
+
+/**
+ * Encodes a map of profiling windows to accumulated times into a double array.
+ */
+public class ProtoTimeEncoder {
+ /** A sorted array of all possible keys to encode */
+ private final int[] keys;
+ /** A map of key value -> index in the keys array */
+ private final Map<Integer, Integer> keysToIndex;
+
+ public ProtoTimeEncoder(List<ThreadNode> sourceData) {
+ // get an array of all keys that show up in the source data
+ this.keys = sourceData.stream()
+ .map(n -> n.getTimeWindows().stream().mapToInt(i -> i))
+ .reduce(IntStream.empty(), IntStream::concat)
+ .distinct()
+ .sorted()
+ .toArray();
+
+ // construct a reverse index lookup
+ this.keysToIndex = new HashMap<>(this.keys.length);
+ for (int i = 0; i < this.keys.length; i++) {
+ this.keysToIndex.put(this.keys[i], i);
+ }
+ }
+
+ /**
+ * Gets an array of the keys that could be encoded by this encoder.
+ *
+ * @return an array of keys
+ */
+ public int[] getKeys() {
+ return this.keys;
+ }
+
+ /**
+ * Encodes a map of times/durations into a double array.
+ *
+ * @param times a map of times (profiling window -> duration in microseconds)
+ * @return the times encoded as a double array of durations in milliseconds
+ */
+ public double[] encode(Map<Integer, LongAdder> times) {
+ // construct an array of values - length needs to exactly match the
+ // number of keys, even if some values are zero.
+ double[] array = new double[this.keys.length];
+
+ times.forEach((key, value) -> {
+ // get the index for the given key
+ Integer idx = this.keysToIndex.get(key);
+ if (idx == null) {
+ throw new RuntimeException("No index for key " + key + " in " + this.keysToIndex.keySet());
+ }
+
+ // convert the duration from microseconds -> milliseconds
+ double durationInMilliseconds = value.longValue() / 1000d;
+
+ // store in the array
+ array[idx] = durationInMilliseconds;
+ });
+
+ return array;
+ }
+}
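
A worked example of the encoding, assuming threadA and threadB are ThreadNodes whose recorded windows are {100, 101} and {101, 102} respectively:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.LongAdder;

    ProtoTimeEncoder encoder = new ProtoTimeEncoder(Arrays.asList(threadA, threadB));
    encoder.getKeys(); // [100, 101, 102] - the union of all windows, sorted

    // a node that accumulated 30000 microseconds, all within window 101:
    Map<Integer, LongAdder> times = new HashMap<>();
    LongAdder adder = new LongAdder();
    adder.add(30_000);
    times.put(101, adder);

    encoder.encode(times); // [0.0, 30.0, 0.0] - milliseconds, one slot per key
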
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
new file mode 100644
index 0000000..ce65013
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
@@ -0,0 +1,287 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.window;
+
+import me.lucko.spark.common.SparkPlatform;
+import me.lucko.spark.common.monitor.cpu.CpuMonitor;
+import me.lucko.spark.common.monitor.tick.TickStatistics;
+import me.lucko.spark.common.platform.world.AsyncWorldInfoProvider;
+import me.lucko.spark.common.platform.world.WorldInfoProvider;
+import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.common.util.RollingAverage;
+import me.lucko.spark.proto.SparkProtos;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntPredicate;
+
+/**
+ * Collects statistics for each profiling window.
+ */
+public class WindowStatisticsCollector {
+ private static final SparkProtos.WindowStatistics ZERO = SparkProtos.WindowStatistics.newBuilder().build();
+
+ /** The platform */
+ private final SparkPlatform platform;
+
+ /** Map of profiling window -> statistics */
+ private final Map<Integer, SparkProtos.WindowStatistics> stats;
+
+ private TickCounter tickCounter;
+
+ public WindowStatisticsCollector(SparkPlatform platform) {
+ this.platform = platform;
+ this.stats = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Indicates to the statistics collector that it should count the number
+ * of ticks in each window using the provided {@link TickHook}.
+ *
+ * @param hook the tick hook
+ */
+ public void startCountingTicks(TickHook hook) {
+ this.tickCounter = new NormalTickCounter(this.platform, hook);
+ }
+
+ /**
+ * Indicates to the statistics collector that it should count the number
+ * of ticks in each window, according to how many times the
+ * {@link ExplicitTickCounter#increment()} method is called.
+ *
+ * @param hook the tick hook
+ * @return the counter
+ */
+ public ExplicitTickCounter startCountingTicksExplicit(TickHook hook) {
+ ExplicitTickCounter counter = new ExplicitTickCounter(this.platform, hook);
+ this.tickCounter = counter;
+ return counter;
+ }
+
+ public void stop() {
+ if (this.tickCounter != null) {
+ this.tickCounter.stop();
+ }
+ }
+
+ /**
+ * Gets the total number of ticks that have passed between the time
+ * when the profiler started and stopped.
+ *
+ * <p>Importantly, note that this metric is different to the total number of ticks in a window
+ * (which is recorded by {@link SparkProtos.WindowStatistics#getTicks()}) or the total number
+ * of observed ticks if the 'only-ticks-over' aggregator is being used
+ * (which is recorded by {@link SparkProtos.WindowStatistics#getTicks()}
+ * and {@link ExplicitTickCounter#getTotalCountedTicks()}).</p>
+ *
+ * @return the total number of ticks in the profile
+ */
+ public int getTotalTicks() {
+ return this.tickCounter == null ? -1 : this.tickCounter.getTotalTicks();
+ }
+
+ /**
+ * Measures statistics for the given window if none have been recorded yet.
+ *
+ * @param window the window
+ */
+ public void measureNow(int window) {
+ this.stats.computeIfAbsent(window, w -> measure());
+ }
+
+ /**
+ * Ensures that the exported map has statistics (even if they are zeroed) for all windows.
+ *
+ * @param windows the expected windows
+ */
+ public void ensureHasStatisticsForAllWindows(int[] windows) {
+ for (int window : windows) {
+ this.stats.computeIfAbsent(window, w -> ZERO);
+ }
+ }
+
+ public void pruneStatistics(IntPredicate predicate) {
+ this.stats.keySet().removeIf(predicate::test);
+ }
+
+ public Map<Integer, SparkProtos.WindowStatistics> export() {
+ return this.stats;
+ }
+
+ /**
+ * Measures current statistics, averaging over the last minute where possible. (1 min = 1 window)
+ *
+ * @return the current statistics
+ */
+ private SparkProtos.WindowStatistics measure() {
+ SparkProtos.WindowStatistics.Builder builder = SparkProtos.WindowStatistics.newBuilder();
+
+ TickStatistics tickStatistics = this.platform.getTickStatistics();
+ if (tickStatistics != null) {
+ builder.setTps(tickStatistics.tps1Min());
+
+ RollingAverage mspt = tickStatistics.duration1Min();
+ if (mspt != null) {
+ builder.setMsptMedian(mspt.median());
+ builder.setMsptMax(mspt.max());
+ }
+ }
+
+ if (this.tickCounter != null) {
+ int ticks = this.tickCounter.getCountedTicksThisWindowAndReset();
+ builder.setTicks(ticks);
+ }
+
+ builder.setCpuProcess(CpuMonitor.processLoad1MinAvg());
+ builder.setCpuSystem(CpuMonitor.systemLoad1MinAvg());
+
+ try {
+ AsyncWorldInfoProvider worldInfoProvider = new AsyncWorldInfoProvider(this.platform, this.platform.getPlugin().createWorldInfoProvider());
+ WorldInfoProvider.CountsResult counts = worldInfoProvider.getCounts();
+ if (counts != null) {
+ builder.setPlayers(counts.players());
+ builder.setEntities(counts.entities());
+ builder.setTileEntities(counts.tileEntities());
+ builder.setChunks(counts.chunks());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Responsible for counting the number of ticks in a profile/window.
+ */
+ public interface TickCounter {
+
+ /**
+ * Stop the counter.
+ */
+ void stop();
+
+ /**
+ * Get the total number of ticks.
+ *
+ * <p>See {@link WindowStatisticsCollector#getTotalTicks()} for a longer explanation
+ * of what this means exactly.</p>
+ *
+ * @return the total ticks
+ */
+ int getTotalTicks();
+
+ /**
+ * Gets the total number of ticks counted in the last window,
+ * and resets the counter to zero.
+ *
+ * @return the number of ticks counted since the last time this method was called
+ */
+ int getCountedTicksThisWindowAndReset();
+ }
+
+ private abstract static class BaseTickCounter implements TickCounter {
+ protected final SparkPlatform platform;
+ protected final TickHook tickHook;
+
+ /** The game tick when sampling first began */
+ private final int startTick;
+
+ /** The game tick when sampling stopped */
+ private int stopTick = -1;
+
+ BaseTickCounter(SparkPlatform platform, TickHook tickHook) {
+ this.platform = platform;
+ this.tickHook = tickHook;
+ this.startTick = this.tickHook.getCurrentTick();
+ }
+
+ @Override
+ public void stop() {
+ this.stopTick = this.tickHook.getCurrentTick();
+ }
+
+ @Override
+ public int getTotalTicks() {
+ if (this.startTick == -1) {
+ throw new IllegalStateException("start tick not recorded");
+ }
+ if (this.stopTick == -1) {
+ throw new IllegalStateException("stop tick not recorded");
+ }
+
+ return this.stopTick - this.startTick;
+ }
+ }
+
+ /**
+ * Counts the number of ticks in a window using a {@link TickHook}.
+ */
+ public static final class NormalTickCounter extends BaseTickCounter {
+ private int last;
+
+ NormalTickCounter(SparkPlatform platform, TickHook tickHook) {
+ super(platform, tickHook);
+ this.last = this.tickHook.getCurrentTick();
+ }
+
+ @Override
+ public int getCountedTicksThisWindowAndReset() {
+ synchronized (this) {
+ int now = this.tickHook.getCurrentTick();
+ int ticks = now - this.last;
+ this.last = now;
+ return ticks;
+ }
+ }
+ }
+
+ /**
+ * Counts the number of ticks in a window according to the number of times
+ * {@link #increment()} is called.
+ *
+ * Used by the {@link me.lucko.spark.common.sampler.java.TickedDataAggregator}.
+ */
+ public static final class ExplicitTickCounter extends BaseTickCounter {
+ private final AtomicInteger counted = new AtomicInteger();
+ private final AtomicInteger total = new AtomicInteger();
+
+ ExplicitTickCounter(SparkPlatform platform, TickHook tickHook) {
+ super(platform, tickHook);
+ }
+
+ public void increment() {
+ this.counted.incrementAndGet();
+ this.total.incrementAndGet();
+ }
+
+ public int getTotalCountedTicks() {
+ return this.total.get();
+ }
+
+ @Override
+ public int getCountedTicksThisWindowAndReset() {
+ return this.counted.getAndSet(0);
+ }
+ }
+
+}
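
A sketch of the collector's lifecycle around a profiling session; platform and tickHook are placeholders for the real SparkPlatform and TickHook instances:

    WindowStatisticsCollector collector = new WindowStatisticsCollector(platform);
    collector.startCountingTicks(tickHook); // or startCountingTicksExplicit(tickHook)

    // once per window, typically when the sampler rolls over into a new one
    int window = ProfilingWindowUtils.windowNow();
    collector.measureNow(window);

    collector.stop();
    collector.ensureHasStatisticsForAllWindows(new int[]{window - 1, window});
    Map<Integer, SparkProtos.WindowStatistics> stats = collector.export();
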