diff options
author | Luck <git@lucko.me> | 2019-04-27 13:26:45 +0100 |
---|---|---|
committer | Luck <git@lucko.me> | 2019-04-27 13:26:45 +0100 |
commit | 65bd9ac4ecabd28df5802cf0b8e49b0e98fa6ecf (patch) | |
tree | b6061d1e3710c2207c903a6e5e80cc7a18d103df /spark-common/src/main/java | |
parent | d147dbb147c081520ae44aa9e7bd85f759e30a1d (diff) | |
download | spark-65bd9ac4ecabd28df5802cf0b8e49b0e98fa6ecf.tar.gz spark-65bd9ac4ecabd28df5802cf0b8e49b0e98fa6ecf.tar.bz2 spark-65bd9ac4ecabd28df5802cf0b8e49b0e98fa6ecf.zip |
Sampler performance improvements
Diffstat (limited to 'spark-common/src/main/java')
6 files changed, 50 insertions, 37 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index d504247..c57e4e2 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -97,7 +97,6 @@ public class Sampler implements Runnable { */ public void start() { this.startTime = System.currentTimeMillis(); - this.dataAggregator.start(); this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS); } @@ -151,6 +150,7 @@ public class Sampler implements Runnable { @Override public void run() { for (ThreadInfo threadInfo : this.threadDumps) { + long threadId = threadInfo.getThreadId(); String threadName = threadInfo.getThreadName(); StackTraceElement[] stack = threadInfo.getStackTrace(); @@ -158,7 +158,7 @@ public class Sampler implements Runnable { continue; } - this.dataAggregator.insertData(threadName, stack); + this.dataAggregator.insertData(threadId, threadName, stack); } } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java index f53800a..44cf54e 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java @@ -20,6 +20,8 @@ package me.lucko.spark.common.sampler; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -32,15 +34,16 @@ public interface ThreadGrouper { /** * Gets the group for the given thread. * + * @param threadId the id of the thread * @param threadName the name of the thread * @return the group */ - String getGroup(String threadName); + String getGroup(long threadId, String threadName); /** * Implementation of {@link ThreadGrouper} that just groups by thread name. */ - ThreadGrouper BY_NAME = threadName -> threadName; + ThreadGrouper BY_NAME = (threadId, threadName) -> threadName; /** * Implementation of {@link ThreadGrouper} that attempts to group by the name of the pool @@ -50,16 +53,24 @@ public interface ThreadGrouper { * separated from the pool name with any of one or more of ' ', '-', or '#'.</p> */ ThreadGrouper BY_POOL = new ThreadGrouper() { + private final Map<Long, String> cache = new ConcurrentHashMap<>(); private final Pattern pattern = Pattern.compile("^(.*?)[-# ]+\\d+$"); @Override - public String getGroup(String threadName) { + public String getGroup(long threadId, String threadName) { + String group = this.cache.get(threadId); + if (group != null) { + return group; + } + Matcher matcher = this.pattern.matcher(threadName); if (!matcher.matches()) { return threadName; } - return matcher.group(1).trim() + " (Combined)"; + group = matcher.group(1).trim() + " (Combined)"; + this.cache.put(threadId, group); // we don't care about race conditions here + return group; } }; @@ -67,6 +78,6 @@ public interface ThreadGrouper { * Implementation of {@link ThreadGrouper} which groups all threads as one, under * the name "All". */ - ThreadGrouper AS_ONE = threadName -> "All"; + ThreadGrouper AS_ONE = (threadId, threadName) -> "All"; } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java index 8c65c2d..b8ca0e7 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java @@ -30,13 +30,6 @@ import java.util.Map; public interface DataAggregator { /** - * Called before the sampler begins to insert data - */ - default void start() { - - } - - /** * Forms the output data * * @return the output data @@ -45,10 +38,10 @@ public interface DataAggregator { /** * Inserts sampling data into this aggregator - * + * @param threadId the id of the thread * @param threadName the name of the thread * @param stack the call stack */ - void insertData(String threadName, StackTraceElement[] stack); + void insertData(long threadId, String threadName, StackTraceElement[] stack); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java index 8fbd03f..8a38612 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java @@ -21,7 +21,6 @@ package me.lucko.spark.common.sampler.aggregator; import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.node.AbstractNode; import me.lucko.spark.common.sampler.node.ThreadNode; import java.util.Map; @@ -56,11 +55,18 @@ public class SimpleDataAggregator implements DataAggregator { this.includeLineNumbers = includeLineNumbers; } + private ThreadNode getNode(String group) { + ThreadNode node = this.threadData.get(group); // fast path + if (node != null) { + return node; + } + return this.threadData.computeIfAbsent(group, ThreadNode::new); + } + @Override - public void insertData(String threadName, StackTraceElement[] stack) { + public void insertData(long threadId, String threadName, StackTraceElement[] stack) { try { - String group = this.threadGrouper.getGroup(threadName); - AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new); + ThreadNode node = getNode(this.threadGrouper.getGroup(threadId, threadName)); node.log(stack, this.interval, this.includeLineNumbers); } catch (Exception e) { e.printStackTrace(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java index 8f8124b..d51c0a2 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java @@ -81,7 +81,7 @@ public class TickedDataAggregator implements DataAggregator { } @Override - public void insertData(String threadName, StackTraceElement[] stack) { + public void insertData(long threadId, String threadName, StackTraceElement[] stack) { synchronized (this.mutex) { int tick = this.tickCounter.getCurrentTick(); if (this.currentTick != tick) { @@ -91,7 +91,7 @@ public class TickedDataAggregator implements DataAggregator { } // form the queued data - QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); + QueuedThreadInfo queuedData = new QueuedThreadInfo(threadId, threadName, stack); // insert it this.currentData.addData(queuedData); } @@ -113,20 +113,12 @@ public class TickedDataAggregator implements DataAggregator { } @Override - public void start() { - this.tickCounter.start(); - } - - @Override public Map<String, ThreadNode> getData() { // push the current tick synchronized (this.mutex) { pushCurrentTick(); } - // close the tick counter - this.tickCounter.close(); - // wait for all pending data to be inserted this.workerPool.shutdown(); try { @@ -138,12 +130,19 @@ public class TickedDataAggregator implements DataAggregator { return this.threadData; } + private ThreadNode getNode(String group) { + ThreadNode node = this.threadData.get(group); // fast path + if (node != null) { + return node; + } + return this.threadData.computeIfAbsent(group, ThreadNode::new); + } + // called by TickList void insertData(List<QueuedThreadInfo> dataList) { for (QueuedThreadInfo data : dataList) { try { - String group = this.threadGrouper.getGroup(data.threadName); - AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new); + AbstractNode node = getNode(this.threadGrouper.getGroup(data.threadId, data.threadName)); node.log(data.stack, this.interval, this.includeLineNumbers); } catch (Exception e) { e.printStackTrace(); @@ -173,10 +172,12 @@ public class TickedDataAggregator implements DataAggregator { } private static final class QueuedThreadInfo { + private final long threadId; private final String threadName; private final StackTraceElement[] stack; - QueuedThreadInfo(String threadName, StackTraceElement[] stack) { + QueuedThreadInfo(long threadId, String threadName, StackTraceElement[] stack) { + this.threadId = threadId; this.threadName = threadName; this.stack = stack; } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java index 5cfc0f2..8c5975a 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java @@ -64,10 +64,12 @@ public abstract class AbstractNode { } private AbstractNode resolveChild(String className, String methodName, int lineNumber) { - return this.children.computeIfAbsent( - StackTraceNode.generateKey(className, methodName, lineNumber), - name -> new StackTraceNode(className, methodName, lineNumber) - ); + String key = StackTraceNode.generateKey(className, methodName, lineNumber); + StackTraceNode result = this.children.get(key); // fast path + if (result != null) { + return result; + } + return this.children.computeIfAbsent(key, name -> new StackTraceNode(className, methodName, lineNumber)); } public void log(StackTraceElement[] elements, long time, boolean includeLineNumbers) { |