aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko/spark/common/sampler
diff options
context:
space:
mode:
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java56
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java22
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java83
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java39
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java74
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java89
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java60
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java2
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java25
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java67
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java83
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java14
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java154
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java48
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java11
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java14
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java44
17 files changed, 725 insertions, 160 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
index e324fd3..d814002 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
@@ -32,9 +32,12 @@ import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.source.SourceMetadata;
import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;
import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
+import me.lucko.spark.common.ws.ViewerSocket;
+import me.lucko.spark.proto.SparkProtos;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -74,6 +77,9 @@ public abstract class AbstractSampler implements Sampler {
/** The garbage collector statistics when profiling started */
protected Map<String, GarbageCollectorStatistics> initialGcStats;
+ /** A set of viewer sockets linked to the sampler */
+ protected List<ViewerSocket> viewerSockets = new ArrayList<>();
+
protected AbstractSampler(SparkPlatform platform, SamplerSettings settings) {
this.platform = platform;
this.interval = settings.interval();
@@ -122,12 +128,54 @@ public abstract class AbstractSampler implements Sampler {
@Override
public void stop(boolean cancelled) {
this.windowStatisticsCollector.stop();
+ for (ViewerSocket viewerSocket : this.viewerSockets) {
+ viewerSocket.processSamplerStopped(this);
+ }
+ }
+
+ @Override
+ public void attachSocket(ViewerSocket socket) {
+ this.viewerSockets.add(socket);
+ }
+
+ @Override
+ public Collection<ViewerSocket> getAttachedSockets() {
+ return this.viewerSockets;
+ }
+
+ protected void processWindowRotate() {
+ this.viewerSockets.removeIf(socket -> {
+ if (!socket.isOpen()) {
+ return true;
+ }
+
+ socket.processWindowRotate(this);
+ return false;
+ });
+ }
+
+ protected void sendStatisticsToSocket() {
+ try {
+ if (this.viewerSockets.isEmpty()) {
+ return;
+ }
+
+ SparkProtos.PlatformStatistics platform = this.platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), false);
+ SparkProtos.SystemStatistics system = this.platform.getStatisticsProvider().getSystemStatistics();
+
+ for (ViewerSocket viewerSocket : this.viewerSockets) {
+ viewerSocket.sendUpdatedStatistics(platform, system);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
- protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) {
+ protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender.Data creator, String comment, DataAggregator dataAggregator) {
SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder()
+ .setSamplerMode(getMode().asProto())
.setPlatformMetadata(platform.getPlugin().getPlatformInfo().toData().toProto())
- .setCreator(creator.toData().toProto())
+ .setCreator(creator.toProto())
.setStartTime(this.startTime)
.setEndTime(System.currentTimeMillis())
.setInterval(this.interval)
@@ -144,7 +192,7 @@ public abstract class AbstractSampler implements Sampler {
}
try {
- metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats()));
+ metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), true));
} catch (Exception e) {
e.printStackTrace();
}
@@ -187,7 +235,7 @@ public abstract class AbstractSampler implements Sampler {
ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup);
- ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(data);
+ ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(getMode().valueTransformer(), data);
int[] timeWindows = timeEncoder.getKeys();
for (int timeWindow : timeWindows) {
proto.addTimeWindows(timeWindow);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java
index 7e3b6b4..4e9ca9e 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java
@@ -31,6 +31,8 @@ public class BackgroundSamplerManager {
private static final String OPTION_ENABLED = "backgroundProfiler";
private static final String OPTION_ENGINE = "backgroundProfilerEngine";
private static final String OPTION_INTERVAL = "backgroundProfilerInterval";
+ private static final String OPTION_THREAD_GROUPER = "backgroundProfilerThreadGrouper";
+ private static final String OPTION_THREAD_DUMPER = "backgroundProfilerThreadDumper";
private static final String MARKER_FAILED = "_marker_background_profiler_failed";
@@ -101,13 +103,21 @@ public class BackgroundSamplerManager {
private void startSampler() {
boolean forceJavaEngine = this.configuration.getString(OPTION_ENGINE, "async").equals("java");
+ ThreadGrouper threadGrouper = ThreadGrouper.parseConfigSetting(this.configuration.getString(OPTION_THREAD_GROUPER, "by-pool"));
+ ThreadDumper threadDumper = ThreadDumper.parseConfigSetting(this.configuration.getString(OPTION_THREAD_DUMPER, "default"));
+ if (threadDumper == null) {
+ threadDumper = this.platform.getPlugin().getDefaultThreadDumper();
+ }
+
+ int interval = this.configuration.getInteger(OPTION_INTERVAL, 10);
+
Sampler sampler = new SamplerBuilder()
- .background(true)
- .threadDumper(this.platform.getPlugin().getDefaultThreadDumper())
- .threadGrouper(ThreadGrouper.BY_POOL)
- .samplingInterval(this.configuration.getInteger(OPTION_INTERVAL, 10))
- .forceJavaSampler(forceJavaEngine)
- .start(this.platform);
+ .background(true)
+ .threadDumper(threadDumper)
+ .threadGrouper(threadGrouper)
+ .samplingInterval(interval)
+ .forceJavaSampler(forceJavaEngine)
+ .start(this.platform);
this.platform.getSamplerContainer().setActiveSampler(sampler);
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index 36a63f1..844ab0b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -24,9 +24,13 @@ import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
+import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
+import me.lucko.spark.proto.SparkSamplerProtos.SocketChannelInfo;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
/**
* Abstract superinterface for all sampler implementations.
@@ -44,6 +48,20 @@ public interface Sampler {
void stop(boolean cancelled);
/**
+ * Attaches a viewer socket to this sampler.
+ *
+ * @param socket the socket
+ */
+ void attachSocket(ViewerSocket socket);
+
+ /**
+ * Gets the sockets attached to this sampler.
+ *
+ * @return the attached sockets
+ */
+ Collection<ViewerSocket> getAttachedSockets();
+
+ /**
* Gets the time when the sampler started (unix timestamp in millis)
*
* @return the start time
@@ -65,6 +83,13 @@ public interface Sampler {
boolean isRunningInBackground();
/**
+ * Gets the sampler mode.
+ *
+ * @return the sampler mode
+ */
+ SamplerMode getMode();
+
+ /**
* Gets a future to encapsulate the completion of the sampler
*
* @return a future
@@ -72,6 +97,62 @@ public interface Sampler {
CompletableFuture<Sampler> getFuture();
// Methods used to export the sampler data to the web viewer.
- SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
+ SamplerData toProto(SparkPlatform platform, ExportProps exportProps);
+
+ final class ExportProps {
+ private CommandSender.Data creator;
+ private String comment;
+ private Supplier<MergeMode> mergeMode;
+ private Supplier<ClassSourceLookup> classSourceLookup;
+ private SocketChannelInfo channelInfo;
+
+ public ExportProps() {
+ }
+
+ public CommandSender.Data creator() {
+ return this.creator;
+ }
+
+ public String comment() {
+ return this.comment;
+ }
+
+ public Supplier<MergeMode> mergeMode() {
+ return this.mergeMode;
+ }
+
+ public Supplier<ClassSourceLookup> classSourceLookup() {
+ return this.classSourceLookup;
+ }
+
+ public SocketChannelInfo channelInfo() {
+ return this.channelInfo;
+ }
+
+ public ExportProps creator(CommandSender.Data creator) {
+ this.creator = creator;
+ return this;
+ }
+
+ public ExportProps comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public ExportProps mergeMode(Supplier<MergeMode> mergeMode) {
+ this.mergeMode = mergeMode;
+ return this;
+ }
+
+ public ExportProps classSourceLookup(Supplier<ClassSourceLookup> classSourceLookup) {
+ this.classSourceLookup = classSourceLookup;
+ return this;
+ }
+
+ public ExportProps channelInfo(SocketChannelInfo channelInfo) {
+ this.channelInfo = channelInfo;
+ return this;
+ }
+ }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index ec635ef..b6895ce 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.sampler.async.AsyncProfilerAccess;
import me.lucko.spark.common.sampler.async.AsyncSampler;
+import me.lucko.spark.common.sampler.async.SampleCollector;
import me.lucko.spark.common.sampler.java.JavaSampler;
import me.lucko.spark.common.tick.TickHook;
@@ -34,10 +35,12 @@ import java.util.concurrent.TimeUnit;
@SuppressWarnings("UnusedReturnValue")
public class SamplerBuilder {
- private double samplingInterval = 4; // milliseconds
+ private SamplerMode mode = SamplerMode.EXECUTION;
+ private double samplingInterval = -1;
private boolean ignoreSleeping = false;
private boolean ignoreNative = false;
private boolean useAsyncProfiler = true;
+ private boolean allocLiveOnly = false;
private long autoEndTime = -1;
private boolean background = false;
private ThreadDumper threadDumper = ThreadDumper.ALL;
@@ -49,6 +52,11 @@ public class SamplerBuilder {
public SamplerBuilder() {
}
+ public SamplerBuilder mode(SamplerMode mode) {
+ this.mode = mode;
+ return this;
+ }
+
public SamplerBuilder samplingInterval(double samplingInterval) {
this.samplingInterval = samplingInterval;
return this;
@@ -98,21 +106,38 @@ public class SamplerBuilder {
return this;
}
- public Sampler start(SparkPlatform platform) {
+ public SamplerBuilder allocLiveOnly(boolean allocLiveOnly) {
+ this.allocLiveOnly = allocLiveOnly;
+ return this;
+ }
+
+ public Sampler start(SparkPlatform platform) throws UnsupportedOperationException {
+ if (this.samplingInterval <= 0) {
+ throw new IllegalArgumentException("samplingInterval = " + this.samplingInterval);
+ }
+
boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null;
boolean canUseAsyncProfiler = this.useAsyncProfiler &&
!onlyTicksOverMode &&
!(this.ignoreSleeping || this.ignoreNative) &&
- !(this.threadDumper instanceof ThreadDumper.Regex) &&
AsyncProfilerAccess.getInstance(platform).checkSupported(platform);
+ if (this.mode == SamplerMode.ALLOCATION && (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform))) {
+ throw new UnsupportedOperationException("Allocation profiling is not supported on your system. Check the console for more info.");
+ }
+
+ int interval = (int) (this.mode == SamplerMode.EXECUTION ?
+ this.samplingInterval * 1000d : // convert to microseconds
+ this.samplingInterval
+ );
- int intervalMicros = (int) (this.samplingInterval * 1000d);
- SamplerSettings settings = new SamplerSettings(intervalMicros, this.threadDumper, this.threadGrouper, this.autoEndTime, this.background);
+ SamplerSettings settings = new SamplerSettings(interval, this.threadDumper, this.threadGrouper, this.autoEndTime, this.background);
Sampler sampler;
- if (canUseAsyncProfiler) {
- sampler = new AsyncSampler(platform, settings);
+ if (this.mode == SamplerMode.ALLOCATION) {
+ sampler = new AsyncSampler(platform, settings, new SampleCollector.Allocation(interval, this.allocLiveOnly));
+ } else if (canUseAsyncProfiler) {
+ sampler = new AsyncSampler(platform, settings, new SampleCollector.Execution(interval));
} else if (onlyTicksOverMode) {
sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
} else {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java
new file mode 100644
index 0000000..f9a6e41
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java
@@ -0,0 +1,74 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
+
+import java.util.function.LongToDoubleFunction;
+
+public enum SamplerMode {
+
+ EXECUTION(
+ value -> {
+ // convert the duration from microseconds -> milliseconds
+ return value / 1000d;
+ },
+ 4, // ms
+ SamplerMetadata.SamplerMode.EXECUTION
+ ),
+
+ ALLOCATION(
+ value -> {
+ // do nothing
+ return value;
+ },
+ 524287, // 512 KiB
+ SamplerMetadata.SamplerMode.ALLOCATION
+ );
+
+ private final LongToDoubleFunction valueTransformer;
+ private final int defaultInterval;
+ private final SamplerMetadata.SamplerMode proto;
+
+ SamplerMode(LongToDoubleFunction valueTransformer, int defaultInterval, SamplerMetadata.SamplerMode proto) {
+ this.valueTransformer = valueTransformer;
+ this.defaultInterval = defaultInterval;
+ this.proto = proto;
+ }
+
+ public LongToDoubleFunction valueTransformer() {
+ return this.valueTransformer;
+ }
+
+ public int defaultInterval() {
+ return this.defaultInterval;
+ }
+
+ /**
+ * Gets the metadata enum instance for this sampler mode.
+ *
+ * @return proto metadata
+ */
+ public SamplerMetadata.SamplerMode asProto() {
+ return this.proto;
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
index fd0c413..c68384b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
@@ -32,7 +32,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
/**
@@ -50,11 +49,38 @@ public interface ThreadDumper {
ThreadInfo[] dumpThreads(ThreadMXBean threadBean);
/**
+ * Gets if the given thread should be included in the output.
+ *
+ * @param threadId the thread id
+ * @param threadName the thread name
+ * @return if the thread should be included
+ */
+ boolean isThreadIncluded(long threadId, String threadName);
+
+ /**
* Gets metadata about the thread dumper instance.
*/
SamplerMetadata.ThreadDumper getMetadata();
/**
+ * Creates a new {@link ThreadDumper} by parsing the given config setting.
+ *
+ * @param setting the config setting
+ * @return the thread dumper
+ */
+ static ThreadDumper parseConfigSetting(String setting) {
+ switch (setting) {
+ case "default":
+ return null;
+ case "all":
+ return ALL;
+ default:
+ Set<String> threadNames = Arrays.stream(setting.split(",")).collect(Collectors.toSet());
+ return new ThreadDumper.Specific(threadNames);
+ }
+ }
+
+ /**
* Implementation of {@link ThreadDumper} that generates data for all threads.
*/
ThreadDumper ALL = new ThreadDumper() {
@@ -64,6 +90,11 @@ public interface ThreadDumper {
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ return true;
+ }
+
+ @Override
public SamplerMetadata.ThreadDumper getMetadata() {
return SamplerMetadata.ThreadDumper.newBuilder()
.setType(SamplerMetadata.ThreadDumper.Type.ALL)
@@ -98,7 +129,7 @@ public interface ThreadDumper {
}
public void setThread(Thread thread) {
- this.dumper = new Specific(new long[]{thread.getId()});
+ this.dumper = new Specific(thread);
}
}
@@ -114,10 +145,6 @@ public interface ThreadDumper {
this.ids = new long[]{thread.getId()};
}
- public Specific(long[] ids) {
- this.ids = ids;
- }
-
public Specific(Set<String> names) {
this.threadNamesLowerCase = names.stream().map(String::toLowerCase).collect(Collectors.toSet());
this.ids = new ThreadFinder().getThreads()
@@ -146,6 +173,14 @@ public interface ThreadDumper {
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ if (Arrays.binarySearch(this.ids, threadId) >= 0) {
+ return true;
+ }
+ return getThreadNames().contains(threadName.toLowerCase());
+ }
+
+ @Override
public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
return threadBean.getThreadInfo(this.ids, Integer.MAX_VALUE);
}
@@ -169,35 +204,31 @@ public interface ThreadDumper {
public Regex(Set<String> namePatterns) {
this.namePatterns = namePatterns.stream()
- .map(regex -> {
- try {
- return Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
- } catch (PatternSyntaxException e) {
- return null;
- }
- })
- .filter(Objects::nonNull)
+ .map(regex -> Pattern.compile(regex, Pattern.CASE_INSENSITIVE))
.collect(Collectors.toSet());
}
@Override
+ public boolean isThreadIncluded(long threadId, String threadName) {
+ Boolean result = this.cache.get(threadId);
+ if (result != null) {
+ return result;
+ }
+
+ for (Pattern pattern : this.namePatterns) {
+ if (pattern.matcher(threadName).matches()) {
+ this.cache.put(threadId, true);
+ return true;
+ }
+ }
+ this.cache.put(threadId, false);
+ return false;
+ }
+
+ @Override
public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
return this.threadFinder.getThreads()
- .filter(thread -> {
- Boolean result = this.cache.get(thread.getId());
- if (result != null) {
- return result;
- }
-
- for (Pattern pattern : this.namePatterns) {
- if (pattern.matcher(thread.getName()).matches()) {
- this.cache.put(thread.getId(), true);
- return true;
- }
- }
- this.cache.put(thread.getId(), false);
- return false;
- })
+ .filter(thread -> isThreadIncluded(thread.getId(), thread.getName()))
.map(thread -> threadBean.getThreadInfo(thread.getId(), Integer.MAX_VALUE))
.filter(Objects::nonNull)
.toArray(ThreadInfo[]::new);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
index 9ad84df..b6cfbea 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
@@ -35,6 +35,47 @@ import java.util.regex.Pattern;
public interface ThreadGrouper {
/**
+ * Gets the group for the given thread.
+ *
+ * @param threadId the id of the thread
+ * @param threadName the name of the thread
+ * @return the group
+ */
+ String getGroup(long threadId, String threadName);
+
+ /**
+ * Gets the label to use for a given group.
+ *
+ * @param group the group
+ * @return the label
+ */
+ String getLabel(String group);
+
+ /**
+ * Gets the metadata enum instance for this thread grouper.
+ *
+ * @return proto metadata
+ */
+ SamplerMetadata.DataAggregator.ThreadGrouper asProto();
+
+ /**
+ * Creates a new {@link ThreadGrouper} by parsing the given config setting.
+ *
+ * @param setting the config setting
+ * @return the thread grouper
+ */
+ static ThreadGrouper parseConfigSetting(String setting) {
+ switch (setting) {
+ case "as-one":
+ return AS_ONE;
+ case "by-name":
+ return BY_NAME;
+ default:
+ return BY_POOL;
+ }
+ }
+
+ /**
* Implementation of {@link ThreadGrouper} that just groups by thread name.
*/
ThreadGrouper BY_NAME = new ThreadGrouper() {
@@ -126,23 +167,4 @@ public interface ThreadGrouper {
}
};
- /**
- * Gets the group for the given thread.
- *
- * @param threadId the id of the thread
- * @param threadName the name of the thread
- * @return the group
- */
- String getGroup(long threadId, String threadName);
-
- /**
- * Gets the label to use for a given group.
- *
- * @param group the group
- * @return the label
- */
- String getLabel(String group);
-
- SamplerMetadata.DataAggregator.ThreadGrouper asProto();
-
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
index 402330a..b9a80e0 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
@@ -50,7 +50,7 @@ public class AsyncDataAggregator extends AbstractDataAggregator {
public void insertData(ProfileSegment element, int window) {
try {
ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName()));
- node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime(), window);
+ node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getValue(), window);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
index 1480650..5bee56f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -61,6 +61,8 @@ public class AsyncProfilerAccess {
/** The event to use for profiling */
private final ProfilingEvent profilingEvent;
+ /** The event to use for allocation profiling */
+ private final ProfilingEvent allocationProfilingEvent;
/** If profiler is null, contains the reason why setup failed */
private final Exception setupException;
@@ -68,10 +70,16 @@ public class AsyncProfilerAccess {
AsyncProfilerAccess(SparkPlatform platform) {
AsyncProfiler profiler;
ProfilingEvent profilingEvent = null;
+ ProfilingEvent allocationProfilingEvent = null;
Exception setupException = null;
try {
profiler = load(platform);
+
+ if (isEventSupported(profiler, ProfilingEvent.ALLOC, false)) {
+ allocationProfilingEvent = ProfilingEvent.ALLOC;
+ }
+
if (isEventSupported(profiler, ProfilingEvent.CPU, false)) {
profilingEvent = ProfilingEvent.CPU;
} else if (isEventSupported(profiler, ProfilingEvent.WALL, true)) {
@@ -84,6 +92,7 @@ public class AsyncProfilerAccess {
this.profiler = profiler;
this.profilingEvent = profilingEvent;
+ this.allocationProfilingEvent = allocationProfilingEvent;
this.setupException = setupException;
}
@@ -98,6 +107,10 @@ public class AsyncProfilerAccess {
return this.profilingEvent;
}
+ public ProfilingEvent getAllocationProfilingEvent() {
+ return this.allocationProfilingEvent;
+ }
+
public boolean checkSupported(SparkPlatform platform) {
if (this.setupException != null) {
if (this.setupException instanceof UnsupportedSystemException) {
@@ -116,6 +129,15 @@ public class AsyncProfilerAccess {
return this.profiler != null;
}
+ public boolean checkAllocationProfilingSupported(SparkPlatform platform) {
+ boolean supported = this.allocationProfilingEvent != null;
+ if (!supported && this.profiler != null) {
+ platform.getPlugin().log(Level.WARNING, "The allocation profiling mode is not supported on your system. This is most likely because Hotspot debug symbols are not available.");
+ platform.getPlugin().log(Level.WARNING, "To resolve, try installing the 'openjdk-11-dbg' or 'openjdk-8-dbg' package using your OS package manager.");
+ }
+ return supported;
+ }
+
private static AsyncProfiler load(SparkPlatform platform) throws Exception {
// check compatibility
String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", "");
@@ -183,7 +205,8 @@ public class AsyncProfilerAccess {
enum ProfilingEvent {
CPU(Events.CPU),
- WALL(Events.WALL);
+ WALL(Events.WALL),
+ ALLOC(Events.ALLOC);
private final String id;
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
index d74b75f..2fd304c 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
@@ -20,6 +20,8 @@
package me.lucko.spark.common.sampler.async;
+import com.google.common.collect.ImmutableList;
+
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.sampler.ThreadDumper;
import me.lucko.spark.common.sampler.async.jfr.JfrReader;
@@ -29,10 +31,9 @@ import one.profiler.AsyncProfiler;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Collection;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
/**
* Represents a profiling job within async-profiler.
@@ -77,8 +78,8 @@ public class AsyncProfilerJob {
// Set on init
/** The platform */
private SparkPlatform platform;