diff options
author | Luck <git@lucko.me> | 2019-02-21 13:16:21 +0000 |
---|---|---|
committer | Luck <git@lucko.me> | 2019-02-21 13:16:21 +0000 |
commit | 8a61b404848ed8e3c27f06eb73239d37d4273240 (patch) | |
tree | cc567b7ab2928ca268e872695c89ca1cae2067a4 /spark-common/src/main/java | |
parent | 161d3bdfbfe331a1ea3c6da1d096e86fd7a5c525 (diff) | |
download | spark-8a61b404848ed8e3c27f06eb73239d37d4273240.tar.gz spark-8a61b404848ed8e3c27f06eb73239d37d4273240.tar.bz2 spark-8a61b404848ed8e3c27f06eb73239d37d4273240.zip |
Insert data async to prevent blocking the main sampler task
Diffstat (limited to 'spark-common/src/main/java')
-rw-r--r-- | spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java index 2e8ac65..6777770 100644 --- a/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java @@ -56,7 +56,7 @@ public class Sampler implements Runnable { /** The worker pool for inserting stack nodes */ private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool( - 9, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() + 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() ); /** The main sampling task */ @@ -123,6 +123,8 @@ public class Sampler implements Runnable { @Override public void run() { + // this is effectively synchronized, the worker pool will not allow this task + // to concurrently execute. try { if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) { this.future.complete(this); @@ -131,7 +133,25 @@ public class Sampler implements Runnable { } ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean); - for (ThreadInfo threadInfo : threadDumps) { + this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps)); + } catch (Throwable t) { + this.future.completeExceptionally(t); + cancel(); + } + } + + private static final class InsertDataTask implements Runnable { + private final DataAggregator dataAggregator; + private final ThreadInfo[] threadDumps; + + InsertDataTask(DataAggregator dataAggregator, ThreadInfo[] threadDumps) { + this.dataAggregator = dataAggregator; + this.threadDumps = threadDumps; + } + + @Override + public void run() { + for (ThreadInfo threadInfo : this.threadDumps) { String threadName = threadInfo.getThreadName(); StackTraceElement[] stack = threadInfo.getStackTrace(); @@ -141,9 +161,6 @@ public class Sampler implements Runnable { this.dataAggregator.insertData(threadName, stack); } - } catch (Throwable t) { - this.future.completeExceptionally(t); - cancel(); } } |