about | summary | refs | log | tree | commit | diff
path: root/spark-common/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'spark-common/src/main/java')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java11
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java3
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java48
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java11
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java10
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java4
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java3
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java6
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java3
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java5
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java27
11 files changed, 91 insertions, 40 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
index 9ae82e8..5b236f1 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
@@ -31,7 +31,6 @@ import me.lucko.spark.common.util.ClassSourceLookup;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -127,15 +126,15 @@ public abstract class AbstractSampler implements Sampler {
proto.setMetadata(metadata);
}
- protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
- List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(dataAggregator.getData().entrySet());
+ protected void writeDataToProto(SamplerData.Builder proto, DataAggregator dataAggregator, Comparator<ThreadNode> outputOrder, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ List<ThreadNode> data = dataAggregator.exportData();
data.sort(outputOrder);
ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup);
- for (Map.Entry<String, ThreadNode> entry : data) {
- proto.addThreads(entry.getValue().toProto(mergeMode));
- classSourceVisitor.visit(entry.getValue());
+ for (ThreadNode entry : data) {
+ proto.addThreads(entry.toProto(mergeMode));
+ classSourceVisitor.visit(entry);
}
if (classSourceVisitor.hasMappings()) {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index d27b2fc..845043f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -28,7 +28,6 @@ import me.lucko.spark.common.util.ClassSourceLookup;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import java.util.Comparator;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
@@ -68,6 +67,6 @@ public interface Sampler {
CompletableFuture<Sampler> getFuture();
// Methods used to export the sampler data to the web viewer.
- SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
+ SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
index 225f768..f71ad9f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
@@ -22,7 +22,9 @@ package me.lucko.spark.common.sampler;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
+import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -42,6 +44,11 @@ public interface ThreadGrouper {
}
@Override
+ public String getLabel(String group) {
+ return group;
+ }
+
+ @Override
public SamplerMetadata.DataAggregator.ThreadGrouper asProto() {
return SamplerMetadata.DataAggregator.ThreadGrouper.BY_NAME;
}
@@ -55,14 +62,18 @@ public interface ThreadGrouper {
* separated from the pool name with any of one or more of ' ', '-', or '#'.</p>
*/
ThreadGrouper BY_POOL = new ThreadGrouper() {
+ private /* static */ final Pattern pattern = Pattern.compile("^(.*?)[-# ]+\\d+$");
+
+ // thread id -> group
private final Map<Long, String> cache = new ConcurrentHashMap<>();
- private final Pattern pattern = Pattern.compile("^(.*?)[-# ]+\\d+$");
+ // group -> thread ids
+ private final Map<String, Set<Long>> seen = new ConcurrentHashMap<>();
@Override
public String getGroup(long threadId, String threadName) {
- String group = this.cache.get(threadId);
- if (group != null) {
- return group;
+ String cached = this.cache.get(threadId);
+ if (cached != null) {
+ return cached;
}
Matcher matcher = this.pattern.matcher(threadName);
@@ -70,12 +81,19 @@ public interface ThreadGrouper {
return threadName;
}
- group = matcher.group(1).trim() + " (Combined)";
- this.cache.put(threadId, group); // we don't care about race conditions here
+ String group = matcher.group(1).trim();
+ this.cache.put(threadId, group);
+ this.seen.computeIfAbsent(group, g -> ConcurrentHashMap.newKeySet()).add(threadId);
return group;
}
@Override
+ public String getLabel(String group) {
+ int count = this.seen.getOrDefault(group, Collections.emptySet()).size();
+ return group + " (x" + count + ")";
+ }
+
+ @Override
public SamplerMetadata.DataAggregator.ThreadGrouper asProto() {
return SamplerMetadata.DataAggregator.ThreadGrouper.BY_POOL;
}
@@ -86,9 +104,17 @@ public interface ThreadGrouper {
* the name "All".
*/
ThreadGrouper AS_ONE = new ThreadGrouper() {
+ private final Set<Long> seen = ConcurrentHashMap.newKeySet();
+
@Override
public String getGroup(long threadId, String threadName) {
- return "All";
+ this.seen.add(threadId);
+ return "root";
+ }
+
+ @Override
+ public String getLabel(String group) {
+ return "All (x" + this.seen.size() + ")";
}
@Override
@@ -106,6 +132,14 @@ public interface ThreadGrouper {
*/
String getGroup(long threadId, String threadName);
+ /**
+ * Gets the label to use for a given group.
+ *
+ * @param group the group
+ * @return the label
+ */
+ String getLabel(String group);
+
SamplerMetadata.DataAggregator.ThreadGrouper asProto();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java
index 4fa8ff4..adcedcd 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadNodeOrder.java
@@ -23,20 +23,19 @@ package me.lucko.spark.common.sampler;
import me.lucko.spark.common.sampler.node.ThreadNode;
import java.util.Comparator;
-import java.util.Map;
/**
* Methods of ordering {@link ThreadNode}s in the output data.
*/
-public enum ThreadNodeOrder implements Comparator<Map.Entry<String, ThreadNode>> {
+public enum ThreadNodeOrder implements Comparator<ThreadNode> {
/**
* Order by the name of the thread (alphabetically)
*/
BY_NAME {
@Override
- public int compare(Map.Entry<String, ThreadNode> o1, Map.Entry<String, ThreadNode> o2) {
- return o1.getKey().compareTo(o2.getKey());
+ public int compare(ThreadNode o1, ThreadNode o2) {
+ return o1.getThreadLabel().compareTo(o2.getThreadLabel());
}
},
@@ -45,8 +44,8 @@ public enum ThreadNodeOrder implements Comparator<Map.Entry<String, ThreadNode>>
*/
BY_TIME {
@Override
- public int compare(Map.Entry<String, ThreadNode> o1, Map.Entry<String, ThreadNode> o2) {
- return -Double.compare(o1.getValue().getTotalTime(), o2.getValue().getTotalTime());
+ public int compare(ThreadNode o1, ThreadNode o2) {
+ return -Double.compare(o1.getTotalTime(), o2.getTotalTime());
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
index 7640d60..ad9dee4 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
@@ -23,6 +23,8 @@ package me.lucko.spark.common.sampler.aggregator;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.node.ThreadNode;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,7 +52,11 @@ public abstract class AbstractDataAggregator implements DataAggregator {
}
@Override
- public Map<String, ThreadNode> getData() {
- return this.threadData;
+ public List<ThreadNode> exportData() {
+ List<ThreadNode> data = new ArrayList<>(this.threadData.values());
+ for (ThreadNode node : data) {
+ node.setThreadLabel(this.threadGrouper.getLabel(node.getThreadGroup()));
+ }
+ return data;
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
index 3b1d349..5590a96 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
@@ -23,7 +23,7 @@ package me.lucko.spark.common.sampler.aggregator;
import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
-import java.util.Map;
+import java.util.List;
/**
* Aggregates sampling data.
@@ -35,7 +35,7 @@ public interface DataAggregator {
*
* @return the output data
*/
- Map<String, ThreadNode> getData();
+ List<ThreadNode> exportData();
/**
* Gets metadata about the data aggregator instance.
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index 5d587a0..60c4e03 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -42,7 +42,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -154,7 +153,7 @@ public class AsyncSampler extends AbstractSampler {
}
@Override
- public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
SamplerData.Builder proto = SamplerData.newBuilder();
writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
aggregateOutput();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java
index 54d9e1c..cc530d6 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java
@@ -27,7 +27,7 @@ import me.lucko.spark.common.sampler.node.StackTraceNode;
import me.lucko.spark.common.sampler.node.ThreadNode;
import java.lang.management.ThreadInfo;
-import java.util.Map;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -86,7 +86,7 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator {
}
@Override
- public Map<String, ThreadNode> getData() {
+ public List<ThreadNode> exportData() {
// wait for all pending data to be inserted
this.workerPool.shutdown();
try {
@@ -95,7 +95,7 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator {
e.printStackTrace();
}
- return super.getData();
+ return super.exportData();
}
private static boolean isSleeping(ThreadInfo thread) {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
index 2bedae6..cfa0a0f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -37,7 +37,6 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Comparator;
-import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -125,7 +124,7 @@ public class JavaSampler extends AbstractSampler implements Runnable {
}
@Override
- public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<? super Map.Entry<String, ThreadNode>> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ public SamplerData toProto(SparkPlatform platform, CommandSender creator, Comparator<ThreadNode> outputOrder, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
SamplerData.Builder proto = SamplerData.newBuilder();
writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
writeDataToProto(proto, this.dataAggregator, outputOrder, mergeMode, classSourceLookup);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
index ac34d01..e817828 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java
@@ -29,7 +29,6 @@ import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -102,13 +101,13 @@ public class TickedDataAggregator extends JavaDataAggregator {
}
@Override
- public Map<String, ThreadNode> getData() {
+ public List<ThreadNode> exportData() {
// push the current tick
synchronized (this.mutex) {
pushCurrentTick();
}
- return super.getData();
+ return super.exportData();
}
private final class TickList implements Runnable {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
index fc56987..ed97443 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
@@ -28,17 +28,34 @@ import me.lucko.spark.proto.SparkSamplerProtos;
public final class ThreadNode extends AbstractNode {
/**
- * The name of this thread
+ * The name of this thread / thread group
*/
- private final String threadName;
+ private final String name;
- public ThreadNode(String threadName) {
- this.threadName = threadName;
+ /**
+ * The label used to describe this thread in the viewer
+ */
+ public String label;
+
+ public ThreadNode(String name) {
+ this.name = name;
+ }
+
+ public String getThreadLabel() {
+ return this.label != null ? this.label : this.name;
+ }
+
+ public String getThreadGroup() {
+ return this.name;
+ }
+
+ public void setThreadLabel(String label) {
+ this.label = label;
}
public SparkSamplerProtos.ThreadNode toProto(MergeMode mergeMode) {
SparkSamplerProtos.ThreadNode.Builder proto = SparkSamplerProtos.ThreadNode.newBuilder()
- .setName(this.threadName)
+ .setName(getThreadLabel())
.setTime(getTotalTime());
for (StackTraceNode child : exportChildren(mergeMode)) {