about | summary | refs | log | tree | commit | diff
path: root/spark-common/src/main/java
diff options
context:
space:
mode:
author Luck <git@lucko.me> 2019-04-27 13:26:45 +0100
committer Luck <git@lucko.me> 2019-04-27 13:26:45 +0100
commit 65bd9ac4ecabd28df5802cf0b8e49b0e98fa6ecf (patch)
tree b6061d1e3710c2207c903a6e5e80cc7a18d103df /spark-common/src/main/java
parent d147dbb147c081520ae44aa9e7bd85f759e30a1d (diff)
download spark-65bd9ac4ecabd28df5802cf0b8e49b0e98fa6ecf.tar.gz
spark-65bd9ac4ecabd28df5802cf0b8e49b0e98fa6ecf.tar.bz2
spark-65bd9ac4ecabd28df5802cf0b8e49b0e98fa6ecf.zip
Sampler performance improvements
Diffstat (limited to 'spark-common/src/main/java')
-rw-r--r-- spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java 4
-rw-r--r-- spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java 21
-rw-r--r-- spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java 11
-rw-r--r-- spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java 14
-rw-r--r-- spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java 27
-rw-r--r-- spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java 10
6 files changed, 50 insertions, 37 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index d504247..c57e4e2 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -97,7 +97,6 @@ public class Sampler implements Runnable {
*/
public void start() {
this.startTime = System.currentTimeMillis();
- this.dataAggregator.start();
this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS);
}
@@ -151,6 +150,7 @@ public class Sampler implements Runnable {
@Override
public void run() {
for (ThreadInfo threadInfo : this.threadDumps) {
+ long threadId = threadInfo.getThreadId();
String threadName = threadInfo.getThreadName();
StackTraceElement[] stack = threadInfo.getStackTrace();
@@ -158,7 +158,7 @@ public class Sampler implements Runnable {
continue;
}
- this.dataAggregator.insertData(threadName, stack);
+ this.dataAggregator.insertData(threadId, threadName, stack);
}
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
index f53800a..44cf54e 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
@@ -20,6 +20,8 @@
package me.lucko.spark.common.sampler;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -32,15 +34,16 @@ public interface ThreadGrouper {
/**
* Gets the group for the given thread.
*
+ * @param threadId the id of the thread
* @param threadName the name of the thread
* @return the group
*/
- String getGroup(String threadName);
+ String getGroup(long threadId, String threadName);
/**
* Implementation of {@link ThreadGrouper} that just groups by thread name.
*/
- ThreadGrouper BY_NAME = threadName -> threadName;
+ ThreadGrouper BY_NAME = (threadId, threadName) -> threadName;
/**
* Implementation of {@link ThreadGrouper} that attempts to group by the name of the pool
@@ -50,16 +53,24 @@ public interface ThreadGrouper {
* separated from the pool name with any of one or more of ' ', '-', or '#'.</p>
*/
ThreadGrouper BY_POOL = new ThreadGrouper() {
+ private final Map<Long, String> cache = new ConcurrentHashMap<>();
private final Pattern pattern = Pattern.compile("^(.*?)[-# ]+\\d+$");
@Override
- public String getGroup(String threadName) {
+ public String getGroup(long threadId, String threadName) {
+ String group = this.cache.get(threadId);
+ if (group != null) {
+ return group;
+ }
+
Matcher matcher = this.pattern.matcher(threadName);
if (!matcher.matches()) {
return threadName;
}
- return matcher.group(1).trim() + " (Combined)";
+ group = matcher.group(1).trim() + " (Combined)";
+ this.cache.put(threadId, group); // we don't care about race conditions here
+ return group;
}
};
@@ -67,6 +78,6 @@ public interface ThreadGrouper {
* Implementation of {@link ThreadGrouper} which groups all threads as one, under
* the name "All".
*/
- ThreadGrouper AS_ONE = threadName -> "All";
+ ThreadGrouper AS_ONE = (threadId, threadName) -> "All";
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
index 8c65c2d..b8ca0e7 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
@@ -30,13 +30,6 @@ import java.util.Map;
public interface DataAggregator {
/**
- * Called before the sampler begins to insert data
- */
- default void start() {
-
- }
-
- /**
* Forms the output data
*
* @return the output data
@@ -45,10 +38,10 @@ public interface DataAggregator {
/**
* Inserts sampling data into this aggregator
- *
+ * @param threadId the id of the thread
* @param threadName the name of the thread
* @param stack the call stack
*/
- void insertData(String threadName, StackTraceElement[] stack);
+ void insertData(long threadId, String threadName, StackTraceElement[] stack);
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
index 8fbd03f..8a38612 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
@@ -21,7 +21,6 @@
package me.lucko.spark.common.sampler.aggregator;
import me.lucko.spark.common.sampler.ThreadGrouper;
-import me.lucko.spark.common.sampler.node.AbstractNode;
import me.lucko.spark.common.sampler.node.ThreadNode;
import java.util.Map;
@@ -56,11 +55,18 @@ public class SimpleDataAggregator implements DataAggregator {
this.includeLineNumbers = includeLineNumbers;
}
+ private ThreadNode getNode(String group) {
+ ThreadNode node = this.threadData.get(group); // fast path
+ if (node != null) {
+ return node;
+ }
+ return this.threadData.computeIfAbsent(group, ThreadNode::new);
+ }
+
@Override
- public void insertData(String threadName, StackTraceElement[] stack) {
+ public void insertData(long threadId, String threadName, StackTraceElement[] stack) {
try {
- String group = this.threadGrouper.getGroup(threadName);
- AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new);
+ ThreadNode node = getNode(this.threadGrouper.getGroup(threadId, threadName));
node.log(stack, this.interval, this.includeLineNumbers);
} catch (Exception e) {
e.printStackTrace();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
index 8f8124b..d51c0a2 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
@@ -81,7 +81,7 @@ public class TickedDataAggregator implements DataAggregator {
}
@Override
- public void insertData(String threadName, StackTraceElement[] stack) {
+ public void insertData(long threadId, String threadName, StackTraceElement[] stack) {
synchronized (this.mutex) {
int tick = this.tickCounter.getCurrentTick();
if (this.currentTick != tick) {
@@ -91,7 +91,7 @@ public class TickedDataAggregator implements DataAggregator {
}
// form the queued data
- QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+ QueuedThreadInfo queuedData = new QueuedThreadInfo(threadId, threadName, stack);
// insert it
this.currentData.addData(queuedData);
}
@@ -113,20 +113,12 @@ public class TickedDataAggregator implements DataAggregator {
}
@Override
- public void start() {
- this.tickCounter.start();
- }
-
- @Override
public Map<String, ThreadNode> getData() {
// push the current tick
synchronized (this.mutex) {
pushCurrentTick();
}
- // close the tick counter
- this.tickCounter.close();
-
// wait for all pending data to be inserted
this.workerPool.shutdown();
try {
@@ -138,12 +130,19 @@ public class TickedDataAggregator implements DataAggregator {
return this.threadData;
}
+ private ThreadNode getNode(String group) {
+ ThreadNode node = this.threadData.get(group); // fast path
+ if (node != null) {
+ return node;
+ }
+ return this.threadData.computeIfAbsent(group, ThreadNode::new);
+ }
+
// called by TickList
void insertData(List<QueuedThreadInfo> dataList) {
for (QueuedThreadInfo data : dataList) {
try {
- String group = this.threadGrouper.getGroup(data.threadName);
- AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new);
+ AbstractNode node = getNode(this.threadGrouper.getGroup(data.threadId, data.threadName));
node.log(data.stack, this.interval, this.includeLineNumbers);
} catch (Exception e) {
e.printStackTrace();
@@ -173,10 +172,12 @@ public class TickedDataAggregator implements DataAggregator {
}
private static final class QueuedThreadInfo {
+ private final long threadId;
private final String threadName;
private final StackTraceElement[] stack;
- QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+ QueuedThreadInfo(long threadId, String threadName, StackTraceElement[] stack) {
+ this.threadId = threadId;
this.threadName = threadName;
this.stack = stack;
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
index 5cfc0f2..8c5975a 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
@@ -64,10 +64,12 @@ public abstract class AbstractNode {
}
private AbstractNode resolveChild(String className, String methodName, int lineNumber) {
- return this.children.computeIfAbsent(
- StackTraceNode.generateKey(className, methodName, lineNumber),
- name -> new StackTraceNode(className, methodName, lineNumber)
- );
+ String key = StackTraceNode.generateKey(className, methodName, lineNumber);
+ StackTraceNode result = this.children.get(key); // fast path
+ if (result != null) {
+ return result;
+ }
+ return this.children.computeIfAbsent(key, name -> new StackTraceNode(className, methodName, lineNumber));
}
public void log(StackTraceElement[] elements, long time, boolean includeLineNumbers) {