aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java
diff options
context:
space:
mode:
authorLuck <git@lucko.me>2019-02-21 13:16:21 +0000
committerLuck <git@lucko.me>2019-02-21 13:16:21 +0000
commit8a61b404848ed8e3c27f06eb73239d37d4273240 (patch)
treecc567b7ab2928ca268e872695c89ca1cae2067a4 /spark-common/src/main/java
parent161d3bdfbfe331a1ea3c6da1d096e86fd7a5c525 (diff)
downloadspark-8a61b404848ed8e3c27f06eb73239d37d4273240.tar.gz
spark-8a61b404848ed8e3c27f06eb73239d37d4273240.tar.bz2
spark-8a61b404848ed8e3c27f06eb73239d37d4273240.zip
Insert data async to prevent blocking the main sampler task
Diffstat (limited to 'spark-common/src/main/java')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java27
1 files changed, 22 insertions, 5 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java
index 2e8ac65..6777770 100644
--- a/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java
@@ -56,7 +56,7 @@ public class Sampler implements Runnable {
/** The worker pool for inserting stack nodes */
private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
- 9, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
+ 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
);
/** The main sampling task */
@@ -123,6 +123,8 @@ public class Sampler implements Runnable {
@Override
public void run() {
+ // this is effectively synchronized, the worker pool will not allow this task
+ // to concurrently execute.
try {
if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) {
this.future.complete(this);
@@ -131,7 +133,25 @@ public class Sampler implements Runnable {
}
ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean);
- for (ThreadInfo threadInfo : threadDumps) {
+ this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps));
+ } catch (Throwable t) {
+ this.future.completeExceptionally(t);
+ cancel();
+ }
+ }
+
+ private static final class InsertDataTask implements Runnable {
+ private final DataAggregator dataAggregator;
+ private final ThreadInfo[] threadDumps;
+
+ InsertDataTask(DataAggregator dataAggregator, ThreadInfo[] threadDumps) {
+ this.dataAggregator = dataAggregator;
+ this.threadDumps = threadDumps;
+ }
+
+ @Override
+ public void run() {
+ for (ThreadInfo threadInfo : this.threadDumps) {
String threadName = threadInfo.getThreadName();
StackTraceElement[] stack = threadInfo.getStackTrace();
@@ -141,9 +161,6 @@ public class Sampler implements Runnable {
this.dataAggregator.insertData(threadName, stack);
}
- } catch (Throwable t) {
- this.future.completeExceptionally(t);
- cancel();
}
}