From 48924d99d3f91d7319eceea74619c5ebcf4d76fa Mon Sep 17 00:00:00 2001
From: Luck
Date: Sat, 9 Jun 2018 22:47:50 +0100
Subject: Use ScheduledExecutorService instead of Timer

---
 .../java/me/lucko/spark/common/CommandHandler.java |  8 +--
 .../lucko/spark/profiler/AsyncDataAggregator.java  | 77 ----------------------
 .../main/java/me/lucko/spark/profiler/Sampler.java | 27 ++++----
 .../me/lucko/spark/profiler/SamplerBuilder.java    |  5 +-
 .../lucko/spark/profiler/SimpleDataAggregator.java | 55 ++++++++++++++++
 .../lucko/spark/profiler/TickedDataAggregator.java | 31 +++++----
 6 files changed, 93 insertions(+), 110 deletions(-)
 delete mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java
 create mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java

diff --git a/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java b/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java
index 72774d5..e253d29 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java
@@ -17,7 +17,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.Timer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
@@ -36,11 +35,6 @@ public abstract class CommandHandler {
     /** The prefix used in all messages */
     private static final String PREFIX = "&8[&fspark&8] &7";
 
-    /**
-     * The {@link Timer} being used by the {@link #activeSampler}.
-     */
-    private final Timer samplingThread = new Timer("spark-sampling-thread", true);
-
     /** Guards {@link #activeSampler} */
     private final Object[] activeSamplerMutex = new Object[0];
     /** The WarmRoast instance currently running, if any */
@@ -183,7 +177,7 @@ public abstract class CommandHandler {
             if (ticksOver != -1) {
                 builder.ticksOver(ticksOver, tickCounter);
             }
-            sampler = this.activeSampler = builder.start(this.samplingThread);
+            sampler = this.activeSampler = builder.start();
             sendPrefixedMessage("&bProfiler now active!");
 
             if (timeoutSeconds == -1) {
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java
deleted file mode 100644
index 9a4090e..0000000
--- a/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package me.lucko.spark.profiler;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Implementation of {@link DataAggregator} that makes use of a "worker" thread pool for inserting
- * data.
- */
-public class AsyncDataAggregator implements DataAggregator {
-
-    /** A map of root stack nodes for each thread with sampling data */
-    private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
-
-    /** The worker pool for inserting stack nodes */
-    private final ExecutorService workerPool;
-
-    /** The instance used to group threads together */
-    private final ThreadGrouper threadGrouper;
-
-    /** The interval to wait between sampling, in milliseconds */
-    private final int interval;
-
-    public AsyncDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) {
-        this.workerPool = workerPool;
-        this.threadGrouper = threadGrouper;
-        this.interval = interval;
-    }
-
-    @Override
-    public void insertData(String threadName, StackTraceElement[] stack) {
-        // form the queued data
-        QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
-        // schedule insertion of the data
-        this.workerPool.execute(queuedData);
-    }
-
-    @Override
-    public Map<String, StackNode> getData() {
-        // wait for all pending data to be inserted
-        this.workerPool.shutdown();
-        try {
-            this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        return this.threadData;
-    }
-
-    void insertData(QueuedThreadInfo data) {
-        try {
-            String group = this.threadGrouper.getGroup(data.threadName);
-            StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
-            node.log(data.stack, this.interval);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    private final class QueuedThreadInfo implements Runnable {
-        private final String threadName;
-        private final StackTraceElement[] stack;
-
-        QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
-            this.threadName = threadName;
-            this.stack = stack;
-        }
-
-        @Override
-        public void run() {
-            insertData(this);
-        }
-    }
-}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
index 3476f03..14e82c8 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
@@ -32,25 +32,28 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.GZIPOutputStream;
 
 /**
  * Main sampler class.
  */
-public class Sampler extends TimerTask {
+public class Sampler implements Runnable {
     private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
 
     /** The worker pool for inserting stack nodes */
-    private final ExecutorService workerPool = Executors.newFixedThreadPool(
-            6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement()).build()
+    private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
+            9, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
     );
 
+    /** The main sampling task */
+    private ScheduledFuture<?> task;
+
     /** The thread management interface for the current JVM */
     private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
     /** The instance used to generate thread information for use in sampling */
@@ -70,7 +73,7 @@
 
     public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
         this.threadDumper = threadDumper;
-        this.dataAggregator = new AsyncDataAggregator(this.workerPool, threadGrouper, interval);
+        this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval);
         this.interval = interval;
         this.endTime = endTime;
     }
@@ -84,13 +87,11 @@
 
     /**
      * Starts the sampler.
-     *
-     * @param samplingThread the timer to schedule the sampling on
      */
-    public void start(Timer samplingThread) {
+    public void start() {
         this.startTime = System.currentTimeMillis();
         this.dataAggregator.start();
-        samplingThread.scheduleAtFixedRate(this, 0, this.interval);
+        this.task = workerPool.scheduleAtFixedRate(this, 0, interval, TimeUnit.MILLISECONDS);
     }
 
     public long getStartTime() {
@@ -108,6 +109,10 @@
         return this.future;
     }
 
+    public void cancel() {
+        task.cancel(false);
+    }
+
     @Override
     public void run() {
         try {
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
index 7db0515..733f4e3 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
@@ -1,6 +1,5 @@
 package me.lucko.spark.profiler;
 
-import java.util.Timer;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -48,7 +47,7 @@
         return this;
     }
 
-    public Sampler start(Timer samplingThread) {
+    public Sampler start() {
         Sampler sampler;
         if (this.ticksOver != -1 && this.tickCounter != null) {
             sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout, this.tickCounter, this.ticksOver);
@@ -56,7 +55,7 @@
             sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout);
         }
 
-        sampler.start(samplingThread);
+        sampler.start();
         return sampler;
     }
 
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java
new file mode 100644
index 0000000..f4138af
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java
@@ -0,0 +1,55 @@
+package me.lucko.spark.profiler;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link DataAggregator} that makes use of a "worker" thread pool for inserting
"worker" thread pool for inserting + * data. + */ +public class SimpleDataAggregator implements DataAggregator { + + /** A map of root stack nodes for each thread with sampling data */ + private final Map threadData = new ConcurrentHashMap<>(); + + /** The worker pool used for sampling */ + private final ExecutorService workerPool; + + /** The instance used to group threads together */ + private final ThreadGrouper threadGrouper; + + /** The interval to wait between sampling, in milliseconds */ + private final int interval; + + public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) { + this.workerPool = workerPool; + this.threadGrouper = threadGrouper; + this.interval = interval; + } + + @Override + public void insertData(String threadName, StackTraceElement[] stack) { + try { + String group = this.threadGrouper.getGroup(threadName); + StackNode node = this.threadData.computeIfAbsent(group, StackNode::new); + node.log(stack, this.interval); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public Map getData() { + // wait for all pending data to be inserted + this.workerPool.shutdown(); + try { + this.workerPool.awaitTermination(15, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return this.threadData; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java index abca4b3..1d23d37 100644 --- a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java @@ -34,6 +34,8 @@ public class TickedDataAggregator implements DataAggregator { /** The expected number of samples in each tick */ private final int expectedSize; + private final Object mutex = new Object(); + // state private long currentTick = -1; private TickList currentData = new TickList(0); @@ -48,22 +50,24 @@ public class TickedDataAggregator implements DataAggregator { this.expectedSize = (50 / interval) + 10; } - // this is effectively synchronized by the Timer instance in Sampler @Override public void insertData(String threadName, StackTraceElement[] stack) { - long tick = this.tickCounter.getCurrentTick(); - if (this.currentTick != tick) { - pushCurrentTick(); - this.currentTick = tick; - this.currentData = new TickList(this.expectedSize); - } + synchronized (this.mutex) { + long tick = this.tickCounter.getCurrentTick(); + if (this.currentTick != tick) { + pushCurrentTick(); + this.currentTick = tick; + this.currentData = new TickList(this.expectedSize); + } - // form the queued data - QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); - // insert it - this.currentData.addData(queuedData); + // form the queued data + QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); + // insert it + this.currentData.addData(queuedData); + } } + // guarded by 'mutex' private void pushCurrentTick() { TickList currentData = this.currentData; @@ -86,7 +90,9 @@ public class TickedDataAggregator implements DataAggregator { @Override public Map getData() { // push the current tick - pushCurrentTick(); + synchronized (this.mutex) { + pushCurrentTick(); + } // close the tick counter this.tickCounter.close(); @@ -102,6 +108,7 @@ public class TickedDataAggregator implements DataAggregator { return this.threadData; } + // called by TickList void insertData(List dataList) { for (QueuedThreadInfo data 
         for (QueuedThreadInfo data : dataList) {
             try {
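
For context, the scheduling pattern this commit moves to looks roughly like the sketch below. It is a minimal, self-contained illustration rather than code from the spark repository: the class name FixedRateTaskExample, the pool size, the interval and the counter field are invented for the example, while ScheduledExecutorService.scheduleAtFixedRate and ScheduledFuture.cancel are the standard java.util.concurrent APIs the patch switches to in place of Timer/TimerTask.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; names below are not from the spark codebase.
public class FixedRateTaskExample implements Runnable {

    // A scheduled pool takes over the role of the dedicated Timer thread.
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

    // Holding on to the ScheduledFuture allows the repeating task to be cancelled later.
    private ScheduledFuture<?> task;

    // Shared state is guarded explicitly, since pool threads do not provide the
    // single-thread serialisation a Timer gave for free.
    private final Object mutex = new Object();
    private long samplesTaken = 0;

    public void start(long intervalMillis) {
        this.task = this.pool.scheduleAtFixedRate(this, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    public void cancel() {
        this.task.cancel(false); // allow an in-flight run to finish
        this.pool.shutdown();
    }

    @Override
    public void run() {
        synchronized (this.mutex) {
            this.samplesTaken++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FixedRateTaskExample example = new FixedRateTaskExample();
        example.start(10);
        Thread.sleep(100);
        example.cancel();
        synchronized (example.mutex) {
            System.out.println("runs observed: " + example.samplesTaken);
        }
    }
}

Holding the ScheduledFuture is what lets the new Sampler.cancel() stop the repeating task without tearing down the whole pool, and the explicit mutex mirrors the TickedDataAggregator change, which compensates for losing the serialisation previously provided by the single Timer thread.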