diff options
-rw-r--r-- | spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java index 2e8ac65..6777770 100644 --- a/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java @@ -56,7 +56,7 @@ public class Sampler implements Runnable { /** The worker pool for inserting stack nodes */ private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool( - 9, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() + 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() ); /** The main sampling task */ @@ -123,6 +123,8 @@ public class Sampler implements Runnable { @Override public void run() { + // this is effectively synchronized, the worker pool will not allow this task + // to concurrently execute. try { if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) { this.future.complete(this); @@ -131,7 +133,25 @@ public class Sampler implements Runnable { } ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean); - for (ThreadInfo threadInfo : threadDumps) { + this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps)); + } catch (Throwable t) { + this.future.completeExceptionally(t); + cancel(); + } + } + + private static final class InsertDataTask implements Runnable { + private final DataAggregator dataAggregator; + private final ThreadInfo[] threadDumps; + + InsertDataTask(DataAggregator dataAggregator, ThreadInfo[] threadDumps) { + this.dataAggregator = dataAggregator; + this.threadDumps = threadDumps; + } + + @Override + public void run() { + for (ThreadInfo threadInfo : this.threadDumps) { String threadName = threadInfo.getThreadName(); StackTraceElement[] stack = threadInfo.getStackTrace(); @@ -141,9 +161,6 @@ public class Sampler implements Runnable { this.dataAggregator.insertData(threadName, stack); } - } catch (Throwable t) { - this.future.completeExceptionally(t); - cancel(); } } |