aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'spark-common/src/main')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java7
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java17
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java12
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java80
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java8
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java56
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java94
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java31
8 files changed, 137 insertions, 168 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
index ee16830..fe7ffe0 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
@@ -76,6 +76,7 @@ public class SamplerModule implements CommandModule {
.argumentUsage("interval", "interval millis")
.argumentUsage("only-ticks-over", "tick length millis")
.argumentUsage("include-line-numbers", null)
+ .argumentUsage("ignore-sleeping", null)
.executor((platform, sender, resp, arguments) -> {
if (arguments.boolFlag("info")) {
if (this.activeSampler == null) {
@@ -135,6 +136,7 @@ public class SamplerModule implements CommandModule {
}
boolean includeLineNumbers = arguments.boolFlag("include-line-numbers");
+ boolean ignoreSleeping = arguments.boolFlag("ignore-sleeping");
Set<String> threads = arguments.stringFlag("thread");
ThreadDumper threadDumper;
@@ -186,6 +188,7 @@ public class SamplerModule implements CommandModule {
}
builder.samplingInterval(intervalMillis);
builder.includeLineNumbers(includeLineNumbers);
+ builder.ignoreSleeping(ignoreSleeping);
if (ticksOver != -1) {
builder.ticksOver(ticksOver, tickCounter);
}
@@ -198,7 +201,7 @@ public class SamplerModule implements CommandModule {
resp.broadcastPrefixed(TextComponent.of("The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds."));
}
- CompletableFuture<Sampler> future = activeSampler.getFuture();
+ CompletableFuture<Sampler> future = this.activeSampler.getFuture();
// send message if profiling fails
future.whenCompleteAsync((s, throwable) -> {
@@ -230,7 +233,7 @@ public class SamplerModule implements CommandModule {
List<String> opts = new ArrayList<>(Arrays.asList("--info", "--stop", "--cancel",
"--timeout", "--regex", "--combine-all", "--not-combined", "--interval",
- "--only-ticks-over", "--include-line-numbers"));
+ "--only-ticks-over", "--include-line-numbers", "--ignore-sleeping"));
opts.removeAll(arguments);
opts.add("--thread"); // allowed multiple times
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index 1fb4d2d..7418776 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -78,16 +78,16 @@ public class Sampler implements Runnable {
/** The unix timestamp (in millis) when this sampler should automatically complete.*/
private final long endTime; // -1 for nothing
- public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers) {
+ public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers, boolean ignoreSleeping) {
this.threadDumper = threadDumper;
- this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, includeLineNumbers);
+ this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping);
this.interval = interval;
this.endTime = endTime;
}
- public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers, TickCounter tickCounter, int tickLengthThreshold) {
+ public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers, boolean ignoreSleeping, TickCounter tickCounter, int tickLengthThreshold) {
this.threadDumper = threadDumper;
- this.dataAggregator = new TickedDataAggregator(this.workerPool, tickCounter, threadGrouper, interval, includeLineNumbers, tickLengthThreshold);
+ this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping, tickCounter, tickLengthThreshold);
this.interval = interval;
this.endTime = endTime;
}
@@ -150,15 +150,10 @@ public class Sampler implements Runnable {
@Override
public void run() {
for (ThreadInfo threadInfo : this.threadDumps) {
- long threadId = threadInfo.getThreadId();
- String threadName = threadInfo.getThreadName();
- StackTraceElement[] stack = threadInfo.getStackTrace();
-
- if (threadName == null || stack == null) {
+ if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) {
continue;
}
-
- this.dataAggregator.insertData(threadId, threadName, stack);
+ this.dataAggregator.insertData(threadInfo);
}
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 4ce69df..4808214 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -29,6 +29,7 @@ public class SamplerBuilder {
private double samplingInterval = 4; // milliseconds
private boolean includeLineNumbers = false;
+ private boolean ignoreSleeping = false;
private long timeout = -1;
private ThreadDumper threadDumper = ThreadDumper.ALL;
private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME;
@@ -73,14 +74,19 @@ public class SamplerBuilder {
return this;
}
+ public SamplerBuilder ignoreSleeping(boolean ignoreSleeping) {
+ this.ignoreSleeping = ignoreSleeping;
+ return this;
+ }
+
public Sampler start() {
Sampler sampler;
int intervalMicros = (int) (this.samplingInterval * 1000d);
- if (this.ticksOver != -1 && this.tickCounter != null) {
- sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers, this.tickCounter, this.ticksOver);
+ if (this.ticksOver == -1 || this.tickCounter == null) {
+ sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers, this.ignoreSleeping);
} else {
- sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers);
+ sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers, this.ignoreSleeping, this.tickCounter, this.ticksOver);
}
sampler.start();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
new file mode 100644
index 0000000..e72dfc5
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
@@ -0,0 +1,80 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.aggregator;
+
+import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+
+import java.lang.management.ThreadInfo;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public abstract class AbstractDataAggregator implements DataAggregator {
+
+ /** A map of root stack nodes for each thread with sampling data */
+ protected final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
+
+ /** The worker pool for inserting stack nodes */
+ protected final ExecutorService workerPool;
+
+ /** The instance used to group threads together */
+ protected final ThreadGrouper threadGrouper;
+
+ /** The interval to wait between sampling, in microseconds */
+ protected final int interval;
+
+ /** If line numbers should be included in the output */
+ private final boolean includeLineNumbers;
+
+ /** If sleeping threads should be ignored */
+ private final boolean ignoreSleeping;
+
+ public AbstractDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, boolean ignoreSleeping) {
+ this.workerPool = workerPool;
+ this.threadGrouper = threadGrouper;
+ this.interval = interval;
+ this.includeLineNumbers = includeLineNumbers;
+ this.ignoreSleeping = ignoreSleeping;
+ }
+
+ protected ThreadNode getNode(String group) {
+ ThreadNode node = this.threadData.get(group); // fast path
+ if (node != null) {
+ return node;
+ }
+ return this.threadData.computeIfAbsent(group, ThreadNode::new);
+ }
+
+ protected void writeData(ThreadInfo threadInfo) {
+ if (this.ignoreSleeping && (threadInfo.getThreadState() == Thread.State.WAITING || threadInfo.getThreadState() == Thread.State.TIMED_WAITING)) {
+ return;
+ }
+
+ try {
+ ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName()));
+ node.log(threadInfo.getStackTrace(), this.interval, this.includeLineNumbers);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
index 2024f97..8318fbd 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
@@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler.aggregator;
import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
+import java.lang.management.ThreadInfo;
import java.util.Map;
/**
@@ -39,11 +40,10 @@ public interface DataAggregator {
/**
* Inserts sampling data into this aggregator
- * @param threadId the id of the thread
- * @param threadName the name of the thread
- * @param stack the call stack
+ *
+ * @param threadInfo the thread info
*/
- void insertData(long threadId, String threadName, StackTraceElement[] stack);
+ void insertData(ThreadInfo threadInfo);
/**
* Gets metadata about the data aggregator instance.
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
index 7df3b9e..ef968fe 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
@@ -24,54 +24,30 @@ import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
+import java.lang.management.ThreadInfo;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Basic implementation of {@link DataAggregator}.
*/
-public class SimpleDataAggregator implements DataAggregator {
-
- /** A map of root stack nodes for each thread with sampling data */
- private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
-
- /** The worker pool used for sampling */
- private final ExecutorService workerPool;
-
- /** The instance used to group threads together */
- private final ThreadGrouper threadGrouper;
-
- /** The interval to wait between sampling, in microseconds */
- private final int interval;
-
- /** If line numbers should be included in the output */
- private final boolean includeLineNumbers;
-
- public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers) {
- this.workerPool = workerPool;
- this.threadGrouper = threadGrouper;
- this.interval = interval;
- this.includeLineNumbers = includeLineNumbers;
+public class SimpleDataAggregator extends AbstractDataAggregator {
+ public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, boolean ignoreSleeping) {
+ super(workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping);
}
- private ThreadNode getNode(String group) {
- ThreadNode node = this.threadData.get(group); // fast path
- if (node != null) {
- return node;
- }
- return this.threadData.computeIfAbsent(group, ThreadNode::new);
+ @Override
+ public SamplerMetadata.DataAggregator getMetadata() {
+ return SamplerMetadata.DataAggregator.newBuilder()
+ .setType(SamplerMetadata.DataAggregator.Type.SIMPLE)
+ .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper))
+ .build();
}
@Override
- public void insertData(long threadId, String threadName, StackTraceElement[] stack) {
- try {
- ThreadNode node = getNode(this.threadGrouper.getGroup(threadId, threadName));
- node.log(stack, this.interval, this.includeLineNumbers);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ public void insertData(ThreadInfo threadInfo) {
+ writeData(threadInfo);
}
@Override
@@ -86,12 +62,4 @@ public class SimpleDataAggregator implements DataAggregator {
return this.threadData;
}
-
- @Override
- public SamplerMetadata.DataAggregator getMetadata() {
- return SamplerMetadata.DataAggregator.newBuilder()
- .setType(SamplerMetadata.DataAggregator.Type.SIMPLE)
- .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper))
- .build();
- }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
index 2437de8..a4dfdb8 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
@@ -22,14 +22,13 @@ package me.lucko.spark.common.sampler.aggregator;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.TickCounter;
-import me.lucko.spark.common.sampler.node.AbstractNode;
import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
+import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -37,26 +36,11 @@ import java.util.concurrent.TimeUnit;
* Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
* which exceed a certain threshold in duration.
*/
-public class TickedDataAggregator implements DataAggregator {
-
- /** A map of root stack nodes for each thread with sampling data */
- private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
-
- /** The worker pool for inserting stack nodes */
- private final ExecutorService workerPool;
+public class TickedDataAggregator extends AbstractDataAggregator {
/** Used to monitor the current "tick" of the server */
private final TickCounter tickCounter;
- /** The instance used to group threads together */
- private final ThreadGrouper threadGrouper;
-
- /** The interval to wait between sampling, in microseconds */
- private final int interval;
-
- /** If line numbers should be included in the output */
- private final boolean includeLineNumbers;
-
/** Tick durations under this threshold will not be inserted, measured in microseconds */
private final long tickLengthThreshold;
@@ -69,12 +53,9 @@ public class TickedDataAggregator implements DataAggregator {
private int currentTick = -1;
private TickList currentData = new TickList(0);
- public TickedDataAggregator(ExecutorService workerPool, TickCounter tickCounter, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, int tickLengthThreshold) {
- this.workerPool = workerPool;
+ public TickedDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, boolean ignoreSleeping, TickCounter tickCounter, int tickLengthThreshold) {
+ super(workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping);
this.tickCounter = tickCounter;
- this.threadGrouper = threadGrouper;
- this.interval = interval;
- this.includeLineNumbers = includeLineNumbers;
this.tickLengthThreshold = TimeUnit.MILLISECONDS.toMicros(tickLengthThreshold);
// 50 millis in a tick, plus 10 so we have a bit of room to go over
double intervalMilliseconds = interval / 1000d;
@@ -82,7 +63,16 @@ public class TickedDataAggregator implements DataAggregator {
}
@Override
- public void insertData(long threadId, String threadName, StackTraceElement[] stack) {
+ public SamplerMetadata.DataAggregator getMetadata() {
+ return SamplerMetadata.DataAggregator.newBuilder()
+ .setType(SamplerMetadata.DataAggregator.Type.TICKED)
+ .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper))
+ .setTickLengthThreshold(this.tickLengthThreshold)
+ .build();
+ }
+
+ @Override
+ public void insertData(ThreadInfo threadInfo) {
synchronized (this.mutex) {
int tick = this.tickCounter.getCurrentTick();
if (this.currentTick != tick) {
@@ -91,10 +81,7 @@ public class TickedDataAggregator implements DataAggregator {
this.currentData = new TickList(this.expectedSize);
}
- // form the queued data
- QueuedThreadInfo queuedData = new QueuedThreadInfo(threadId, threadName, stack);
- // insert it
- this.currentData.addData(queuedData);
+ this.currentData.addData(threadInfo);
}
}
@@ -131,37 +118,8 @@ public class TickedDataAggregator implements DataAggregator {
return this.threadData;
}
- private ThreadNode getNode(String group) {
- ThreadNode node = this.threadData.get(group); // fast path
- if (node != null) {
- return node;
- }
- return this.threadData.computeIfAbsent(group, ThreadNode::new);
- }
-
- // called by TickList
- void insertData(List<QueuedThreadInfo> dataList) {
- for (QueuedThreadInfo data : dataList) {
- try {
- AbstractNode node = getNode(this.threadGrouper.getGroup(data.threadId, data.threadName));
- node.log(data.stack, this.interval, this.includeLineNumbers);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- @Override
- public SamplerMetadata.DataAggregator getMetadata() {
- return SamplerMetadata.DataAggregator.newBuilder()
- .setType(SamplerMetadata.DataAggregator.Type.TICKED)
- .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper))
- .setTickLengthThreshold(this.tickLengthThreshold)
- .build();
- }
-
private final class TickList implements Runnable {
- private final List<QueuedThreadInfo> list;
+ private final List<ThreadInfo> list;
TickList(int expectedSize) {
this.list = new ArrayList<>(expectedSize);
@@ -169,27 +127,17 @@ public class TickedDataAggregator implements DataAggregator {
@Override
public void run() {
- insertData(this.list);
+ for (ThreadInfo data : this.list) {
+ writeData(data);
+ }
}
- public List<QueuedThreadInfo> getList() {
+ public List<ThreadInfo> getList() {
return this.list;
}
- public void addData(QueuedThreadInfo data) {
+ public void addData(ThreadInfo data) {
this.list.add(data);
}
}
-
- private static final class QueuedThreadInfo {
- private final long threadId;
- private final String threadName;
- private final StackTraceElement[] stack;
-
- QueuedThreadInfo(long threadId, String threadName, StackTraceElement[] stack) {
- this.threadId = threadId;
- this.threadName = threadName;
- this.stack = stack;
- }
- }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java
deleted file mode 100644
index 1995a58..0000000
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * This file is part of spark.
- *
- * Copyright (c) lucko (Luck) <luck@lucko.me>
- * Copyright (c) contributors
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-package me.lucko.spark.common.sampler.protocol;
-
-public interface ProtocolConstants {
-
- int VERSION = 1;
- int SAMPLER_TYPE = 0;
-
- byte THREAD_NODE_TYPE = 0;
- byte STACK_TRACE_NODE_TYPE = 1;
-
-}