author     Luck <git@lucko.me>    2018-06-09 22:47:50 +0100
committer  Luck <git@lucko.me>    2018-06-09 22:47:50 +0100
commit     48924d99d3f91d7319eceea74619c5ebcf4d76fa (patch)
tree       9c9a2030933f3da447059f99f8ff174842dbefce /spark-common/src/main/java/me/lucko
parent     7e4ca1ef197422cb08be675707641f68c4e52f5f (diff)
Use ScheduledExecutorService instead of Timer
Diffstat (limited to 'spark-common/src/main/java/me/lucko')
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java         |  8
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java              | 27
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java       |  5
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java (renamed from spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java) | 42
-rw-r--r--  spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java | 31
5 files changed, 48 insertions(+), 65 deletions(-)
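
For context, a minimal standalone sketch (not spark's code) of the API difference this commit is about: java.util.Timer runs every task on one background thread and an uncaught exception from a task kills that thread, while a ScheduledExecutorService can schedule on a pool and hands back a ScheduledFuture, so an individual task can be cancelled without tearing down the scheduler. Class and variable names below are illustrative only.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    public class SchedulingSketch {
        public static void main(String[] args) throws InterruptedException {
            // schedule a periodic task on a small pool, as the new Sampler does
            ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
            ScheduledFuture<?> task = pool.scheduleAtFixedRate(
                    () -> System.out.println("sample @ " + System.currentTimeMillis()),
                    0, 10, TimeUnit.MILLISECONDS);

            Thread.sleep(100);

            // unlike Timer#cancel, cancelling the future stops only this task;
            // the pool stays usable for other work
            task.cancel(false);
            pool.shutdown();
        }
    }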
diff --git a/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java b/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java
index 72774d5..e253d29 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java
@@ -17,7 +17,6 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
@@ -36,11 +35,6 @@ public abstract class CommandHandler<T> {
/** The prefix used in all messages */
private static final String PREFIX = "&8[&fspark&8] &7";
- /**
- * The {@link Timer} being used by the {@link #activeSampler}.
- */
- private final Timer samplingThread = new Timer("spark-sampling-thread", true);
-
/** Guards {@link #activeSampler} */
private final Object[] activeSamplerMutex = new Object[0];
/** The WarmRoast instance currently running, if any */
@@ -183,7 +177,7 @@ public abstract class CommandHandler<T> {
if (ticksOver != -1) {
builder.ticksOver(ticksOver, tickCounter);
}
- sampler = this.activeSampler = builder.start(this.samplingThread);
+ sampler = this.activeSampler = builder.start();
sendPrefixedMessage("&bProfiler now active!");
if (timeoutSeconds == -1) {
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
index 3476f03..14e82c8 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
@@ -32,25 +32,28 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;
/**
* Main sampler class.
*/
-public class Sampler extends TimerTask {
+public class Sampler implements Runnable {
private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
/** The worker pool for inserting stack nodes */
- private final ExecutorService workerPool = Executors.newFixedThreadPool(
- 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement()).build()
+ private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
+ 9, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
);
+ /** The main sampling task */
+ private ScheduledFuture<?> task;
+
/** The thread management interface for the current JVM */
private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
/** The instance used to generate thread information for use in sampling */
@@ -70,7 +73,7 @@ public class Sampler extends TimerTask {
public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
this.threadDumper = threadDumper;
- this.dataAggregator = new AsyncDataAggregator(this.workerPool, threadGrouper, interval);
+ this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval);
this.interval = interval;
this.endTime = endTime;
}
@@ -84,13 +87,11 @@ public class Sampler extends TimerTask {
/**
* Starts the sampler.
- *
- * @param samplingThread the timer to schedule the sampling on
*/
- public void start(Timer samplingThread) {
+ public void start() {
this.startTime = System.currentTimeMillis();
this.dataAggregator.start();
- samplingThread.scheduleAtFixedRate(this, 0, this.interval);
+ this.task = workerPool.scheduleAtFixedRate(this, 0, interval, TimeUnit.MILLISECONDS);
}
public long getStartTime() {
@@ -108,6 +109,10 @@ public class Sampler extends TimerTask {
return this.future;
}
+ public void cancel() {
+ task.cancel(false);
+ }
+
@Override
public void run() {
try {
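
The Sampler changes above follow a common pattern: the task schedules itself on its own executor and keeps the returned ScheduledFuture so it can be cancelled later. A minimal sketch of that pattern, assuming nothing beyond what the diff shows (the class and method bodies here are illustrative, not spark's):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    // illustrative only: a Runnable that schedules itself and retains the future for cancel()
    public class SelfSchedulingTask implements Runnable {
        private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        private ScheduledFuture<?> task;

        public void start(long intervalMillis) {
            // equivalent of Timer#scheduleAtFixedRate, but the future is retained
            this.task = this.pool.scheduleAtFixedRate(this, 0, intervalMillis, TimeUnit.MILLISECONDS);
        }

        public void cancel() {
            // false = let an in-flight run finish rather than interrupting it
            this.task.cancel(false);
        }

        @Override
        public void run() {
            // periodic work goes here (taking a thread dump, in the Sampler's case)
        }
    }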
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
index 7db0515..733f4e3 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
@@ -1,6 +1,5 @@
package me.lucko.spark.profiler;
-import java.util.Timer;
import java.util.concurrent.TimeUnit;
/**
@@ -48,7 +47,7 @@ public class SamplerBuilder {
return this;
}
- public Sampler start(Timer samplingThread) {
+ public Sampler start() {
Sampler sampler;
if (this.ticksOver != -1 && this.tickCounter != null) {
sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout, this.tickCounter, this.ticksOver);
@@ -56,7 +55,7 @@ public class SamplerBuilder {
sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout);
}
- sampler.start(samplingThread);
+ sampler.start();
return sampler;
}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java
index 9a4090e..f4138af 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java
@@ -9,12 +9,12 @@ import java.util.concurrent.TimeUnit;
* Implementation of {@link DataAggregator} that makes use of a "worker" thread pool for inserting
* data.
*/
-public class AsyncDataAggregator implements DataAggregator {
+public class SimpleDataAggregator implements DataAggregator {
/** A map of root stack nodes for each thread with sampling data */
private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
- /** The worker pool for inserting stack nodes */
+ /** The worker pool used for sampling */
private final ExecutorService workerPool;
/** The instance used to group threads together */
@@ -23,7 +23,7 @@ public class AsyncDataAggregator implements DataAggregator {
/** The interval to wait between sampling, in milliseconds */
private final int interval;
- public AsyncDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) {
+ public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) {
this.workerPool = workerPool;
this.threadGrouper = threadGrouper;
this.interval = interval;
@@ -31,10 +31,13 @@ public class AsyncDataAggregator implements DataAggregator {
@Override
public void insertData(String threadName, StackTraceElement[] stack) {
- // form the queued data
- QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
- // schedule insertion of the data
- this.workerPool.execute(queuedData);
+ try {
+ String group = this.threadGrouper.getGroup(threadName);
+ StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
+ node.log(stack, this.interval);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
@Override
@@ -49,29 +52,4 @@ public class AsyncDataAggregator implements DataAggregator {
return this.threadData;
}
-
- void insertData(QueuedThreadInfo data) {
- try {
- String group = this.threadGrouper.getGroup(data.threadName);
- StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
- node.log(data.stack, this.interval);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private final class QueuedThreadInfo implements Runnable {
- private final String threadName;
- private final StackTraceElement[] stack;
-
- QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
- this.threadName = threadName;
- this.stack = stack;
- }
-
- @Override
- public void run() {
- insertData(this);
- }
- }
}
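
The renamed SimpleDataAggregator drops the per-sample Runnable queue and writes straight into a ConcurrentHashMap via computeIfAbsent. A standalone sketch of that aggregation pattern, using a plain LongAdder counter in place of spark's StackNode (names here are illustrative):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    public class GroupedCounter {
        // one entry per thread group, created lazily and atomically
        private final Map<String, LongAdder> data = new ConcurrentHashMap<>();

        public void insert(String group) {
            // computeIfAbsent is atomic on ConcurrentHashMap, so concurrent callers
            // for the same group always see the same LongAdder instance
            this.data.computeIfAbsent(group, g -> new LongAdder()).increment();
        }

        public Map<String, LongAdder> getData() {
            return this.data;
        }
    }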
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
index abca4b3..1d23d37 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
@@ -34,6 +34,8 @@ public class TickedDataAggregator implements DataAggregator {
/** The expected number of samples in each tick */
private final int expectedSize;
+ private final Object mutex = new Object();
+
// state
private long currentTick = -1;
private TickList currentData = new TickList(0);
@@ -48,22 +50,24 @@ public class TickedDataAggregator implements DataAggregator {
this.expectedSize = (50 / interval) + 10;
}
- // this is effectively synchronized by the Timer instance in Sampler
@Override
public void insertData(String threadName, StackTraceElement[] stack) {
- long tick = this.tickCounter.getCurrentTick();
- if (this.currentTick != tick) {
- pushCurrentTick();
- this.currentTick = tick;
- this.currentData = new TickList(this.expectedSize);
- }
+ synchronized (this.mutex) {
+ long tick = this.tickCounter.getCurrentTick();
+ if (this.currentTick != tick) {
+ pushCurrentTick();
+ this.currentTick = tick;
+ this.currentData = new TickList(this.expectedSize);
+ }
- // form the queued data
- QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
- // insert it
- this.currentData.addData(queuedData);
+ // form the queued data
+ QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+ // insert it
+ this.currentData.addData(queuedData);
+ }
}
+ // guarded by 'mutex'
private void pushCurrentTick() {
TickList currentData = this.currentData;
@@ -86,7 +90,9 @@ public class TickedDataAggregator implements DataAggregator {
@Override
public Map<String, StackNode> getData() {
// push the current tick
- pushCurrentTick();
+ synchronized (this.mutex) {
+ pushCurrentTick();
+ }
// close the tick counter
this.tickCounter.close();
@@ -102,6 +108,7 @@ public class TickedDataAggregator implements DataAggregator {
return this.threadData;
}
+ // called by TickList
void insertData(List<QueuedThreadInfo> dataList) {
for (QueuedThreadInfo data : dataList) {
try {
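
The TickedDataAggregator changes above exist because insertData is no longer serialized by a single Timer thread: with a multi-threaded scheduled pool, the check-then-act on currentTick must be guarded explicitly. A minimal sketch of that guarded tick-rollover pattern, assuming a simple string buffer instead of spark's TickList (illustrative names only):

    import java.util.ArrayList;
    import java.util.List;

    // illustrative only: roll a per-tick buffer under a mutex, as TickedDataAggregator now does
    public class TickBuffer {
        private final Object mutex = new Object();
        private long currentTick = -1;
        private List<String> currentData = new ArrayList<>();

        public void insert(long tick, String sample) {
            synchronized (this.mutex) {
                // the tick comparison and buffer swap must happen atomically
                // once multiple pool threads can call insert concurrently
                if (this.currentTick != tick) {
                    flush();
                    this.currentTick = tick;
                    this.currentData = new ArrayList<>();
                }
                this.currentData.add(sample);
            }
        }

        // guarded by 'mutex'
        private void flush() {
            // push this.currentData downstream before it is replaced
        }
    }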