aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java
diff options
context:
space:
mode:
authorLuck <git@lucko.me>2019-05-13 11:09:20 +0100
committerLuck <git@lucko.me>2019-05-13 11:09:31 +0100
commit226d205fc6606cc3360791f2aea3a491d577750e (patch)
treed834525350840cba8dbc637c2eb2fe46d3e1e977 /spark-common/src/main/java
parent32f6355c8c7fc1611140bfcce6afe8b25cea5697 (diff)
downloadspark-226d205fc6606cc3360791f2aea3a491d577750e.tar.gz
spark-226d205fc6606cc3360791f2aea3a491d577750e.tar.bz2
spark-226d205fc6606cc3360791f2aea3a491d577750e.zip
Write metadata to sampler output
Diffstat (limited to 'spark-common/src/main/java')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java18
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java43
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java39
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java10
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java8
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java9
6 files changed, 110 insertions, 17 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index c57e4e2..f68a611 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -163,10 +163,28 @@ public class Sampler implements Runnable {
}
}
+ private void writeMetadata(JsonWriter writer) throws IOException {
+ writer.name("startTime").value(startTime);
+ writer.name("interval").value(interval);
+
+ writer.name("threadDumper").beginObject();
+ threadDumper.writeMetadata(writer);
+ writer.endObject();
+
+ writer.name("dataAggregator").beginObject();
+ dataAggregator.writeMetadata(writer);
+ writer.endObject();
+ }
+
private void writeOutput(JsonWriter writer) throws IOException {
writer.beginObject();
writer.name("type").value("sampler");
+
+ writer.name("metadata").beginObject();
+ writeMetadata(writer);
+ writer.endObject();
+
writer.name("threads").beginArray();
List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
index 14938ac..2ceec04 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
@@ -21,8 +21,10 @@
package me.lucko.spark.common.sampler;
+import com.google.gson.stream.JsonWriter;
import me.lucko.spark.common.util.ThreadFinder;
+import java.io.IOException;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
@@ -37,7 +39,6 @@ import java.util.stream.Collectors;
* Uses the {@link ThreadMXBean} to generate {@link ThreadInfo} instances for the threads being
* sampled.
*/
-@FunctionalInterface
public interface ThreadDumper {
/**
@@ -49,9 +50,27 @@ public interface ThreadDumper {
ThreadInfo[] dumpThreads(ThreadMXBean threadBean);
/**
+ * Writes metadata about the thread dumper instance to the given {@code writer}.
+ *
+ * @param writer the writer
+ * @throws IOException if thrown by the writer
+ */
+ void writeMetadata(JsonWriter writer) throws IOException;
+
+ /**
* Implementation of {@link ThreadDumper} that generates data for all threads.
*/
- ThreadDumper ALL = threadBean -> threadBean.dumpAllThreads(false, false);
+ ThreadDumper ALL = new ThreadDumper() {
+ @Override
+ public ThreadInfo[] dumpThreads(final ThreadMXBean threadBean) {
+ return threadBean.dumpAllThreads(false, false);
+ }
+
+ @Override
+ public void writeMetadata(JsonWriter writer) throws IOException {
+ writer.name("type").value("all");
+ }
+ };
/**
* Implementation of {@link ThreadDumper} that generates data for a specific set of threads.
@@ -76,6 +95,16 @@ public interface ThreadDumper {
public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
return threadBean.getThreadInfo(this.ids, Integer.MAX_VALUE);
}
+
+ @Override
+ public void writeMetadata(JsonWriter writer) throws IOException {
+ writer.name("type").value("specific");
+ writer.name("ids").beginArray();
+ for (long id : ids) {
+ writer.value(id);
+ }
+ writer.endArray();
+ }
}
/**
@@ -121,6 +150,16 @@ public interface ThreadDumper {
.filter(Objects::nonNull)
.toArray(ThreadInfo[]::new);
}
+
+ @Override
+ public void writeMetadata(JsonWriter writer) throws IOException {
+ writer.name("type").value("regex");
+ writer.name("patterns").beginArray();
+ for (Pattern pattern : namePatterns) {
+ writer.value(pattern.pattern());
+ }
+ writer.endArray();
+ }
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
index 44cf54e..3023676 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
@@ -28,22 +28,17 @@ import java.util.regex.Pattern;
/**
* Function for grouping threads together
*/
-@FunctionalInterface
-public interface ThreadGrouper {
-
- /**
- * Gets the group for the given thread.
- *
- * @param threadId the id of the thread
- * @param threadName the name of the thread
- * @return the group
- */
- String getGroup(long threadId, String threadName);
+public enum ThreadGrouper {
/**
* Implementation of {@link ThreadGrouper} that just groups by thread name.
*/
- ThreadGrouper BY_NAME = (threadId, threadName) -> threadName;
+ BY_NAME {
+ @Override
+ public String getGroup(long threadId, String threadName) {
+ return threadName;
+ }
+ },
/**
* Implementation of {@link ThreadGrouper} that attempts to group by the name of the pool
@@ -52,7 +47,7 @@ public interface ThreadGrouper {
* <p>The regex pattern used to match pools expects a digit at the end of the thread name,
* separated from the pool name with any of one or more of ' ', '-', or '#'.</p>
*/
- ThreadGrouper BY_POOL = new ThreadGrouper() {
+ BY_POOL {
private final Map<Long, String> cache = new ConcurrentHashMap<>();
private final Pattern pattern = Pattern.compile("^(.*?)[-# ]+\\d+$");
@@ -72,12 +67,26 @@ public interface ThreadGrouper {
this.cache.put(threadId, group); // we don't care about race conditions here
return group;
}
- };
+ },
/**
* Implementation of {@link ThreadGrouper} which groups all threads as one, under
* the name "All".
*/
- ThreadGrouper AS_ONE = (threadId, threadName) -> "All";
+ AS_ONE {
+ @Override
+ public String getGroup(long threadId, String threadName) {
+ return "All";
+ }
+ };
+
+ /**
+ * Gets the group for the given thread.
+ *
+ * @param threadId the id of the thread
+ * @param threadName the name of the thread
+ * @return the group
+ */
+ public abstract String getGroup(long threadId, String threadName);
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
index b8ca0e7..6fc62d4 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
@@ -20,8 +20,10 @@
package me.lucko.spark.common.sampler.aggregator;
+import com.google.gson.stream.JsonWriter;
import me.lucko.spark.common.sampler.node.ThreadNode;
+import java.io.IOException;
import java.util.Map;
/**
@@ -44,4 +46,12 @@ public interface DataAggregator {
*/
void insertData(long threadId, String threadName, StackTraceElement[] stack);
+ /**
+ * Writes metadata about the data aggregator instance to the given {@code writer}.
+ *
+ * @param writer the writer
+ * @throws IOException if thrown by the writer
+ */
+ void writeMetadata(JsonWriter writer) throws IOException;
+
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
index 8a38612..52e4912 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
@@ -20,9 +20,11 @@
package me.lucko.spark.common.sampler.aggregator;
+import com.google.gson.stream.JsonWriter;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.node.ThreadNode;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -85,4 +87,10 @@ public class SimpleDataAggregator implements DataAggregator {
return this.threadData;
}
+
+ @Override
+ public void writeMetadata(JsonWriter writer) throws IOException {
+ writer.name("type").value("simple");
+ writer.name("threadGrouper").value(this.threadGrouper.name().toLowerCase());
+ }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
index d51c0a2..3b9bcd2 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
@@ -20,11 +20,13 @@
package me.lucko.spark.common.sampler.aggregator;
+import com.google.gson.stream.JsonWriter;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.TickCounter;
import me.lucko.spark.common.sampler.node.AbstractNode;
import me.lucko.spark.common.sampler.node.ThreadNode;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -150,6 +152,13 @@ public class TickedDataAggregator implements DataAggregator {
}
}
+ @Override
+ public void writeMetadata(JsonWriter writer) throws IOException {
+ writer.name("type").value("ticked");
+ writer.name("threadGrouper").value(this.threadGrouper.name().toLowerCase());
+ writer.name("tickLengthThreshold").value(this.tickLengthThreshold);
+ }
+
private final class TickList implements Runnable {
private final List<QueuedThreadInfo> list;