From 8a61b404848ed8e3c27f06eb73239d37d4273240 Mon Sep 17 00:00:00 2001 From: Luck Date: Thu, 21 Feb 2019 13:16:21 +0000 Subject: Insert data async to prevent blocking the main sampler task --- .../main/java/me/lucko/spark/sampler/Sampler.java | 27 ++++++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) (limited to 'spark-common/src/main/java') diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java index 2e8ac65..6777770 100644 --- a/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java @@ -56,7 +56,7 @@ public class Sampler implements Runnable { /** The worker pool for inserting stack nodes */ private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool( - 9, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() + 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() ); /** The main sampling task */ @@ -123,6 +123,8 @@ public class Sampler implements Runnable { @Override public void run() { + // this is effectively synchronized, the worker pool will not allow this task + // to concurrently execute. try { if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) { this.future.complete(this); @@ -131,7 +133,25 @@ public class Sampler implements Runnable { } ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean); - for (ThreadInfo threadInfo : threadDumps) { + this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps)); + } catch (Throwable t) { + this.future.completeExceptionally(t); + cancel(); + } + } + + private static final class InsertDataTask implements Runnable { + private final DataAggregator dataAggregator; + private final ThreadInfo[] threadDumps; + + InsertDataTask(DataAggregator dataAggregator, ThreadInfo[] threadDumps) { + this.dataAggregator = dataAggregator; + this.threadDumps = threadDumps; + } + + @Override + public void run() { + for (ThreadInfo threadInfo : this.threadDumps) { String threadName = threadInfo.getThreadName(); StackTraceElement[] stack = threadInfo.getStackTrace(); @@ -141,9 +161,6 @@ public class Sampler implements Runnable { this.dataAggregator.insertData(threadName, stack); } - } catch (Throwable t) { - this.future.completeExceptionally(t); - cancel(); } } -- cgit