diff options
Diffstat (limited to 'spark-common/src/main/java')
11 files changed, 91 insertions, 40 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java index 9ae82e8..5b236f1 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java @@ -31,7 +31,6 @@ import me.lucko.spark.common.util.ClassSourceLookup; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; -import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -127,15 +126,15 @@ public abstract class AbstractSampler implements Sampler { proto.setMetadata(metadata); } - protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { - List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(dataAggregator.getData().entrySet()); + protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, Comparator<ThreadNode> outputOrder, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + List<ThreadNode> data = dataAggregator.exportData(); data.sort(outputOrder); ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup); - for (Map.Entry<String, ThreadNode> entry : data) { - proto.addThreads(entry.getValue().toProto(mergeMode)); - classSourceVisitor.visit(entry.getValue()); + for (ThreadNode entry : data) { + proto.addThreads(entry.toProto(mergeMode)); + classSourceVisitor.visit(entry); } if (classSourceVisitor.hasMappings()) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index d27b2fc..845043f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -28,7 +28,6 @@ import me.lucko.spark.common.util.ClassSourceLookup; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import java.util.Comparator; -import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -68,6 +67,6 @@ public interface Sampler { CompletableFuture<Sampler> getFuture(); // Methods used to export the sampler data to the web viewer. - SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); + SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java index 225f768..f71ad9f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java @@ -22,7 +22,9 @@ package me.lucko.spark.common.sampler; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; +import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -42,6 +44,11 @@ public interface ThreadGrouper { } @Override + public String getLabel(String group) { + return group; + } + + @Override public SamplerMetadata.DataAggregator.ThreadGrouper asProto() { return SamplerMetadata.DataAggregator.ThreadGrouper.BY_NAME; } @@ -55,14 +62,18 @@ public interface ThreadGrouper { * separated from the pool name with any of one or more of ' ', '-', or '#'.</p> */ ThreadGrouper BY_POOL = new ThreadGrouper() { + private /* static */ final Pattern pattern = Pattern.compile("^(.*?)[-# ]+\\d+$"); + + // thread id -> group private final Map<Long, String> cache = new ConcurrentHashMap<>(); - private final Pattern pattern = Pattern.compile("^(.*?)[-# ]+\\d+$"); + // group -> thread ids + private final Map<String, Set<Long>> seen = new ConcurrentHashMap<>(); @Override public String getGroup(long threadId, String threadName) { - String group = this.cache.get(threadId); - if (group != null) { - return group; + String cached = this.cache.get(threadId); + if (cached != null) { + return cached; } Matcher matcher = this.pattern.matcher(threadName); @@ -70,12 +81,19 @@ public interface ThreadGrouper { return threadName; } - group = matcher.group(1).trim() + " (Combined)"; - this.cache.put(threadId, group); // we don't care about race conditions here + String group = matcher.group(1).trim(); + this.cache.put(threadId, group); + this.seen.computeIfAbsent(group, g -> ConcurrentHashMap.newKeySet()).add(threadId); return group; } @Override + public String getLabel(String group) { + int count = this.seen.getOrDefault(group, Collections.emptySet()).size(); + return group + " (x" + count + ")"; + } + + @Override public SamplerMetadata.DataAggregator.ThreadGrouper asProto() { return SamplerMetadata.DataAggregator.ThreadGrouper.BY_POOL; } @@ -86,9 +104,17 @@ public interface ThreadGrouper { * the name "All". */ ThreadGrouper AS_ONE = new ThreadGrouper() { + private final Set<Long> seen = ConcurrentHashMap.newKeySet(); + @Override public String getGroup(long threadId, String threadName) { - return "All"; + this.seen.add(threadId); + return "root"; + } + + @Override + public String getLabel(String group) { + return "All (x" + this.seen.size() + ")"; } @Override @@ -106,6 +132,14 @@ public interface ThreadGrouper { */ String getGroup(long threadId, String threadName); + /** + * Gets the label to use for a given group. + * + * @param group the group + * @return the label + */ + String getLabel(String group); + SamplerMetadata.DataAggregator.ThreadGrouper asProto(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java index 4fa8ff4..adcedcd 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java @@ -23,20 +23,19 @@ package me.lucko.spark.common.sampler; import me.lucko.spark.common.sampler.node.ThreadNode; import java.util.Comparator; -import java.util.Map; /** * Methods of ordering {@link ThreadNode}s in the output data. */ -public enum ThreadNodeOrder implements Comparator<Map.Entry<String, ThreadNode>> { +public enum ThreadNodeOrder implements Comparator<ThreadNode> { /** * Order by the name of the thread (alphabetically) */ BY_NAME { @Override - public int compare(Map.Entry<String, ThreadNode> o1, Map.Entry<String, ThreadNode> o2) { - return o1.getKey().compareTo(o2.getKey()); + public int compare(ThreadNode o1, ThreadNode o2) { + return o1.getThreadLabel().compareTo(o2.getThreadLabel()); } }, @@ -45,8 +44,8 @@ public enum ThreadNodeOrder implements Comparator<Map.Entry<String, ThreadNode>> */ BY_TIME { @Override - public int compare(Map.Entry<String, ThreadNode> o1, Map.Entry<String, ThreadNode> o2) { - return -Double.compare(o1.getValue().getTotalTime(), o2.getValue().getTotalTime()); + public int compare(ThreadNode o1, ThreadNode o2) { + return -Double.compare(o1.getTotalTime(), o2.getTotalTime()); } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java index 7640d60..ad9dee4 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java @@ -23,6 +23,8 @@ package me.lucko.spark.common.sampler.aggregator; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.node.ThreadNode; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -50,7 +52,11 @@ public abstract class AbstractDataAggregator implements DataAggregator { } @Override - public Map<String, ThreadNode> getData() { - return this.threadData; + public List<ThreadNode> exportData() { + List<ThreadNode> data = new ArrayList<>(this.threadData.values()); + for (ThreadNode node : data) { + node.setThreadLabel(this.threadGrouper.getLabel(node.getThreadGroup())); + } + return data; } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java index 3b1d349..5590a96 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java @@ -23,7 +23,7 @@ package me.lucko.spark.common.sampler.aggregator; import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; -import java.util.Map; +import java.util.List; /** * Aggregates sampling data. @@ -35,7 +35,7 @@ public interface DataAggregator { * * @return the output data */ - Map<String, ThreadNode> getData(); + List<ThreadNode> exportData(); /** * Gets metadata about the data aggregator instance. diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 5d587a0..60c4e03 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -42,7 +42,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -154,7 +153,7 @@ public class AsyncSampler extends AbstractSampler { } @Override - public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { SamplerData.Builder proto = SamplerData.newBuilder(); writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); aggregateOutput(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java index 54d9e1c..cc530d6 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java @@ -27,7 +27,7 @@ import me.lucko.spark.common.sampler.node.StackTraceNode; import me.lucko.spark.common.sampler.node.ThreadNode; import java.lang.management.ThreadInfo; -import java.util.Map; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -86,7 +86,7 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator { } @Override - public Map<String, ThreadNode> getData() { + public List<ThreadNode> exportData() { // wait for all pending data to be inserted this.workerPool.shutdown(); try { @@ -95,7 +95,7 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator { e.printStackTrace(); } - return super.getData(); + return super.exportData(); } private static boolean isSleeping(ThreadInfo thread) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index 2bedae6..cfa0a0f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -37,7 +37,6 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.Comparator; -import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -125,7 +124,7 @@ public class JavaSampler extends AbstractSampler implements Runnable { } @Override - public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { SamplerData.Builder proto = SamplerData.newBuilder(); writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java index ac34d01..e817828 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java @@ -29,7 +29,6 @@ import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; import java.lang.management.ThreadInfo; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -102,13 +101,13 @@ public class TickedDataAggregator extends JavaDataAggregator { } @Override - public Map<String, ThreadNode> getData() { + public List<ThreadNode> exportData() { // push the current tick synchronized (this.mutex) { pushCurrentTick(); } - return super.getData(); + return super.exportData(); } private final class TickList implements Runnable { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java index fc56987..ed97443 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java @@ -28,17 +28,34 @@ import me.lucko.spark.proto.SparkSamplerProtos; public final class ThreadNode extends AbstractNode { /** - * The name of this thread + * The name of this thread / thread group */ - private final String threadName; + private final String name; - public ThreadNode(String threadName) { - this.threadName = threadName; + /** + * The label used to describe this thread in the viewer + */ + public String label; + + public ThreadNode(String name) { + this.name = name; + } + + public String getThreadLabel() { + return this.label != null ? this.label : this.name; + } + + public String getThreadGroup() { + return this.name; + } + + public void setThreadLabel(String label) { + this.label = label; } public SparkSamplerProtos.ThreadNode toProto(MergeMode mergeMode) { SparkSamplerProtos.ThreadNode.Builder proto = SparkSamplerProtos.ThreadNode.newBuilder() - .setName(this.threadName) + .setName(getThreadLabel()) .setTime(getTotalTime()); for (StackTraceNode child : exportChildren(mergeMode)) { |