diff options
author | Luck <git@lucko.me> | 2018-05-28 15:35:53 +0100 |
---|---|---|
committer | Luck <git@lucko.me> | 2018-05-28 15:35:53 +0100 |
commit | c29e96eb5d7981bbb1d9b54e92cbc9f22319efb8 (patch) | |
tree | 89e39cd6dfa854f48ba30386838c807578820b79 /common/src/main/java/com/sk89q/warmroast/Sampler.java | |
parent | 70b88264da4ff67dfd7b887e9550c9934f2414c7 (diff) | |
download | spark-c29e96eb5d7981bbb1d9b54e92cbc9f22319efb8.tar.gz spark-c29e96eb5d7981bbb1d9b54e92cbc9f22319efb8.tar.bz2 spark-c29e96eb5d7981bbb1d9b54e92cbc9f22319efb8.zip |
Handle node insertion in separate worker threads, to reduce the work load on the sampling thread
and some other misc changes
Diffstat (limited to 'common/src/main/java/com/sk89q/warmroast/Sampler.java')
-rw-r--r-- | common/src/main/java/com/sk89q/warmroast/Sampler.java | 84 |
1 files changed, 64 insertions, 20 deletions
diff --git a/common/src/main/java/com/sk89q/warmroast/Sampler.java b/common/src/main/java/com/sk89q/warmroast/Sampler.java index 6c4f60c..a141f82 100644 --- a/common/src/main/java/com/sk89q/warmroast/Sampler.java +++ b/common/src/main/java/com/sk89q/warmroast/Sampler.java @@ -25,31 +25,36 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; /** * Main sampler class. */ public class Sampler extends TimerTask { - /** - * The thread management interface for the current JVM - */ + /** The thread management interface for the current JVM */ private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - /** - * A map of root stack nodes for each thread with sampling data - */ + /** A map of root stack nodes for each thread with sampling data */ private final Map<String, StackNode> threadData = new HashMap<>(); - /** - * A future to encapsulation the completion of this sampler instance - */ + /** A set of recorded thread info that's yet to be inserted into the node structure */ + private final Set<QueuedThreadInfo> pendingThreadData = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>())); + + /** The worker pool for inserting stack nodes */ + private ExecutorService workerPool; + /** The lock object */ + private final Object[] lock = new Object[0]; + /** A future to encapsulation the completion of this sampler instance */ private final CompletableFuture<Sampler> future = new CompletableFuture<>(); /** The interval to wait between sampling, in milliseconds */ @@ -70,20 +75,47 @@ public class Sampler extends TimerTask { /** * Starts the sampler. * - * @param timer the timer to schedule the sampling on + * @param samplingThread the timer to schedule the sampling on + * @param workerPool the worker pool */ - public synchronized void start(Timer timer) { - timer.scheduleAtFixedRate(this, 0, this.interval); + public void start(Timer samplingThread, ExecutorService workerPool) { + this.workerPool = workerPool; + samplingThread.scheduleAtFixedRate(this, 0, this.interval); this.startTime = System.currentTimeMillis(); } + private void insertData(QueuedThreadInfo data) { + synchronized (this.lock) { + try { + StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new); + node.log(data.stack, Sampler.this.interval); + this.pendingThreadData.remove(data); + } catch (Exception e) { + e.printStackTrace(); + // we need to remove the pending data even if the insert failed + this.pendingThreadData.remove(data); + } + } + } + /** * Gets the sampling data recorded by this instance. * * @return the data */ public Map<String, StackNode> getData() { - return this.threadData; + if (this.pendingThreadData.isEmpty()) { + return this.threadData; + } + + // wait for all pending data to be inserted + while (true) { + synchronized (this.lock) { + if (this.pendingThreadData.isEmpty()) { + return this.threadData; + } + } + } } public long getStartTime() { @@ -101,12 +133,8 @@ public class Sampler extends TimerTask { return this.future; } - private StackNode getRootNode(String threadName) { - return this.threadData.computeIfAbsent(threadName, StackNode::new); - } - @Override - public synchronized void run() { + public void run() { try { if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) { this.future.complete(this); @@ -123,8 +151,9 @@ public class Sampler extends TimerTask { continue; } - StackNode node = getRootNode(threadName); - node.log(stack, this.interval); + QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); + this.pendingThreadData.add(queuedData); + this.workerPool.execute(queuedData); } } catch (Throwable t) { this.future.completeExceptionally(t); @@ -153,4 +182,19 @@ public class Sampler extends TimerTask { return out; } + private final class QueuedThreadInfo implements Runnable { + private final String threadName; + private final StackTraceElement[] stack; + + private QueuedThreadInfo(String threadName, StackTraceElement[] stack) { + this.threadName = threadName; + this.stack = stack; + } + + @Override + public void run() { + insertData(this); + } + } + } |