author     Luck <git@lucko.me>    2019-06-16 17:02:39 +0100
committer  Luck <git@lucko.me>    2019-06-16 17:02:39 +0100
commit     7c614e33f5c050a4e6f83cfbc7b7bd259700778a (patch)
tree       56bbdc2c5d64b520d86121767dc810cca4f22850 /spark-common/src/main
parent     bf6272e791abb7f30e4b3aeeeeedbf22b7807aef (diff)
Add option to ignore "sleeping" threads when sampling
Diffstat (limited to 'spark-common/src/main')
8 files changed, 137 insertions, 168 deletions
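
Before the diff itself, a note on what "sleeping" means here: samples are dropped for threads whose Thread.State is WAITING or TIMED_WAITING at the moment of the dump, i.e. threads idling in Object.wait(), Thread.sleep() or LockSupport.park() rather than doing work (BLOCKED threads are still recorded). A minimal, self-contained sketch of that filter follows; it queries ThreadMXBean directly, and the class name, main method and isSleeping helper are illustrative only, not part of this commit.

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;

    public class SleepingThreadFilterSketch {

        // mirrors the state check added in AbstractDataAggregator#writeData below
        static boolean isSleeping(ThreadInfo threadInfo) {
            Thread.State state = threadInfo.getThreadState();
            return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
        }

        public static void main(String[] args) {
            ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
            boolean ignoreSleeping = true; // corresponds to the new --ignore-sleeping flag

            // dump all threads, as the sampler does, and drop the "sleeping" ones
            for (ThreadInfo threadInfo : threadBean.dumpAllThreads(false, false)) {
                if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) {
                    continue;
                }
                if (ignoreSleeping && isSleeping(threadInfo)) {
                    continue; // waiting/parked threads are not aggregated
                }
                System.out.println(threadInfo.getThreadName() + " -> " + threadInfo.getThreadState());
            }
        }
    }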
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
index ee16830..fe7ffe0 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
@@ -76,6 +76,7 @@ public class SamplerModule implements CommandModule {
                 .argumentUsage("interval", "interval millis")
                 .argumentUsage("only-ticks-over", "tick length millis")
                 .argumentUsage("include-line-numbers", null)
+                .argumentUsage("ignore-sleeping", null)
                 .executor((platform, sender, resp, arguments) -> {
                     if (arguments.boolFlag("info")) {
                         if (this.activeSampler == null) {
@@ -135,6 +136,7 @@ public class SamplerModule implements CommandModule {
                     }

                     boolean includeLineNumbers = arguments.boolFlag("include-line-numbers");
+                    boolean ignoreSleeping = arguments.boolFlag("ignore-sleeping");

                     Set<String> threads = arguments.stringFlag("thread");
                     ThreadDumper threadDumper;
@@ -186,6 +188,7 @@ public class SamplerModule implements CommandModule {
                     }
                     builder.samplingInterval(intervalMillis);
                     builder.includeLineNumbers(includeLineNumbers);
+                    builder.ignoreSleeping(ignoreSleeping);
                     if (ticksOver != -1) {
                         builder.ticksOver(ticksOver, tickCounter);
                     }
@@ -198,7 +201,7 @@ public class SamplerModule implements CommandModule {
                         resp.broadcastPrefixed(TextComponent.of("The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds."));
                     }

-                    CompletableFuture<Sampler> future = activeSampler.getFuture();
+                    CompletableFuture<Sampler> future = this.activeSampler.getFuture();

                     // send message if profiling fails
                     future.whenCompleteAsync((s, throwable) -> {
@@ -230,7 +233,7 @@ public class SamplerModule implements CommandModule {
         List<String> opts = new ArrayList<>(Arrays.asList("--info", "--stop", "--cancel",
                 "--timeout", "--regex", "--combine-all", "--not-combined", "--interval",
-                "--only-ticks-over", "--include-line-numbers"));
+                "--only-ticks-over", "--include-line-numbers", "--ignore-sleeping"));
         opts.removeAll(arguments);
         opts.add("--thread"); // allowed multiple times
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index 1fb4d2d..7418776 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -78,16 +78,16 @@ public class Sampler implements Runnable {
     /** The unix timestamp (in millis) when this sampler should automatically complete.*/
     private final long endTime; // -1 for nothing

-    public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers) {
+    public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers, boolean ignoreSleeping) {
         this.threadDumper = threadDumper;
-        this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, includeLineNumbers);
+        this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping);
         this.interval = interval;
         this.endTime = endTime;
     }

-    public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers, TickCounter tickCounter, int tickLengthThreshold) {
+    public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers, boolean ignoreSleeping, TickCounter tickCounter, int tickLengthThreshold) {
         this.threadDumper = threadDumper;
-        this.dataAggregator = new TickedDataAggregator(this.workerPool, tickCounter, threadGrouper, interval, includeLineNumbers, tickLengthThreshold);
+        this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping, tickCounter, tickLengthThreshold);
         this.interval = interval;
         this.endTime = endTime;
     }
@@ -150,15 +150,10 @@ public class Sampler implements Runnable {
         @Override
         public void run() {
             for (ThreadInfo threadInfo : this.threadDumps) {
-                long threadId = threadInfo.getThreadId();
-                String threadName = threadInfo.getThreadName();
-                StackTraceElement[] stack = threadInfo.getStackTrace();
-
-                if (threadName == null || stack == null) {
+                if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) {
                     continue;
                 }
-
-                this.dataAggregator.insertData(threadId, threadName, stack);
+                this.dataAggregator.insertData(threadInfo);
             }
         }
     }
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 4ce69df..4808214 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -29,6 +29,7 @@ public class SamplerBuilder {

     private double samplingInterval = 4; // milliseconds
     private boolean includeLineNumbers = false;
+    private boolean ignoreSleeping = false;
     private long timeout = -1;
     private ThreadDumper threadDumper = ThreadDumper.ALL;
     private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME;
@@ -73,14 +74,19 @@ public class SamplerBuilder {
         return this;
     }

+    public SamplerBuilder ignoreSleeping(boolean ignoreSleeping) {
+        this.ignoreSleeping = ignoreSleeping;
+        return this;
+    }
+
     public Sampler start() {
         Sampler sampler;

         int intervalMicros = (int) (this.samplingInterval * 1000d);
-        if (this.ticksOver != -1 && this.tickCounter != null) {
-            sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers, this.tickCounter, this.ticksOver);
+        if (this.ticksOver == -1 || this.tickCounter == null) {
+            sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers, this.ignoreSleeping);
         } else {
-            sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers);
+            sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers, this.ignoreSleeping, this.tickCounter, this.ticksOver);
         }

         sampler.start();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
new file mode 100644
index 0000000..e72dfc5
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java
@@ -0,0 +1,80 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.aggregator;
+
+import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+
+import java.lang.management.ThreadInfo;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public abstract class AbstractDataAggregator implements DataAggregator {
+
+    /** A map of root stack nodes for each thread with sampling data */
+    protected final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
+
+    /** The worker pool for inserting stack nodes */
+    protected final ExecutorService workerPool;
+
+    /** The instance used to group threads together */
+    protected final ThreadGrouper threadGrouper;
+
+    /** The interval to wait between sampling, in microseconds */
+    protected final int interval;
+
+    /** If line numbers should be included in the output */
+    private final boolean includeLineNumbers;
+
+    /** If sleeping threads should be ignored */
+    private final boolean ignoreSleeping;
+
+    public AbstractDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, boolean ignoreSleeping) {
+        this.workerPool = workerPool;
+        this.threadGrouper = threadGrouper;
+        this.interval = interval;
+        this.includeLineNumbers = includeLineNumbers;
+        this.ignoreSleeping = ignoreSleeping;
+    }
+
+    protected ThreadNode getNode(String group) {
+        ThreadNode node = this.threadData.get(group); // fast path
+        if (node != null) {
+            return node;
+        }
+        return this.threadData.computeIfAbsent(group, ThreadNode::new);
+    }
+
+    protected void writeData(ThreadInfo threadInfo) {
+        if (this.ignoreSleeping && (threadInfo.getThreadState() == Thread.State.WAITING || threadInfo.getThreadState() == Thread.State.TIMED_WAITING)) {
+            return;
+        }
+
+        try {
+            ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName()));
+            node.log(threadInfo.getStackTrace(), this.interval, this.includeLineNumbers);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
index 2024f97..8318fbd 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
@@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler.aggregator;
 import me.lucko.spark.common.sampler.node.ThreadNode;
 import me.lucko.spark.proto.SparkProtos.SamplerMetadata;

+import java.lang.management.ThreadInfo;
 import java.util.Map;

 /**
@@ -39,11 +40,10 @@ public interface DataAggregator {

     /**
      * Inserts sampling data into this aggregator
-     * @param threadId the id of the thread
-     * @param threadName the name of the thread
-     * @param stack the call stack
+     *
+     * @param threadInfo the thread info
      */
-    void insertData(long threadId, String threadName, StackTraceElement[] stack);
+    void insertData(ThreadInfo threadInfo);

     /**
      * Gets metadata about the data aggregator instance.
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
index 7df3b9e..ef968fe 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
@@ -24,54 +24,30 @@ import me.lucko.spark.common.sampler.ThreadGrouper;
 import me.lucko.spark.common.sampler.node.ThreadNode;
 import me.lucko.spark.proto.SparkProtos.SamplerMetadata;

+import java.lang.management.ThreadInfo;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;

 /**
  * Basic implementation of {@link DataAggregator}.
  */
-public class SimpleDataAggregator implements DataAggregator {
-
-    /** A map of root stack nodes for each thread with sampling data */
-    private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
-
-    /** The worker pool used for sampling */
-    private final ExecutorService workerPool;
-
-    /** The instance used to group threads together */
-    private final ThreadGrouper threadGrouper;
-
-    /** The interval to wait between sampling, in microseconds */
-    private final int interval;
-
-    /** If line numbers should be included in the output */
-    private final boolean includeLineNumbers;
-
-    public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers) {
-        this.workerPool = workerPool;
-        this.threadGrouper = threadGrouper;
-        this.interval = interval;
-        this.includeLineNumbers = includeLineNumbers;
+public class SimpleDataAggregator extends AbstractDataAggregator {
+    public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, boolean ignoreSleeping) {
+        super(workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping);
     }

-    private ThreadNode getNode(String group) {
-        ThreadNode node = this.threadData.get(group); // fast path
-        if (node != null) {
-            return node;
-        }
-        return this.threadData.computeIfAbsent(group, ThreadNode::new);
+    @Override
+    public SamplerMetadata.DataAggregator getMetadata() {
+        return SamplerMetadata.DataAggregator.newBuilder()
+                .setType(SamplerMetadata.DataAggregator.Type.SIMPLE)
+                .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper))
+                .build();
     }

     @Override
-    public void insertData(long threadId, String threadName, StackTraceElement[] stack) {
-        try {
-            ThreadNode node = getNode(this.threadGrouper.getGroup(threadId, threadName));
-            node.log(stack, this.interval, this.includeLineNumbers);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+    public void insertData(ThreadInfo threadInfo) {
+        writeData(threadInfo);
     }

     @Override
@@ -86,12 +62,4 @@ public class SimpleDataAggregator extends AbstractDataAggregator {

         return this.threadData;
     }
-
-    @Override
-    public SamplerMetadata.DataAggregator getMetadata() {
-        return SamplerMetadata.DataAggregator.newBuilder()
-                .setType(SamplerMetadata.DataAggregator.Type.SIMPLE)
-                .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper))
-                .build();
-    }
 }
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
index 2437de8..a4dfdb8 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
@@ -22,14 +22,13 @@ package me.lucko.spark.common.sampler.aggregator;
 import me.lucko.spark.common.sampler.ThreadGrouper;
 import me.lucko.spark.common.sampler.TickCounter;
-import me.lucko.spark.common.sampler.node.AbstractNode;
 import me.lucko.spark.common.sampler.node.ThreadNode;
 import me.lucko.spark.proto.SparkProtos.SamplerMetadata;

+import java.lang.management.ThreadInfo;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -37,26 +36,11 @@ import java.util.concurrent.TimeUnit;
 /**
  * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
  * which exceed a certain threshold in duration.
  */
-public class TickedDataAggregator implements DataAggregator {
-
-    /** A map of root stack nodes for each thread with sampling data */
-    private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
-
-    /** The worker pool for inserting stack nodes */
-    private final ExecutorService workerPool;
+public class TickedDataAggregator extends AbstractDataAggregator {

     /** Used to monitor the current "tick" of the server */
     private final TickCounter tickCounter;

-    /** The instance used to group threads together */
-    private final ThreadGrouper threadGrouper;
-
-    /** The interval to wait between sampling, in microseconds */
-    private final int interval;
-
-    /** If line numbers should be included in the output */
-    private final boolean includeLineNumbers;
-
     /** Tick durations under this threshold will not be inserted, measured in microseconds */
     private final long tickLengthThreshold;
@@ -69,12 +53,9 @@ public class TickedDataAggregator implements DataAggregator {
     private int currentTick = -1;
     private TickList currentData = new TickList(0);

-    public TickedDataAggregator(ExecutorService workerPool, TickCounter tickCounter, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, int tickLengthThreshold) {
-        this.workerPool = workerPool;
+    public TickedDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, boolean ignoreSleeping, TickCounter tickCounter, int tickLengthThreshold) {
+        super(workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping);
         this.tickCounter = tickCounter;
-        this.threadGrouper = threadGrouper;
-        this.interval = interval;
-        this.includeLineNumbers = includeLineNumbers;
         this.tickLengthThreshold = TimeUnit.MILLISECONDS.toMicros(tickLengthThreshold);
         // 50 millis in a tick, plus 10 so we have a bit of room to go over
         double intervalMilliseconds = interval / 1000d;
@@ -82,7 +63,16 @@
     }

     @Override
-    public void insertData(long threadId, String threadName, StackTraceElement[] stack) {
+    public SamplerMetadata.DataAggregator getMetadata() {
+        return SamplerMetadata.DataAggregator.newBuilder()
+                .setType(SamplerMetadata.DataAggregator.Type.TICKED)
+                .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper))
+                .setTickLengthThreshold(this.tickLengthThreshold)
+                .build();
+    }
+
+    @Override
+    public void insertData(ThreadInfo threadInfo) {
         synchronized (this.mutex) {
             int tick = this.tickCounter.getCurrentTick();
             if (this.currentTick != tick) {
@@ -91,10 +81,7 @@ public class TickedDataAggregator extends AbstractDataAggregator {
                 this.currentData = new TickList(this.expectedSize);
             }

-            // form the queued data
-            QueuedThreadInfo queuedData = new QueuedThreadInfo(threadId, threadName, stack);
-            // insert it
-            this.currentData.addData(queuedData);
+            this.currentData.addData(threadInfo);
         }
     }

@@ -131,37 +118,8 @@ public class TickedDataAggregator extends AbstractDataAggregator {
         return this.threadData;
     }

-    private ThreadNode getNode(String group) {
-        ThreadNode node = this.threadData.get(group); // fast path
-        if (node != null) {
-            return node;
-        }
-        return this.threadData.computeIfAbsent(group, ThreadNode::new);
-    }
-
-    // called by TickList
-    void insertData(List<QueuedThreadInfo> dataList) {
-        for (QueuedThreadInfo data : dataList) {
-            try {
-                AbstractNode node = getNode(this.threadGrouper.getGroup(data.threadId, data.threadName));
-                node.log(data.stack, this.interval, this.includeLineNumbers);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    @Override
-    public SamplerMetadata.DataAggregator getMetadata() {
-        return SamplerMetadata.DataAggregator.newBuilder()
-                .setType(SamplerMetadata.DataAggregator.Type.TICKED)
-                .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper))
-                .setTickLengthThreshold(this.tickLengthThreshold)
-                .build();
-    }
-
     private final class TickList implements Runnable {
-        private final List<QueuedThreadInfo> list;
+        private final List<ThreadInfo> list;

         TickList(int expectedSize) {
             this.list = new ArrayList<>(expectedSize);
@@ -169,27 +127,17 @@

         @Override
         public void run() {
-            insertData(this.list);
+            for (ThreadInfo data : this.list) {
+                writeData(data);
+            }
         }

-        public List<QueuedThreadInfo> getList() {
+        public List<ThreadInfo> getList() {
             return this.list;
         }

-        public void addData(QueuedThreadInfo data) {
+        public void addData(ThreadInfo data) {
             this.list.add(data);
         }
     }
-
-    private static final class QueuedThreadInfo {
-        private final long threadId;
-        private final String threadName;
-        private final StackTraceElement[] stack;
-
-        QueuedThreadInfo(long threadId, String threadName, StackTraceElement[] stack) {
-            this.threadId = threadId;
-            this.threadName = threadName;
-            this.stack = stack;
-        }
-    }
 }
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java
deleted file mode 100644
index 1995a58..0000000
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * This file is part of spark.
- *
- * Copyright (c) lucko (Luck) <luck@lucko.me>
- * Copyright (c) contributors
-
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-package me.lucko.spark.common.sampler.protocol;
-
-public interface ProtocolConstants {
-
-    int VERSION = 1;
-    int SAMPLER_TYPE = 0;
-
-    byte THREAD_NODE_TYPE = 0;
-    byte STACK_TRACE_NODE_TYPE = 1;
-
-}
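
As a closing note, the new option reaches the aggregators through SamplerBuilder. A rough caller-side sketch is below; only the setters visible in the diff above are used, and the no-arg SamplerBuilder constructor plus the surrounding wiring are assumed rather than shown by this commit.

    import me.lucko.spark.common.sampler.Sampler;
    import me.lucko.spark.common.sampler.SamplerBuilder;

    public class IgnoreSleepingUsageSketch {

        public static Sampler startProfiler() {
            SamplerBuilder builder = new SamplerBuilder(); // assumed no-arg constructor
            builder.samplingInterval(4);       // milliseconds; 4 is the builder's default
            builder.includeLineNumbers(false);
            builder.ignoreSleeping(true);      // new in this commit: drop WAITING/TIMED_WAITING samples
            // without ticksOver(...) and a TickCounter, start() takes the SimpleDataAggregator path
            return builder.start();
        }
    }

On the command side the same option is exposed as the --ignore-sleeping boolean flag registered in SamplerModule.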