diff options
-rw-r--r-- | spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java | 8 | ||||
-rw-r--r-- | spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java | 27 | ||||
-rw-r--r-- | spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java | 5 | ||||
-rw-r--r-- | spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java (renamed from spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java) | 42 | ||||
-rw-r--r-- | spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java | 31 |
5 files changed, 48 insertions, 65 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java b/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java index 72774d5..e253d29 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java @@ -17,7 +17,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.Timer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; @@ -36,11 +35,6 @@ public abstract class CommandHandler<T> { /** The prefix used in all messages */ private static final String PREFIX = "&8[&fspark&8] &7"; - /** - * The {@link Timer} being used by the {@link #activeSampler}. - */ - private final Timer samplingThread = new Timer("spark-sampling-thread", true); - /** Guards {@link #activeSampler} */ private final Object[] activeSamplerMutex = new Object[0]; /** The WarmRoast instance currently running, if any */ @@ -183,7 +177,7 @@ public abstract class CommandHandler<T> { if (ticksOver != -1) { builder.ticksOver(ticksOver, tickCounter); } - sampler = this.activeSampler = builder.start(this.samplingThread); + sampler = this.activeSampler = builder.start(); sendPrefixedMessage("&bProfiler now active!"); if (timeoutSeconds == -1) { diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java index 3476f03..14e82c8 100644 --- a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java @@ -32,25 +32,28 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.GZIPOutputStream; /** * Main sampler class. */ -public class Sampler extends TimerTask { +public class Sampler implements Runnable { private static final AtomicInteger THREAD_ID = new AtomicInteger(0); /** The worker pool for inserting stack nodes */ - private final ExecutorService workerPool = Executors.newFixedThreadPool( - 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement()).build() + private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool( + 9, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() ); + /** The main sampling task */ + private ScheduledFuture<?> task; + /** The thread management interface for the current JVM */ private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); /** The instance used to generate thread information for use in sampling */ @@ -70,7 +73,7 @@ public class Sampler extends TimerTask { public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) { this.threadDumper = threadDumper; - this.dataAggregator = new AsyncDataAggregator(this.workerPool, threadGrouper, interval); + this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval); this.interval = interval; this.endTime = endTime; } @@ -84,13 +87,11 @@ public class Sampler extends TimerTask { /** * Starts the sampler. - * - * @param samplingThread the timer to schedule the sampling on */ - public void start(Timer samplingThread) { + public void start() { this.startTime = System.currentTimeMillis(); this.dataAggregator.start(); - samplingThread.scheduleAtFixedRate(this, 0, this.interval); + this.task = workerPool.scheduleAtFixedRate(this, 0, interval, TimeUnit.MILLISECONDS); } public long getStartTime() { @@ -108,6 +109,10 @@ public class Sampler extends TimerTask { return this.future; } + public void cancel() { + task.cancel(false); + } + @Override public void run() { try { diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java index 7db0515..733f4e3 100644 --- a/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java @@ -1,6 +1,5 @@ package me.lucko.spark.profiler; -import java.util.Timer; import java.util.concurrent.TimeUnit; /** @@ -48,7 +47,7 @@ public class SamplerBuilder { return this; } - public Sampler start(Timer samplingThread) { + public Sampler start() { Sampler sampler; if (this.ticksOver != -1 && this.tickCounter != null) { sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout, this.tickCounter, this.ticksOver); @@ -56,7 +55,7 @@ public class SamplerBuilder { sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout); } - sampler.start(samplingThread); + sampler.start(); return sampler; } diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java index 9a4090e..f4138af 100644 --- a/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java @@ -9,12 +9,12 @@ import java.util.concurrent.TimeUnit; * Implementation of {@link DataAggregator} that makes use of a "worker" thread pool for inserting * data. */ -public class AsyncDataAggregator implements DataAggregator { +public class SimpleDataAggregator implements DataAggregator { /** A map of root stack nodes for each thread with sampling data */ private final Map<String, StackNode> threadData = new ConcurrentHashMap<>(); - /** The worker pool for inserting stack nodes */ + /** The worker pool used for sampling */ private final ExecutorService workerPool; /** The instance used to group threads together */ @@ -23,7 +23,7 @@ public class AsyncDataAggregator implements DataAggregator { /** The interval to wait between sampling, in milliseconds */ private final int interval; - public AsyncDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) { + public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) { this.workerPool = workerPool; this.threadGrouper = threadGrouper; this.interval = interval; @@ -31,10 +31,13 @@ public class AsyncDataAggregator implements DataAggregator { @Override public void insertData(String threadName, StackTraceElement[] stack) { - // form the queued data - QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); - // schedule insertion of the data - this.workerPool.execute(queuedData); + try { + String group = this.threadGrouper.getGroup(threadName); + StackNode node = this.threadData.computeIfAbsent(group, StackNode::new); + node.log(stack, this.interval); + } catch (Exception e) { + e.printStackTrace(); + } } @Override @@ -49,29 +52,4 @@ public class AsyncDataAggregator implements DataAggregator { return this.threadData; } - - void insertData(QueuedThreadInfo data) { - try { - String group = this.threadGrouper.getGroup(data.threadName); - StackNode node = this.threadData.computeIfAbsent(group, StackNode::new); - node.log(data.stack, this.interval); - } catch (Exception e) { - e.printStackTrace(); - } - } - - private final class QueuedThreadInfo implements Runnable { - private final String threadName; - private final StackTraceElement[] stack; - - QueuedThreadInfo(String threadName, StackTraceElement[] stack) { - this.threadName = threadName; - this.stack = stack; - } - - @Override - public void run() { - insertData(this); - } - } } diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java index abca4b3..1d23d37 100644 --- a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java @@ -34,6 +34,8 @@ public class TickedDataAggregator implements DataAggregator { /** The expected number of samples in each tick */ private final int expectedSize; + private final Object mutex = new Object(); + // state private long currentTick = -1; private TickList currentData = new TickList(0); @@ -48,22 +50,24 @@ public class TickedDataAggregator implements DataAggregator { this.expectedSize = (50 / interval) + 10; } - // this is effectively synchronized by the Timer instance in Sampler @Override public void insertData(String threadName, StackTraceElement[] stack) { - long tick = this.tickCounter.getCurrentTick(); - if (this.currentTick != tick) { - pushCurrentTick(); - this.currentTick = tick; - this.currentData = new TickList(this.expectedSize); - } + synchronized (this.mutex) { + long tick = this.tickCounter.getCurrentTick(); + if (this.currentTick != tick) { + pushCurrentTick(); + this.currentTick = tick; + this.currentData = new TickList(this.expectedSize); + } - // form the queued data - QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); - // insert it - this.currentData.addData(queuedData); + // form the queued data + QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); + // insert it + this.currentData.addData(queuedData); + } } + // guarded by 'mutex' private void pushCurrentTick() { TickList currentData = this.currentData; @@ -86,7 +90,9 @@ public class TickedDataAggregator implements DataAggregator { @Override public Map<String, StackNode> getData() { // push the current tick - pushCurrentTick(); + synchronized (this.mutex) { + pushCurrentTick(); + } // close the tick counter this.tickCounter.close(); @@ -102,6 +108,7 @@ public class TickedDataAggregator implements DataAggregator { return this.threadData; } + // called by TickList void insertData(List<QueuedThreadInfo> dataList) { for (QueuedThreadInfo data : dataList) { try { |