aboutsummaryrefslogtreecommitdiff
path: root/common/src/main/java/com/sk89q/warmroast/Sampler.java
diff options
context:
space:
mode:
authorLuck <git@lucko.me>2018-05-28 15:35:53 +0100
committerLuck <git@lucko.me>2018-05-28 15:35:53 +0100
commitc29e96eb5d7981bbb1d9b54e92cbc9f22319efb8 (patch)
tree89e39cd6dfa854f48ba30386838c807578820b79 /common/src/main/java/com/sk89q/warmroast/Sampler.java
parent70b88264da4ff67dfd7b887e9550c9934f2414c7 (diff)
downloadspark-c29e96eb5d7981bbb1d9b54e92cbc9f22319efb8.tar.gz
spark-c29e96eb5d7981bbb1d9b54e92cbc9f22319efb8.tar.bz2
spark-c29e96eb5d7981bbb1d9b54e92cbc9f22319efb8.zip
Handle node insertion in separate worker threads, to reduce the work load on the sampling thread
and some other misc changes
Diffstat (limited to 'common/src/main/java/com/sk89q/warmroast/Sampler.java')
-rw-r--r--common/src/main/java/com/sk89q/warmroast/Sampler.java84
1 files changed, 64 insertions, 20 deletions
diff --git a/common/src/main/java/com/sk89q/warmroast/Sampler.java b/common/src/main/java/com/sk89q/warmroast/Sampler.java
index 6c4f60c..a141f82 100644
--- a/common/src/main/java/com/sk89q/warmroast/Sampler.java
+++ b/common/src/main/java/com/sk89q/warmroast/Sampler.java
@@ -25,31 +25,36 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
/**
* Main sampler class.
*/
public class Sampler extends TimerTask {
- /**
- * The thread management interface for the current JVM
- */
+ /** The thread management interface for the current JVM */
private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
- /**
- * A map of root stack nodes for each thread with sampling data
- */
+ /** A map of root stack nodes for each thread with sampling data */
private final Map<String, StackNode> threadData = new HashMap<>();
- /**
- * A future to encapsulation the completion of this sampler instance
- */
+ /** A set of recorded thread info that's yet to be inserted into the node structure */
+ private final Set<QueuedThreadInfo> pendingThreadData = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
+
+ /** The worker pool for inserting stack nodes */
+ private ExecutorService workerPool;
+ /** The lock object */
+ private final Object[] lock = new Object[0];
+ /** A future to encapsulation the completion of this sampler instance */
private final CompletableFuture<Sampler> future = new CompletableFuture<>();
/** The interval to wait between sampling, in milliseconds */
@@ -70,20 +75,47 @@ public class Sampler extends TimerTask {
/**
* Starts the sampler.
*
- * @param timer the timer to schedule the sampling on
+ * @param samplingThread the timer to schedule the sampling on
+ * @param workerPool the worker pool
*/
- public synchronized void start(Timer timer) {
- timer.scheduleAtFixedRate(this, 0, this.interval);
+ public void start(Timer samplingThread, ExecutorService workerPool) {
+ this.workerPool = workerPool;
+ samplingThread.scheduleAtFixedRate(this, 0, this.interval);
this.startTime = System.currentTimeMillis();
}
+ private void insertData(QueuedThreadInfo data) {
+ synchronized (this.lock) {
+ try {
+ StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new);
+ node.log(data.stack, Sampler.this.interval);
+ this.pendingThreadData.remove(data);
+ } catch (Exception e) {
+ e.printStackTrace();
+ // we need to remove the pending data even if the insert failed
+ this.pendingThreadData.remove(data);
+ }
+ }
+ }
+
/**
* Gets the sampling data recorded by this instance.
*
* @return the data
*/
public Map<String, StackNode> getData() {
- return this.threadData;
+ if (this.pendingThreadData.isEmpty()) {
+ return this.threadData;
+ }
+
+ // wait for all pending data to be inserted
+ while (true) {
+ synchronized (this.lock) {
+ if (this.pendingThreadData.isEmpty()) {
+ return this.threadData;
+ }
+ }
+ }
}
public long getStartTime() {
@@ -101,12 +133,8 @@ public class Sampler extends TimerTask {
return this.future;
}
- private StackNode getRootNode(String threadName) {
- return this.threadData.computeIfAbsent(threadName, StackNode::new);
- }
-
@Override
- public synchronized void run() {
+ public void run() {
try {
if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) {
this.future.complete(this);
@@ -123,8 +151,9 @@ public class Sampler extends TimerTask {
continue;
}
- StackNode node = getRootNode(threadName);
- node.log(stack, this.interval);
+ QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+ this.pendingThreadData.add(queuedData);
+ this.workerPool.execute(queuedData);
}
} catch (Throwable t) {
this.future.completeExceptionally(t);
@@ -153,4 +182,19 @@ public class Sampler extends TimerTask {
return out;
}
+ private final class QueuedThreadInfo implements Runnable {
+ private final String threadName;
+ private final StackTraceElement[] stack;
+
+ private QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+ this.threadName = threadName;
+ this.stack = stack;
+ }
+
+ @Override
+ public void run() {
+ insertData(this);
+ }
+ }
+
}