diff options
| author | Luck <git@lucko.me> | 2019-06-16 17:02:39 +0100 | 
|---|---|---|
| committer | Luck <git@lucko.me> | 2019-06-16 17:02:39 +0100 | 
| commit | 7c614e33f5c050a4e6f83cfbc7b7bd259700778a (patch) | |
| tree | 56bbdc2c5d64b520d86121767dc810cca4f22850 /spark-common/src/main/java/me/lucko/spark/common/sampler | |
| parent | bf6272e791abb7f30e4b3aeeeeedbf22b7807aef (diff) | |
| download | spark-7c614e33f5c050a4e6f83cfbc7b7bd259700778a.tar.gz spark-7c614e33f5c050a4e6f83cfbc7b7bd259700778a.tar.bz2 spark-7c614e33f5c050a4e6f83cfbc7b7bd259700778a.zip | |
Add option to ignore "sleeping" threads when sampling
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
7 files changed, 132 insertions, 166 deletions
| diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index 1fb4d2d..7418776 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -78,16 +78,16 @@ public class Sampler implements Runnable {      /** The unix timestamp (in millis) when this sampler should automatically complete.*/      private final long endTime; // -1 for nothing -    public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers) { +    public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers, boolean ignoreSleeping) {          this.threadDumper = threadDumper; -        this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, includeLineNumbers); +        this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping);          this.interval = interval;          this.endTime = endTime;      } -    public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers, TickCounter tickCounter, int tickLengthThreshold) { +    public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers, boolean ignoreSleeping, TickCounter tickCounter, int tickLengthThreshold) {          this.threadDumper = threadDumper; -        this.dataAggregator = new TickedDataAggregator(this.workerPool, tickCounter, threadGrouper, interval, includeLineNumbers, tickLengthThreshold); +        this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping, tickCounter, tickLengthThreshold);          this.interval = interval;          this.endTime = endTime;      } @@ -150,15 +150,10 @@ public class Sampler implements Runnable {          @Override          public void run() {              for (ThreadInfo threadInfo : this.threadDumps) { -                long threadId = threadInfo.getThreadId(); -                String threadName = threadInfo.getThreadName(); -                StackTraceElement[] stack = threadInfo.getStackTrace(); - -                if (threadName == null || stack == null) { +                if (threadInfo.getThreadName() == null || threadInfo.getStackTrace() == null) {                      continue;                  } - -                this.dataAggregator.insertData(threadId, threadName, stack); +                this.dataAggregator.insertData(threadInfo);              }          }      } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 4ce69df..4808214 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -29,6 +29,7 @@ public class SamplerBuilder {      private double samplingInterval = 4; // milliseconds      private boolean includeLineNumbers = false; +    private boolean ignoreSleeping = false;      private long timeout = -1;      private ThreadDumper threadDumper = ThreadDumper.ALL;      private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME; @@ -73,14 +74,19 @@ public class SamplerBuilder {          return this;      } +    public SamplerBuilder ignoreSleeping(boolean ignoreSleeping) { +        this.ignoreSleeping = ignoreSleeping; +        return this; +    } +      public Sampler start() {          Sampler sampler;          int intervalMicros = (int) (this.samplingInterval * 1000d); -        if (this.ticksOver != -1 && this.tickCounter != null) { -            sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers, this.tickCounter, this.ticksOver); +        if (this.ticksOver == -1 || this.tickCounter == null) { +            sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers, this.ignoreSleeping);          } else { -            sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers); +            sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers, this.ignoreSleeping, this.tickCounter, this.ticksOver);          }          sampler.start(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java new file mode 100644 index 0000000..e72dfc5 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java @@ -0,0 +1,80 @@ +/* + * This file is part of spark. + * + *  Copyright (c) lucko (Luck) <luck@lucko.me> + *  Copyright (c) contributors + * + *  This program is free software: you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation, either version 3 of the License, or + *  (at your option) any later version. + * + *  This program is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with this program.  If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler.aggregator; + +import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.node.ThreadNode; + +import java.lang.management.ThreadInfo; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +public abstract class AbstractDataAggregator implements DataAggregator { + +    /** A map of root stack nodes for each thread with sampling data */ +    protected final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>(); + +    /** The worker pool for inserting stack nodes */ +    protected final ExecutorService workerPool; + +    /** The instance used to group threads together */ +    protected final ThreadGrouper threadGrouper; + +    /** The interval to wait between sampling, in microseconds */ +    protected final int interval; + +    /** If line numbers should be included in the output */ +    private final boolean includeLineNumbers; + +    /** If sleeping threads should be ignored */ +    private final boolean ignoreSleeping; + +    public AbstractDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, boolean ignoreSleeping) { +        this.workerPool = workerPool; +        this.threadGrouper = threadGrouper; +        this.interval = interval; +        this.includeLineNumbers = includeLineNumbers; +        this.ignoreSleeping = ignoreSleeping; +    } + +    protected ThreadNode getNode(String group) { +        ThreadNode node = this.threadData.get(group); // fast path +        if (node != null) { +            return node; +        } +        return this.threadData.computeIfAbsent(group, ThreadNode::new); +    } + +    protected void writeData(ThreadInfo threadInfo) { +        if (this.ignoreSleeping && (threadInfo.getThreadState() == Thread.State.WAITING || threadInfo.getThreadState() == Thread.State.TIMED_WAITING)) { +            return; +        } + +        try { +            ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName())); +            node.log(threadInfo.getStackTrace(), this.interval, this.includeLineNumbers); +        } catch (Exception e) { +            e.printStackTrace(); +        } +    } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java index 2024f97..8318fbd 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java @@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler.aggregator;  import me.lucko.spark.common.sampler.node.ThreadNode;  import me.lucko.spark.proto.SparkProtos.SamplerMetadata; +import java.lang.management.ThreadInfo;  import java.util.Map;  /** @@ -39,11 +40,10 @@ public interface DataAggregator {      /**       * Inserts sampling data into this aggregator -     * @param threadId the id of the thread -     * @param threadName the name of the thread -     * @param stack the call stack +     * +     * @param threadInfo the thread info       */ -    void insertData(long threadId, String threadName, StackTraceElement[] stack); +    void insertData(ThreadInfo threadInfo);      /**       * Gets metadata about the data aggregator instance. diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java index 7df3b9e..ef968fe 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java @@ -24,54 +24,30 @@ import me.lucko.spark.common.sampler.ThreadGrouper;  import me.lucko.spark.common.sampler.node.ThreadNode;  import me.lucko.spark.proto.SparkProtos.SamplerMetadata; +import java.lang.management.ThreadInfo;  import java.util.Map; -import java.util.concurrent.ConcurrentHashMap;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.TimeUnit;  /**   * Basic implementation of {@link DataAggregator}.   */ -public class SimpleDataAggregator implements DataAggregator { - -    /** A map of root stack nodes for each thread with sampling data */ -    private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>(); - -    /** The worker pool used for sampling */ -    private final ExecutorService workerPool; - -    /** The instance used to group threads together */ -    private final ThreadGrouper threadGrouper; - -    /** The interval to wait between sampling, in microseconds */ -    private final int interval; - -    /** If line numbers should be included in the output */ -    private final boolean includeLineNumbers; - -    public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers) { -        this.workerPool = workerPool; -        this.threadGrouper = threadGrouper; -        this.interval = interval; -        this.includeLineNumbers = includeLineNumbers; +public class SimpleDataAggregator extends AbstractDataAggregator { +    public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, boolean ignoreSleeping) { +        super(workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping);      } -    private ThreadNode getNode(String group) { -        ThreadNode node = this.threadData.get(group); // fast path -        if (node != null) { -            return node; -        } -        return this.threadData.computeIfAbsent(group, ThreadNode::new); +    @Override +    public SamplerMetadata.DataAggregator getMetadata() { +        return SamplerMetadata.DataAggregator.newBuilder() +                .setType(SamplerMetadata.DataAggregator.Type.SIMPLE) +                .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper)) +                .build();      }      @Override -    public void insertData(long threadId, String threadName, StackTraceElement[] stack) { -        try { -            ThreadNode node = getNode(this.threadGrouper.getGroup(threadId, threadName)); -            node.log(stack, this.interval, this.includeLineNumbers); -        } catch (Exception e) { -            e.printStackTrace(); -        } +    public void insertData(ThreadInfo threadInfo) { +        writeData(threadInfo);      }      @Override @@ -86,12 +62,4 @@ public class SimpleDataAggregator implements DataAggregator {          return this.threadData;      } - -    @Override -    public SamplerMetadata.DataAggregator getMetadata() { -        return SamplerMetadata.DataAggregator.newBuilder() -                .setType(SamplerMetadata.DataAggregator.Type.SIMPLE) -                .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper)) -                .build(); -    }  } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java index 2437de8..a4dfdb8 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java @@ -22,14 +22,13 @@ package me.lucko.spark.common.sampler.aggregator;  import me.lucko.spark.common.sampler.ThreadGrouper;  import me.lucko.spark.common.sampler.TickCounter; -import me.lucko.spark.common.sampler.node.AbstractNode;  import me.lucko.spark.common.sampler.node.ThreadNode;  import me.lucko.spark.proto.SparkProtos.SamplerMetadata; +import java.lang.management.ThreadInfo;  import java.util.ArrayList;  import java.util.List;  import java.util.Map; -import java.util.concurrent.ConcurrentHashMap;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.TimeUnit; @@ -37,26 +36,11 @@ import java.util.concurrent.TimeUnit;   * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"   * which exceed a certain threshold in duration.   */ -public class TickedDataAggregator implements DataAggregator { - -    /** A map of root stack nodes for each thread with sampling data */ -    private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>(); - -    /** The worker pool for inserting stack nodes */ -    private final ExecutorService workerPool; +public class TickedDataAggregator extends AbstractDataAggregator {      /** Used to monitor the current "tick" of the server */      private final TickCounter tickCounter; -    /** The instance used to group threads together */ -    private final ThreadGrouper threadGrouper; - -    /** The interval to wait between sampling, in microseconds */ -    private final int interval; - -    /** If line numbers should be included in the output */ -    private final boolean includeLineNumbers; -      /** Tick durations under this threshold will not be inserted, measured in microseconds */      private final long tickLengthThreshold; @@ -69,12 +53,9 @@ public class TickedDataAggregator implements DataAggregator {      private int currentTick = -1;      private TickList currentData = new TickList(0); -    public TickedDataAggregator(ExecutorService workerPool, TickCounter tickCounter, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, int tickLengthThreshold) { -        this.workerPool = workerPool; +    public TickedDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, boolean ignoreSleeping, TickCounter tickCounter, int tickLengthThreshold) { +        super(workerPool, threadGrouper, interval, includeLineNumbers, ignoreSleeping);          this.tickCounter = tickCounter; -        this.threadGrouper = threadGrouper; -        this.interval = interval; -        this.includeLineNumbers = includeLineNumbers;          this.tickLengthThreshold = TimeUnit.MILLISECONDS.toMicros(tickLengthThreshold);          // 50 millis in a tick, plus 10 so we have a bit of room to go over          double intervalMilliseconds = interval / 1000d; @@ -82,7 +63,16 @@ public class TickedDataAggregator implements DataAggregator {      }      @Override -    public void insertData(long threadId, String threadName, StackTraceElement[] stack) { +    public SamplerMetadata.DataAggregator getMetadata() { +        return SamplerMetadata.DataAggregator.newBuilder() +                .setType(SamplerMetadata.DataAggregator.Type.TICKED) +                .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper)) +                .setTickLengthThreshold(this.tickLengthThreshold) +                .build(); +    } + +    @Override +    public void insertData(ThreadInfo threadInfo) {          synchronized (this.mutex) {              int tick = this.tickCounter.getCurrentTick();              if (this.currentTick != tick) { @@ -91,10 +81,7 @@ public class TickedDataAggregator implements DataAggregator {                  this.currentData = new TickList(this.expectedSize);              } -            // form the queued data -            QueuedThreadInfo queuedData = new QueuedThreadInfo(threadId, threadName, stack); -            // insert it -            this.currentData.addData(queuedData); +            this.currentData.addData(threadInfo);          }      } @@ -131,37 +118,8 @@ public class TickedDataAggregator implements DataAggregator {          return this.threadData;      } -    private ThreadNode getNode(String group) { -        ThreadNode node = this.threadData.get(group); // fast path -        if (node != null) { -            return node; -        } -        return this.threadData.computeIfAbsent(group, ThreadNode::new); -    } - -    // called by TickList -    void insertData(List<QueuedThreadInfo> dataList) { -        for (QueuedThreadInfo data : dataList) { -            try { -                AbstractNode node = getNode(this.threadGrouper.getGroup(data.threadId, data.threadName)); -                node.log(data.stack, this.interval, this.includeLineNumbers); -            } catch (Exception e) { -                e.printStackTrace(); -            } -        } -    } - -    @Override -    public SamplerMetadata.DataAggregator getMetadata() { -        return SamplerMetadata.DataAggregator.newBuilder() -                .setType(SamplerMetadata.DataAggregator.Type.TICKED) -                .setThreadGrouper(ThreadGrouper.asProto(this.threadGrouper)) -                .setTickLengthThreshold(this.tickLengthThreshold) -                .build(); -    } -      private final class TickList implements Runnable { -        private final List<QueuedThreadInfo> list; +        private final List<ThreadInfo> list;          TickList(int expectedSize) {              this.list = new ArrayList<>(expectedSize); @@ -169,27 +127,17 @@ public class TickedDataAggregator implements DataAggregator {          @Override          public void run() { -            insertData(this.list); +            for (ThreadInfo data : this.list) { +                writeData(data); +            }          } -        public List<QueuedThreadInfo> getList() { +        public List<ThreadInfo> getList() {              return this.list;          } -        public void addData(QueuedThreadInfo data) { +        public void addData(ThreadInfo data) {              this.list.add(data);          }      } - -    private static final class QueuedThreadInfo { -        private final long threadId; -        private final String threadName; -        private final StackTraceElement[] stack; - -        QueuedThreadInfo(long threadId, String threadName, StackTraceElement[] stack) { -            this.threadId = threadId; -            this.threadName = threadName; -            this.stack = stack; -        } -    }  } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java deleted file mode 100644 index 1995a58..0000000 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/protocol/ProtocolConstants.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * This file is part of spark. - * - *  Copyright (c) lucko (Luck) <luck@lucko.me> - *  Copyright (c) contributors - * - *  This program is free software: you can redistribute it and/or modify - *  it under the terms of the GNU General Public License as published by - *  the Free Software Foundation, either version 3 of the License, or - *  (at your option) any later version. - * - *  This program is distributed in the hope that it will be useful, - *  but WITHOUT ANY WARRANTY; without even the implied warranty of - *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - *  GNU General Public License for more details. - * - *  You should have received a copy of the GNU General Public License - *  along with this program.  If not, see <http://www.gnu.org/licenses/>. - */ - -package me.lucko.spark.common.sampler.protocol; - -public interface ProtocolConstants { - -    int VERSION = 1; -    int SAMPLER_TYPE = 0; - -    byte THREAD_NODE_TYPE = 0; -    byte STACK_TRACE_NODE_TYPE = 1; - -} | 
