aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko/spark/common/sampler
diff options
context:
space:
mode:
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java56
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java22
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java83
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java39
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java74
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java89
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java60
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java2
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java25
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java67
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java83
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java14
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java154
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java48
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java11
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java14
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java44
17 files changed, 725 insertions, 160 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
index e324fd3..d814002 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
@@ -32,9 +32,12 @@ import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.source.SourceMetadata;
import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;
import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
+import me.lucko.spark.common.ws.ViewerSocket;
+import me.lucko.spark.proto.SparkProtos;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -74,6 +77,9 @@ public abstract class AbstractSampler implements Sampler {
/** The garbage collector statistics when profiling started */
protected Map<String, GarbageCollectorStatistics> initialGcStats;
+ /** A set of viewer sockets linked to the sampler */
+ protected List<ViewerSocket> viewerSockets = new ArrayList<>();
+
protected AbstractSampler(SparkPlatform platform, SamplerSettings settings) {
this.platform = platform;
this.interval = settings.interval();
@@ -122,12 +128,54 @@ public abstract class AbstractSampler implements Sampler {
@Override
public void stop(boolean cancelled) {
this.windowStatisticsCollector.stop();
+ for (ViewerSocket viewerSocket : this.viewerSockets) {
+ viewerSocket.processSamplerStopped(this);
+ }
+ }
+
+ @Override
+ public void attachSocket(ViewerSocket socket) {
+ this.viewerSockets.add(socket);
+ }
+
+ @Override
+ public Collection<ViewerSocket> getAttachedSockets() {
+ return this.viewerSockets;
+ }
+
+ protected void processWindowRotate() {
+ this.viewerSockets.removeIf(socket -> {
+ if (!socket.isOpen()) {
+ return true;
+ }
+
+ socket.processWindowRotate(this);
+ return false;
+ });
+ }
+
+ protected void sendStatisticsToSocket() {
+ try {
+ if (this.viewerSockets.isEmpty()) {
+ return;
+ }
+
+ SparkProtos.PlatformStatistics platform = this.platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), false);
+ SparkProtos.SystemStatistics system = this.platform.getStatisticsProvider().getSystemStatistics();
+
+ for (ViewerSocket viewerSocket : this.viewerSockets) {
+ viewerSocket.sendUpdatedStatistics(platform, system);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
- protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) {
+ protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender.Data creator, String comment, DataAggregator dataAggregator) {
SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder()
+ .setSamplerMode(getMode().asProto())
.setPlatformMetadata(platform.getPlugin().getPlatformInfo().toData().toProto())
- .setCreator(creator.toData().toProto())
+ .setCreator(creator.toProto())
.setStartTime(this.startTime)
.setEndTime(System.currentTimeMillis())
.setInterval(this.interval)
@@ -144,7 +192,7 @@ public abstract class AbstractSampler implements Sampler {
}
try {
- metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats()));
+ metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), true));
} catch (Exception e) {
e.printStackTrace();
}
@@ -187,7 +235,7 @@ public abstract class AbstractSampler implements Sampler {
ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup);
- ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(data);
+ ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(getMode().valueTransformer(), data);
int[] timeWindows = timeEncoder.getKeys();
for (int timeWindow : timeWindows) {
proto.addTimeWindows(timeWindow);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java
index 7e3b6b4..4e9ca9e 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java
@@ -31,6 +31,8 @@ public class BackgroundSamplerManager {
private static final String OPTION_ENABLED = "backgroundProfiler";
private static final String OPTION_ENGINE = "backgroundProfilerEngine";
private static final String OPTION_INTERVAL = "backgroundProfilerInterval";
+ private static final String OPTION_THREAD_GROUPER = "backgroundProfilerThreadGrouper";
+ private static final String OPTION_THREAD_DUMPER = "backgroundProfilerThreadDumper";
private static final String MARKER_FAILED = "_marker_background_profiler_failed";
@@ -101,13 +103,21 @@ public class BackgroundSamplerManager {
private void startSampler() {
boolean forceJavaEngine = this.configuration.getString(OPTION_ENGINE, "async").equals("java");
+ ThreadGrouper threadGrouper = ThreadGrouper.parseConfigSetting(this.configuration.getString(OPTION_THREAD_GROUPER, "by-pool"));
+ ThreadDumper threadDumper = ThreadDumper.parseConfigSetting(this.configuration.getString(OPTION_THREAD_DUMPER, "default"));
+ if (threadDumper == null) {
+ threadDumper = this.platform.getPlugin().getDefaultThreadDumper();
+ }
+
+ int interval = this.configuration.getInteger(OPTION_INTERVAL, 10);
+
Sampler sampler = new SamplerBuilder()
- .background(true)
- .threadDumper(this.platform.getPlugin().getDefaultThreadDumper())
- .threadGrouper(ThreadGrouper.BY_POOL)
- .samplingInterval(this.configuration.getInteger(OPTION_INTERVAL, 10))
- .forceJavaSampler(forceJavaEngine)
- .start(this.platform);
+ .background(true)
+ .threadDumper(threadDumper)
+ .threadGrouper(threadGrouper)
+ .samplingInterval(interval)
+ .forceJavaSampler(forceJavaEngine)
+ .start(this.platform);
this.platform.getSamplerContainer().setActiveSampler(sampler);
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index 36a63f1..844ab0b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -24,9 +24,13 @@ import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
+import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
+import me.lucko.spark.proto.SparkSamplerProtos.SocketChannelInfo;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
/**
* Abstract superinterface for all sampler implementations.
@@ -44,6 +48,20 @@ public interface Sampler {
void stop(boolean cancelled);
/**
+ * Attaches a viewer socket to this sampler.
+ *
+ * @param socket the socket
+ */
+ void attachSocket(ViewerSocket socket);
+
+ /**
+ * Gets the sockets attached to this sampler.
+ *
+ * @return the attached sockets
+ */
+ Collection<ViewerSocket> getAttachedSockets();
+
+ /**
* Gets the time when the sampler started (unix timestamp in millis)
*
* @return the start time
@@ -65,6 +83,13 @@ public interface Sampler {
boolean isRunningInBackground();
/**
+ * Gets the sampler mode.
+ *
+ * @return the sampler mode
+ */
+ SamplerMode getMode();
+
+ /**
* Gets a future to encapsulate the completion of the sampler
*
* @return a future
@@ -72,6 +97,62 @@ public interface Sampler {
CompletableFuture<Sampler> getFuture();
// Methods used to export the sampler data to the web viewer.
- SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
+ SamplerData toProto(SparkPlatform platform, ExportProps exportProps);
+
+ final class ExportProps {
+ private CommandSender.Data creator;
+ private String comment;
+ private Supplier<MergeMode> mergeMode;
+ private Supplier<ClassSourceLookup> classSourceLookup;
+ private SocketChannelInfo channelInfo;
+
+ public ExportProps() {
+ }
+
+ public CommandSender.Data creator() {
+ return this.creator;
+ }
+
+ public String comment() {
+ return this.comment;
+ }
+
+ public Supplier<MergeMode> mergeMode() {
+ return this.mergeMode;
+ }
+
+ public Supplier<ClassSourceLookup> classSourceLookup() {
+ return this.classSourceLookup;
+ }
+
+ public SocketChannelInfo channelInfo() {
+ return this.channelInfo;
+ }
+
+ public ExportProps creator(CommandSender.Data creator) {
+ this.creator = creator;
+ return this;
+ }
+
+ public ExportProps comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public ExportProps mergeMode(Supplier<MergeMode> mergeMode) {
+ this.mergeMode = mergeMode;
+ return this;
+ }
+
+ public ExportProps classSourceLookup(Supplier<ClassSourceLookup> classSourceLookup) {
+ this.classSourceLookup = classSourceLookup;
+ return this;
+ }
+
+ public ExportProps channelInfo(SocketChannelInfo channelInfo) {
+ this.channelInfo = channelInfo;
+ return this;
+ }
+ }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index ec635ef..b6895ce 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.sampler.async.AsyncProfilerAccess;
import me.lucko.spark.common.sampler.async.AsyncSampler;
+import me.lucko.spark.common.sampler.async.SampleCollector;
import me.lucko.spark.common.sampler.java.JavaSampler;
import me.lucko.spark.common.tick.TickHook;
@@ -34,10 +35,12 @@ import java.util.concurrent.TimeUnit;
@SuppressWarnings("UnusedReturnValue")
public class SamplerBuilder {
- private double samplingInterval = 4; // milliseconds
+ private SamplerMode mode = SamplerMode.EXECUTION;
+ private double samplingInterval = -1;
private boolean ignoreSleeping = false;
private boolean ignoreNative = false;
private boolean useAsyncProfiler = true;
+ private boolean allocLiveOnly = false;
private long autoEndTime = -1;
private boolean background = false;
private ThreadDumper threadDumper = ThreadDumper.ALL;
@@ -49,6 +52,11 @@ public class SamplerBuilder {
public SamplerBuilder() {
}
+ public SamplerBuilder mode(SamplerMode mode) {
+ this.mode = mode;
+ return this;
+ }
+
public SamplerBuilder samplingInterval(double samplingInterval) {
this.samplingInterval = samplingInterval;
return this;
@@ -98,21 +106,38 @@ public class SamplerBuilder {
return this;
}
- public Sampler start(SparkPlatform platform) {
+ public SamplerBuilder allocLiveOnly(boolean allocLiveOnly) {
+ this.allocLiveOnly = allocLiveOnly;
+ return this;
+ }
+
+ public Sampler start(SparkPlatform platform) throws UnsupportedOperationException {
+ if (this.samplingInterval <= 0) {
+ throw new IllegalArgumentException("samplingInterval = " + this.samplingInterval);
+ }
+
boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null;
boolean canUseAsyncProfiler = this.useAsyncProfiler &&
!onlyTicksOverMode &&
!(this.ignoreSleeping || this.ignoreNative) &&
- !(this.threadDumper instanceof ThreadDumper.Regex) &&
AsyncProfilerAccess.getInstance(platform).checkSupported(platform);
+ if (this.mode == SamplerMode.ALLOCATION && (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform))) {
+ throw new UnsupportedOperationException("Allocation profiling is not supported on your system. Check the console for more info.");
+ }
+
+ int interval = (int) (this.mode == SamplerMode.EXECUTION ?
+ this.samplingInterval * 1000d : // convert to microseconds
+ this.samplingInterval
+ );
- int intervalMicros = (int) (this.samplingInterval * 1000d);
- SamplerSettings settings = new SamplerSettings(intervalMicros, this.threadDumper, this.threadGrouper, this.autoEndTime, this.background);
+ SamplerSettings settings = new SamplerSettings(interval, this.threadDumper, this.threadGrouper, this.autoEndTime, this.background);
Sampler sampler;
- if (canUseAsyncProfiler) {
- sampler = new AsyncSampler(platform, settings);
+ if (this.mode == SamplerMode.ALLOCATION) {
+ sampler = new AsyncSampler(platform, settings, new SampleCollector.Allocation(interval, this.allocLiveOnly));
+ } else if (canUseAsyncProfiler) {
+ sampler = new AsyncSampler(platform, settings, new SampleCollector.Execution(interval));
} else if (onlyTicksOverMode) {
sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
} else {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java
new file mode 100644
index 0000000..f9a6e41
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java
@@ -0,0 +1,74 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
+
+import java.util.function.LongToDoubleFunction;
+
+public enum SamplerMode {
+
+ EXECUTION(
+ value -> {
+ // convert the duration from microseconds -> milliseconds
+ return value / 1000d;
+ },
+ 4, // ms
+ SamplerMetadata.SamplerMode.EXECUTION
+ ),
+
+ ALLOCATION(
+ value -> {
+ // do nothing
+ return value;
+ },
+ 524287, // 512 KiB
+ SamplerMetadata.SamplerMode.ALLOCATION
+ );
+
+ private final LongToDoubleFunction valueTransformer;
+ private final int defaultInterval;
+ private final SamplerMetadata.SamplerMode proto;
+
+ SamplerMode(LongToDoubleFunction valueTransformer, int defaultInterval, SamplerMetadata.SamplerMode proto) {
+ this.valueTransformer = valueTransformer;
+ this.defaultInterval = defaultInterval;
+ this.proto = proto;
+ }
+
+ public LongToDoubleFunction valueTransformer() {
+ return this.valueTransformer;
+ }
+
+ public int defaultInterval() {
+ return this.defaultInterval;
+ }
+
+ /**
+ * Gets the metadata enum instance for this sampler mode.
+ *
+ * @return proto metadata
+ */
+ public SamplerMetadata.SamplerMode asProto() {
+ return this.proto;
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
index fd0c413..c68384b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
@@ -32,7 +32,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
/**
@@ -50,11 +49,38 @@ public interface ThreadDumper {
ThreadInfo[] dumpThreads(ThreadMXBean threadBean);
/**
+ * Gets if the given thread should be included in the output.
+ *
+ * @param threadId the thread id
+ * @param threadName the thread name
+ * @return if the thread should be included
+ */
+ boolean isThreadIncluded(long threadId, String threadName);
+
+ /**
* Gets metadata about the thread dumper instance.
*/
SamplerMetadata.ThreadDumper getMetadata();
/**
+ * Creates a new {@link ThreadDumper} by parsing the given config setting.
+ *
+ * @param setting the config setting
+ * @return the thread dumper
+ */
+ static ThreadDumper parseConfigSetting(String setting) {
+ switch (setting) {
+ case "default":
+ return null;
+ case "all":
+ return ALL;
+ default:
+ Set<String> threadNames = Arrays.stream(setting.split(",")).collect(Collectors.toSet());
+ return new ThreadDumper.Specific(threadNames);
+ }
+ }
+
+ /**
* Implementation of {@link ThreadDumper} that generates data for all threads.
*/
ThreadDumper ALL = new ThreadDumper() {
@@ -64,6 +90,11 @@ public interface ThreadDumper {
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ return true;
+ }
+
+ @Override
public SamplerMetadata.ThreadDumper getMetadata() {
return SamplerMetadata.ThreadDumper.newBuilder()
.setType(SamplerMetadata.ThreadDumper.Type.ALL)
@@ -98,7 +129,7 @@ public interface ThreadDumper {
}
public void setThread(Thread thread) {
- this.dumper = new Specific(new long[]{thread.getId()});
+ this.dumper = new Specific(thread);
}
}
@@ -114,10 +145,6 @@ public interface ThreadDumper {
this.ids = new long[]{thread.getId()};
}
- public Specific(long[] ids) {
- this.ids = ids;
- }
-
public Specific(Set<String> names) {
this.threadNamesLowerCase = names.stream().map(String::toLowerCase).collect(Collectors.toSet());
this.ids = new ThreadFinder().getThreads()
@@ -146,6 +173,14 @@ public interface ThreadDumper {
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ if (Arrays.binarySearch(this.ids, threadId) >= 0) {
+ return true;
+ }
+ return getThreadNames().contains(threadName.toLowerCase());
+ }
+
+ @Override
public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
return threadBean.getThreadInfo(this.ids, Integer.MAX_VALUE);
}
@@ -169,35 +204,31 @@ public interface ThreadDumper {
public Regex(Set<String> namePatterns) {
this.namePatterns = namePatterns.stream()
- .map(regex -> {
- try {
- return Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
- } catch (PatternSyntaxException e) {
- return null;
- }
- })
- .filter(Objects::nonNull)
+ .map(regex -> Pattern.compile(regex, Pattern.CASE_INSENSITIVE))
.collect(Collectors.toSet());
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ Boolean result = this.cache.get(threadId);
+ if (result != null) {
+ return result;
+ }
+
+ for (Pattern pattern : this.namePatterns) {
+ if (pattern.matcher(threadName).matches()) {
+ this.cache.put(threadId, true);
+ return true;
+ }
+ }
+ this.cache.put(threadId, false);
+ return false;
+ }
+
+ @Override
public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
return this.threadFinder.getThreads()
- .filter(thread -> {
- Boolean result = this.cache.get(thread.getId());
- if (result != null) {
- return result;
- }
-
- for (Pattern pattern : this.namePatterns) {
- if (pattern.matcher(thread.getName()).matches()) {
- this.cache.put(thread.getId(), true);
- return true;
- }
- }
- this.cache.put(thread.getId(), false);
- return false;
- })
+ .filter(thread -> isThreadIncluded(thread.getId(), thread.getName()))
.map(thread -> threadBean.getThreadInfo(thread.getId(), Integer.MAX_VALUE))
.filter(Objects::nonNull)
.toArray(ThreadInfo[]::new);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
index 9ad84df..b6cfbea 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
@@ -35,6 +35,47 @@ import java.util.regex.Pattern;
public interface ThreadGrouper {
/**
+ * Gets the group for the given thread.
+ *
+ * @param threadId the id of the thread
+ * @param threadName the name of the thread
+ * @return the group
+ */
+ String getGroup(long threadId, String threadName);
+
+ /**
+ * Gets the label to use for a given group.
+ *
+ * @param group the group
+ * @return the label
+ */
+ String getLabel(String group);
+
+ /**
+ * Gets the metadata enum instance for this thread grouper.
+ *
+ * @return proto metadata
+ */
+ SamplerMetadata.DataAggregator.ThreadGrouper asProto();
+
+ /**
+ * Creates a new {@link ThreadGrouper} by parsing the given config setting.
+ *
+ * @param setting the config setting
+ * @return the thread grouper
+ */
+ static ThreadGrouper parseConfigSetting(String setting) {
+ switch (setting) {
+ case "as-one":
+ return AS_ONE;
+ case "by-name":
+ return BY_NAME;
+ default:
+ return BY_POOL;
+ }
+ }
+
+ /**
* Implementation of {@link ThreadGrouper} that just groups by thread name.
*/
ThreadGrouper BY_NAME = new ThreadGrouper() {
@@ -126,23 +167,4 @@ public interface ThreadGrouper {
}
};
- /**
- * Gets the group for the given thread.
- *
- * @param threadId the id of the thread
- * @param threadName the name of the thread
- * @return the group
- */
- String getGroup(long threadId, String threadName);
-
- /**
- * Gets the label to use for a given group.
- *
- * @param group the group
- * @return the label
- */
- String getLabel(String group);
-
- SamplerMetadata.DataAggregator.ThreadGrouper asProto();
-
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
index 402330a..b9a80e0 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
@@ -50,7 +50,7 @@ public class AsyncDataAggregator extends AbstractDataAggregator {
public void insertData(ProfileSegment element, int window) {
try {
ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName()));
- node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime(), window);
+ node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getValue(), window);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
index 1480650..5bee56f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -61,6 +61,8 @@ public class AsyncProfilerAccess {
/** The event to use for profiling */
private final ProfilingEvent profilingEvent;
+ /** The event to use for allocation profiling */
+ private final ProfilingEvent allocationProfilingEvent;
/** If profiler is null, contains the reason why setup failed */
private final Exception setupException;
@@ -68,10 +70,16 @@ public class AsyncProfilerAccess {
AsyncProfilerAccess(SparkPlatform platform) {
AsyncProfiler profiler;
ProfilingEvent profilingEvent = null;
+ ProfilingEvent allocationProfilingEvent = null;
Exception setupException = null;
try {
profiler = load(platform);
+
+ if (isEventSupported(profiler, ProfilingEvent.ALLOC, false)) {
+ allocationProfilingEvent = ProfilingEvent.ALLOC;
+ }
+
if (isEventSupported(profiler, ProfilingEvent.CPU, false)) {
profilingEvent = ProfilingEvent.CPU;
} else if (isEventSupported(profiler, ProfilingEvent.WALL, true)) {
@@ -84,6 +92,7 @@ public class AsyncProfilerAccess {
this.profiler = profiler;
this.profilingEvent = profilingEvent;
+ this.allocationProfilingEvent = allocationProfilingEvent;
this.setupException = setupException;
}
@@ -98,6 +107,10 @@ public class AsyncProfilerAccess {
return this.profilingEvent;
}
+ public ProfilingEvent getAllocationProfilingEvent() {
+ return this.allocationProfilingEvent;
+ }
+
public boolean checkSupported(SparkPlatform platform) {
if (this.setupException != null) {
if (this.setupException instanceof UnsupportedSystemException) {
@@ -116,6 +129,15 @@ public class AsyncProfilerAccess {
return this.profiler != null;
}
+ public boolean checkAllocationProfilingSupported(SparkPlatform platform) {
+ boolean supported = this.allocationProfilingEvent != null;
+ if (!supported && this.profiler != null) {
+ platform.getPlugin().log(Level.WARNING, "The allocation profiling mode is not supported on your system. This is most likely because Hotspot debug symbols are not available.");
+ platform.getPlugin().log(Level.WARNING, "To resolve, try installing the 'openjdk-11-dbg' or 'openjdk-8-dbg' package using your OS package manager.");
+ }
+ return supported;
+ }
+
private static AsyncProfiler load(SparkPlatform platform) throws Exception {
// check compatibility
String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", "");
@@ -183,7 +205,8 @@ public class AsyncProfilerAccess {
enum ProfilingEvent {
CPU(Events.CPU),
- WALL(Events.WALL);
+ WALL(Events.WALL),
+ ALLOC(Events.ALLOC);
private final String id;
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
index d74b75f..2fd304c 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
@@ -20,6 +20,8 @@
package me.lucko.spark.common.sampler.async;
+import com.google.common.collect.ImmutableList;
+
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.sampler.ThreadDumper;
import me.lucko.spark.common.sampler.async.jfr.JfrReader;
@@ -29,10 +31,9 @@ import one.profiler.AsyncProfiler;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Collection;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
/**
* Represents a profiling job within async-profiler.
@@ -77,8 +78,8 @@ public class AsyncProfilerJob {
// Set on init
/** The platform */
private SparkPlatform platform;
- /** The sampling interval in microseconds */
- private int interval;
+ /** The sample collector */
+ private SampleCollector<?> sampleCollector;
/** The thread dumper */
private ThreadDumper threadDumper;
/** The profiling window */
@@ -100,9 +101,9 @@ public class AsyncProfilerJob {
* @param command the command
* @return the output
*/
- private String execute(String command) {
+ private String execute(Collection<String> command) {
try {
- return this.profiler.execute(command);
+ return this.profiler.execute(String.join(",", command));
} catch (IOException e) {
throw new RuntimeException("Exception whilst executing profiler command", e);
}
@@ -118,9 +119,9 @@ public class AsyncProfilerJob {
}
// Initialise the job
- public void init(SparkPlatform platform, int interval, ThreadDumper threadDumper, int window, boolean quiet) {
+ public void init(SparkPlatform platform, SampleCollector<?> collector, ThreadDumper threadDumper, int window, boolean quiet) {
this.platform = platform;
- this.interval = interval;
+ this.sampleCollector = collector;
this.threadDumper = threadDumper;
this.window = window;
this.quiet = quiet;
@@ -141,16 +142,20 @@ public class AsyncProfilerJob {
}
// construct a command to send to async-profiler
- String command = "start,event=" + this.access.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
+ ImmutableList.Builder<String> command = ImmutableList.<String>builder()
+ .add("start")
+ .addAll(this.sampleCollector.initArguments(this.access))
+ .add("threads").add("jfr").add("file=" + this.outputFile.toString());
+
if (this.quiet) {
- command += ",loglevel=NONE";
+ command.add("loglevel=NONE");
}
if (this.threadDumper instanceof ThreadDumper.Specific) {
- command += ",filter";
+ command.add("filter");
}
// start the profiler
- String resp = execute(command).trim();
+ String resp = execute(command.build()).trim();
if (!resp.equalsIgnoreCase("profiling started")) {
throw new RuntimeException("Unexpected response: " + resp);
@@ -197,18 +202,9 @@ public class AsyncProfilerJob {
* Aggregates the collected data.
*/
public void aggregate(AsyncDataAggregator dataAggregator) {
-
- Predicate<String> threadFilter;
- if (this.threadDumper instanceof ThreadDumper.Specific) {
- ThreadDumper.Specific specificDumper = (ThreadDumper.Specific) this.threadDumper;
- threadFilter = n -> specificDumper.getThreadNames().contains(n.toLowerCase());
- } else {
- threadFilter = n -> true;
- }
-
// read the jfr file produced by async-profiler
try (JfrReader reader = new JfrReader(this.outputFile)) {
- readSegments(reader, threadFilter, dataAggregator, this.window);
+ readSegments(reader, this.sampleCollector, dataAggregator);
} catch (Exception e) {
boolean fileExists;
try {
@@ -235,34 +231,23 @@ public class AsyncProfilerJob {
}
}
- private void readSegments(JfrReader reader, Predicate<String> threadFilter, AsyncDataAggregator dataAggregator, int window) throws IOException {
- List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class);
- for (int i = 0; i < samples.size(); i++) {
- JfrReader.ExecutionSample sample = samples.get(i);
-
- long duration;
- if (i == 0) {
- // we don't really know the duration of the first sample, so just use the sampling
- // interval
- duration = this.interval;
- } else {
- // calculate the duration of the sample by calculating the time elapsed since the
- // previous sample
- duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time);
- }
-
+ private <E extends JfrReader.Event> void readSegments(JfrReader reader, SampleCollector<E> collector, AsyncDataAggregator dataAggregator) throws IOException {
+ List<E> samples = reader.readAllEvents(collector.eventClass());
+ for (E sample : samples) {
String threadName = reader.threads.get((long) sample.tid);
if (threadName == null) {
continue;
}
- if (!threadFilter.test(threadName)) {
+ if (!this.threadDumper.isThreadIncluded(sample.tid, threadName)) {
continue;
}
+ long value = collector.measure(sample);
+
// parse the segment and give it to the data aggregator
- ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, duration);
- dataAggregator.insertData(segment, window);
+ ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, value);
+ dataAggregator.insertData(segment, this.window);
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index 178f055..961c3e9 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -23,17 +23,18 @@ package me.lucko.spark.common.sampler.async;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
-import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
+import me.lucko.spark.common.sampler.SamplerMode;
import me.lucko.spark.common.sampler.SamplerSettings;
-import me.lucko.spark.common.sampler.node.MergeMode;
-import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.common.util.SparkThreadFactory;
+import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.IntPredicate;
@@ -41,6 +42,11 @@ import java.util.function.IntPredicate;
* A sampler implementation using async-profiler.
*/
public class AsyncSampler extends AbstractSampler {
+
+ /** Function to collect and measure samples - either execution or allocation */
+ private final SampleCollector<?> sampleCollector;
+
+ /** Object that provides access to the async-profiler API */
private final AsyncProfilerAccess profilerAccess;
/** Responsible for aggregating and then outputting collected sampling data */
@@ -55,12 +61,19 @@ public class AsyncSampler extends AbstractSampler {
/** The executor used for scheduling and management */
private ScheduledExecutorService scheduler;
- public AsyncSampler(SparkPlatform platform, SamplerSettings settings) {
+ /** The task to send statistics to the viewer socket */
+ private ScheduledFuture<?> socketStatisticsTask;
+
+ public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector) {
super(platform, settings);
+ this.sampleCollector = collector;
this.profilerAccess = AsyncProfilerAccess.getInstance(platform);
this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper());
this.scheduler = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build()
+ new ThreadFactoryBuilder()
+ .setNameFormat("spark-async-sampler-worker-thread")
+ .setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER)
+ .build()
);
}
@@ -79,17 +92,21 @@ public class AsyncSampler extends AbstractSampler {
int window = ProfilingWindowUtils.windowNow();
AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob();
- job.init(this.platform, this.interval, this.threadDumper, window, this.background);
+ job.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background);
job.start();
+ this.windowStatisticsCollector.recordWindowStartTime(window);
this.currentJob = job;
// rotate the sampler job to put data into a new window
- this.scheduler.scheduleAtFixedRate(
- this::rotateProfilerJob,
- ProfilingWindowUtils.WINDOW_SIZE_SECONDS,
- ProfilingWindowUtils.WINDOW_SIZE_SECONDS,
- TimeUnit.SECONDS
- );
+ boolean shouldNotRotate = this.sampleCollector instanceof SampleCollector.Allocation && ((SampleCollector.Allocation) this.sampleCollector).isLiveOnly();
+ if (!shouldNotRotate) {
+ this.scheduler.scheduleAtFixedRate(
+ this::rotateProfilerJob,
+ ProfilingWindowUtils.WINDOW_SIZE_SECONDS,
+ ProfilingWindowUtils.WINDOW_SIZE_SECONDS,
+ TimeUnit.SECONDS
+ );
+ }
recordInitialGcStats();
scheduleTimeout();
@@ -106,9 +123,6 @@ public class AsyncSampler extends AbstractSampler {
try {
// stop the previous job
previousJob.stop();
-
- // collect statistics for the window
- this.windowStatisticsCollector.measureNow(previousJob.getWindow());
} catch (Exception e) {
e.printStackTrace();
}
@@ -116,10 +130,18 @@ public class AsyncSampler extends AbstractSampler {
// start a new job
int window = previousJob.getWindow() + 1;
AsyncProfilerJob newJob = this.profilerAccess.startNewProfilerJob();
- newJob.init(this.platform, this.interval, this.threadDumper, window, this.background);
+ newJob.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background);
newJob.start();
+ this.windowStatisticsCollector.recordWindowStartTime(window);
this.currentJob = newJob;
+ // collect statistics for the previous window
+ try {
+ this.windowStatisticsCollector.measureNow(previousJob.getWindow());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
// aggregate the output of the previous job
previousJob.aggregate(this.dataAggregator);
@@ -127,6 +149,8 @@ public class AsyncSampler extends AbstractSampler {
IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(window);
this.dataAggregator.pruneData(predicate);
this.windowStatisticsCollector.pruneStatistics(predicate);
+
+ this.scheduler.execute(this::processWindowRotate);
}
} catch (Throwable e) {
e.printStackTrace();
@@ -167,6 +191,10 @@ public class AsyncSampler extends AbstractSampler {
this.currentJob = null;
}
+ if (this.socketStatisticsTask != null) {
+ this.socketStatisticsTask.cancel(false);
+ }
+
if (this.scheduler != null) {
this.scheduler.shutdown();
this.scheduler = null;
@@ -174,10 +202,27 @@ public class AsyncSampler extends AbstractSampler {
}
@Override
- public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ public void attachSocket(ViewerSocket socket) {
+ super.attachSocket(socket);
+
+ if (this.socketStatisticsTask == null) {
+ this.socketStatisticsTask = this.scheduler.scheduleAtFixedRate(this::sendStatisticsToSocket, 10, 10, TimeUnit.SECONDS);
+ }
+ }
+
+ @Override
+ public SamplerMode getMode() {
+ return this.sampleCollector.getMode();
+ }
+
+ @Override
+ public SamplerData toProto(SparkPlatform platform, ExportProps exportProps) {
SamplerData.Builder proto = SamplerData.newBuilder();
- writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
- writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup);
+ if (exportProps.channelInfo() != null) {
+ proto.setChannelInfo(exportProps.channelInfo());
+ }
+ writeMetadataToProto(proto, platform, exportProps.creator(), exportProps.comment(), this.dataAggregator);
+ writeDataToProto(proto, this.dataAggregator, exportProps.mergeMode().get(), exportProps.classSourceLookup().get());
return proto.build();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java
index 26debaf..0804ccf 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java
@@ -38,13 +38,13 @@ public class ProfileSegment {
/** The stack trace for this segment */
private final AsyncStackTraceElement[] stackTrace;
/** The time spent executing this segment in microseconds */
- private final long time;
+ private final long value;
- public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long time) {
+ public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value) {
this.nativeThreadId = nativeThreadId;
this.threadName = threadName;
this.stackTrace = stackTrace;
- this.time = time;
+ this.value = value;
}
public int getNativeThreadId() {
@@ -59,11 +59,11 @@ public class ProfileSegment {
return this.stackTrace;
}
- public long getTime() {
- return this.time;
+ public long getValue() {
+ return this.value;
}
- public static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) {
+ public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event sample, String threadName, long value) {
JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId);
int len = stackTrace.methods.length;
@@ -72,7 +72,7 @@ public class ProfileSegment {
stack[i] = parseStackFrame(reader, stackTrace.methods[i]);
}
- return new ProfileSegment(sample.tid, threadName, stack, duration);
+ return new ProfileSegment(sample.tid, threadName, stack, value);
}
private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java
new file mode 100644
index 0000000..6054b91
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java
@@ -0,0 +1,154 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+import com.google.common.collect.ImmutableList;
+
+import me.lucko.spark.common.sampler.SamplerMode;
+import me.lucko.spark.common.sampler.async.AsyncProfilerAccess.ProfilingEvent;
+import me.lucko.spark.common.sampler.async.jfr.JfrReader.AllocationSample;
+import me.lucko.spark.common.sampler.async.jfr.JfrReader.Event;
+import me.lucko.spark.common.sampler.async.jfr.JfrReader.ExecutionSample;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Collects and processes sample events for a given type.
+ *
+ * @param <E> the event type
+ */
+public interface SampleCollector<E extends Event> {
+
+ /**
+ * Gets the arguments to initialise the profiler.
+ *
+ * @param access the async profiler access object
+ * @return the initialisation arguments
+ */
+ Collection<String> initArguments(AsyncProfilerAccess access);
+
+ /**
+ * Gets the event class processed by this sample collector.
+ *
+ * @return the event class
+ */
+ Class<E> eventClass();
+
+ /**
+ * Gets the measurements for a given event
+ *
+ * @param event the event
+ * @return the measurement
+ */
+ long measure(E event);
+
+ /**
+ * Gets the mode for the collector.
+ *
+ * @return the mode
+ */
+ SamplerMode getMode();
+
+ /**
+ * Sample collector for execution (cpu time) profiles.
+ */
+ final class Execution implements SampleCollector<ExecutionSample> {
+ private final int interval; // time in microseconds
+
+ public Execution(int interval) {
+ this.interval = interval;
+ }
+
+ @Override
+ public Collection<String> initArguments(AsyncProfilerAccess access) {
+ ProfilingEvent event = access.getProfilingEvent();
+ Objects.requireNonNull(event, "event");
+
+ return ImmutableList.of(
+ "event=" + event,
+ "interval=" + this.interval + "us"
+ );
+ }
+
+ @Override
+ public Class<ExecutionSample> eventClass() {
+ return ExecutionSample.class;
+ }
+
+ @Override
+ public long measure(ExecutionSample event) {
+ return event.value() * this.interval;
+ }
+
+ @Override
+ public SamplerMode getMode() {
+ return SamplerMode.EXECUTION;
+ }
+ }
+
+ /**
+ * Sample collector for allocation (memory) profiles.
+ */
+ final class Allocation implements SampleCollector<AllocationSample> {
+ private final int intervalBytes;
+ private final boolean liveOnly;
+
+ public Allocation(int intervalBytes, boolean liveOnly) {
+ this.intervalBytes = intervalBytes;
+ this.liveOnly = liveOnly;
+ }
+
+ public boolean isLiveOnly() {
+ return this.liveOnly;
+ }
+
+ @Override
+ public Collection<String> initArguments(AsyncProfilerAccess access) {
+ ProfilingEvent event = access.getAllocationProfilingEvent();
+ Objects.requireNonNull(event, "event");
+
+ ImmutableList.Builder<String> builder = ImmutableList.builder();
+ builder.add("event=" + event);
+ builder.add("alloc=" + this.intervalBytes);
+ if (this.liveOnly) {
+ builder.add("live");
+ }
+ return builder.build();
+ }
+
+ @Override
+ public Class<AllocationSample> eventClass() {
+ return AllocationSample.class;
+ }
+
+ @Override
+ public long measure(AllocationSample event) {
+ return event.value();
+ }
+
+ @Override
+ public SamplerMode getMode() {
+ return SamplerMode.ALLOCATION;
+ }
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
index 72a37e8..e29619b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -23,14 +23,14 @@ package me.lucko.spark.common.sampler.java;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
-import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
+import me.lucko.spark.common.sampler.SamplerMode;
import me.lucko.spark.common.sampler.SamplerSettings;
-import me.lucko.spark.common.sampler.node.MergeMode;
-import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.common.util.SparkThreadFactory;
+import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import java.lang.management.ManagementFactory;
@@ -51,12 +51,18 @@ public class JavaSampler extends AbstractSampler implements Runnable {
/** The worker pool for inserting stack nodes */
private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
- 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
+ 6, new ThreadFactoryBuilder()
+ .setNameFormat("spark-java-sampler-" + THREAD_ID.getAndIncrement() + "-%d")
+ .setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER)
+ .build()
);
/** The main sampling task */
private ScheduledFuture<?> task;
+ /** The task to send statistics to the viewer socket */
+ private ScheduledFuture<?> socketStatisticsTask;
+
/** The thread management interface for the current JVM */
private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
@@ -90,6 +96,7 @@ public class JavaSampler extends AbstractSampler implements Runnable {
}
}
+ this.windowStatisticsCollector.recordWindowStartTime(ProfilingWindowUtils.unixMillisToWindow(this.startTime));
this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS);
}
@@ -99,10 +106,16 @@ public class JavaSampler extends AbstractSampler implements Runnable {
this.task.cancel(false);
+ if (this.socketStatisticsTask != null) {
+ this.socketStatisticsTask.cancel(false);
+ }
+
if (!cancelled) {
// collect statistics for the final window
this.windowStatisticsCollector.measureNow(this.lastWindow.get());
}
+
+ this.workerPool.shutdown();
}
@Override
@@ -127,6 +140,15 @@ public class JavaSampler extends AbstractSampler implements Runnable {
}
}
+ @Override
+ public void attachSocket(ViewerSocket socket) {
+ super.attachSocket(socket);
+
+ if (this.socketStatisticsTask == null) {
+ this.socketStatisticsTask = this.workerPool.scheduleAtFixedRate(this::sendStatisticsToSocket, 10, 10, TimeUnit.SECONDS);
+ }
+ }
+
private final class InsertDataTask implements Runnable {
private final ThreadInfo[] threadDumps;
private final int window;
@@ -149,6 +171,9 @@ public class JavaSampler extends AbstractSampler implements Runnable {
int previousWindow = JavaSampler.this.lastWindow.getAndUpdate(previous -> Math.max(this.window, previous));
if (previousWindow != 0 && previousWindow != this.window) {
+ // record the start time for the new window
+ JavaSampler.this.windowStatisticsCollector.recordWindowStartTime(this.window);
+
// collect statistics for the previous window
JavaSampler.this.windowStatisticsCollector.measureNow(previousWindow);
@@ -156,16 +181,25 @@ public class JavaSampler extends AbstractSampler implements Runnable {
IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(this.window);
JavaSampler.this.dataAggregator.pruneData(predicate);
JavaSampler.this.windowStatisticsCollector.pruneStatistics(predicate);
+
+ JavaSampler.this.workerPool.execute(JavaSampler.this::processWindowRotate);
}
}
}
@Override
- public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ public SamplerData toProto(SparkPlatform platform, ExportProps exportProps) {
SamplerData.Builder proto = SamplerData.newBuilder();
- writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
- writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup);
+ if (exportProps.channelInfo() != null) {
+ proto.setChannelInfo(exportProps.channelInfo());
+ }
+ writeMetadataToProto(proto, platform, exportProps.creator(), exportProps.comment(), this.dataAggregator);
+ writeDataToProto(proto, this.dataAggregator, exportProps.mergeMode().get(), exportProps.classSourceLookup().get());
return proto.build();
}
+ @Override
+ public SamplerMode getMode() {
+ return SamplerMode.EXECUTION;
+ }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
index d537b96..08cb719 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
@@ -30,6 +30,7 @@ import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -75,7 +76,7 @@ public class TickedDataAggregator extends JavaDataAggregator {
public SamplerMetadata.DataAggregator getMetadata() {
// push the current tick (so numberOfTicks is accurate)
synchronized (this.mutex) {
- pushCurrentTick();
+ pushCurrentTick(Runnable::run);
this.currentData = null;
}
@@ -92,7 +93,7 @@ public class TickedDataAggregator extends JavaDataAggregator {
synchronized (this.mutex) {
int tick = this.tickHook.getCurrentTick();
if (this.currentTick != tick || this.currentData == null) {
- pushCurrentTick();
+ pushCurrentTick(this.workerPool);
this.currentTick = tick;
this.currentData = new TickList(this.expectedSize, window);
}
@@ -102,7 +103,7 @@ public class TickedDataAggregator extends JavaDataAggregator {
}
// guarded by 'mutex'
- private void pushCurrentTick() {
+ private void pushCurrentTick(Executor executor) {
TickList currentData = this.currentData;
if (currentData == null) {
return;
@@ -116,7 +117,7 @@ public class TickedDataAggregator extends JavaDataAggregator {
return;
}
- this.workerPool.submit(currentData);
+ executor.execute(currentData);
this.tickCounter.increment();
}
@@ -124,7 +125,7 @@ public class TickedDataAggregator extends JavaDataAggregator {
public List<ThreadNode> exportData() {
// push the current tick
synchronized (this.mutex) {
- pushCurrentTick();
+ pushCurrentTick(Runnable::run);
}
return super.exportData();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java
index 03da075..fb4a4fc 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java
@@ -27,18 +27,25 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
+import java.util.function.LongToDoubleFunction;
import java.util.stream.IntStream;
/**
* Encodes a map of int->double into a double array.
*/
public class ProtoTimeEncoder {
+
+ /** A transformer function to transform the 'time' value from a long to a double */
+ private final LongToDoubleFunction valueTransformer;
+
/** A sorted array of all possible keys to encode */
private final int[] keys;
/** A map of key value -> index in the keys array */
private final Map<Integer, Integer> keysToIndex;
- public ProtoTimeEncoder(List<ThreadNode> sourceData) {
+ public ProtoTimeEncoder(LongToDoubleFunction valueTransformer, List<ThreadNode> sourceData) {
+ this.valueTransformer = valueTransformer;
+
// get an array of all keys that show up in the source data
this.keys = sourceData.stream()
.map(n -> n.getTimeWindows().stream().mapToInt(i -> i))
@@ -81,11 +88,8 @@ public class ProtoTimeEncoder {
throw new RuntimeException("No index for key " + key + " in " + this.keysToIndex.keySet());
}
- // convert the duration from microseconds -> milliseconds
- double durationInMilliseconds = value.longValue() / 1000d;
-
// store in the array
- array[idx] = durationInMilliseconds;
+ array[idx] = this.valueTransformer.applyAsDouble(value.longValue());
});
return array;
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
index ce65013..86c0b20 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
@@ -20,29 +20,35 @@
package me.lucko.spark.common.sampler.window;
+import me.lucko.spark.api.statistic.misc.DoubleAverageInfo;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.monitor.cpu.CpuMonitor;
import me.lucko.spark.common.monitor.tick.TickStatistics;
import me.lucko.spark.common.platform.world.AsyncWorldInfoProvider;
import me.lucko.spark.common.platform.world.WorldInfoProvider;
import me.lucko.spark.common.tick.TickHook;
-import me.lucko.spark.common.util.RollingAverage;
import me.lucko.spark.proto.SparkProtos;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;
+import java.util.logging.Level;
/**
* Collects statistics for each profiling window.
*/
public class WindowStatisticsCollector {
- private static final SparkProtos.WindowStatistics ZERO = SparkProtos.WindowStatistics.newBuilder().build();
+ private static final SparkProtos.WindowStatistics ZERO = SparkProtos.WindowStatistics.newBuilder()
+ .setDuration(ProfilingWindowUtils.WINDOW_SIZE_SECONDS * 1000)
+ .build();
/** The platform */
private final SparkPlatform platform;
+ /** Map of profiling window -> start time */
+ private final Map<Integer, Long> windowStartTimes = new HashMap<>();
/** Map of profiling window -> statistics */
private final Map<Integer, SparkProtos.WindowStatistics> stats;
@@ -100,12 +106,21 @@ public class WindowStatisticsCollector {
}
/**
+ * Records the wall-clock time when a window was started.
+ *
+ * @param window the window
+ */
+ public void recordWindowStartTime(int window) {
+ this.windowStartTimes.put(window, System.currentTimeMillis());
+ }
+
+ /**
* Measures statistics for the given window if none have been recorded yet.
*
* @param window the window
*/
public void measureNow(int window) {
- this.stats.computeIfAbsent(window, w -> measure());
+ this.stats.computeIfAbsent(window, this::measure);
}
/**
@@ -132,14 +147,25 @@ public class WindowStatisticsCollector {
*
* @return the current statistics
*/
- private SparkProtos.WindowStatistics measure() {
+ private SparkProtos.WindowStatistics measure(int window) {
SparkProtos.WindowStatistics.Builder builder = SparkProtos.WindowStatistics.newBuilder();
+ long endTime = System.currentTimeMillis();
+ Long startTime = this.windowStartTimes.get(window);
+ if (startTime == null) {
+ this.platform.getPlugin().log(Level.WARNING, "Unknown start time for window " + window);
+ startTime = endTime - (ProfilingWindowUtils.WINDOW_SIZE_SECONDS * 1000); // guess
+ }
+
+ builder.setStartTime(startTime);
+ builder.setEndTime(endTime);
+ builder.setDuration((int) (endTime - startTime));
+
TickStatistics tickStatistics = this.platform.getTickStatistics();
if (tickStatistics != null) {
builder.setTps(tickStatistics.tps1Min());
- RollingAverage mspt = tickStatistics.duration1Min();
+ DoubleAverageInfo mspt = tickStatistics.duration1Min();
if (mspt != null) {
builder.setMsptMedian(mspt.median());
builder.setMsptMax(mspt.max());
@@ -225,11 +251,13 @@ public class WindowStatisticsCollector {
if (this.startTick == -1) {
throw new IllegalStateException("start tick not recorded");
}
- if (this.stopTick == -1) {
- throw new IllegalStateException("stop tick not recorded");
+
+ int stopTick = this.stopTick;
+ if (stopTick == -1) {
+ stopTick = this.tickHook.getCurrentTick();
}
- return this.stopTick - this.startTick;
+ return stopTick - this.startTick;
}
}