diff options
author | Luck <git@lucko.me> | 2018-05-30 15:56:37 +0100 |
---|---|---|
committer | Luck <git@lucko.me> | 2018-05-30 15:56:37 +0100 |
commit | 46b5064045f631ff66ed4a144bed6919e069bbbd (patch) | |
tree | d5aa10b4d7dd3289ba45abd74303ed6bcb4b46a4 /common/src/main/java/me/lucko/spark/profiler/Sampler.java | |
parent | f7c84b3ced3b079f33517f360414b406658a2a72 (diff) | |
download | spark-46b5064045f631ff66ed4a144bed6919e069bbbd.tar.gz spark-46b5064045f631ff66ed4a144bed6919e069bbbd.tar.bz2 spark-46b5064045f631ff66ed4a144bed6919e069bbbd.zip |
Set a max size for the worker pool, replace synchronization with concurrent collections & a phaser
Diffstat (limited to 'common/src/main/java/me/lucko/spark/profiler/Sampler.java')
-rw-r--r-- | common/src/main/java/me/lucko/spark/profiler/Sampler.java | 57 |
1 files changed, 26 insertions, 31 deletions
diff --git a/common/src/main/java/me/lucko/spark/profiler/Sampler.java b/common/src/main/java/me/lucko/spark/profiler/Sampler.java index bfad8d1..5db978c 100644 --- a/common/src/main/java/me/lucko/spark/profiler/Sampler.java +++ b/common/src/main/java/me/lucko/spark/profiler/Sampler.java @@ -25,16 +25,16 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Main sampler class. @@ -45,15 +45,13 @@ public class Sampler extends TimerTask { private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); /** A map of root stack nodes for each thread with sampling data */ - private final Map<String, StackNode> threadData = new HashMap<>(); - - /** A set of recorded thread info that's yet to be inserted into the node structure */ - private final Set<QueuedThreadInfo> pendingThreadData = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>())); + private final Map<String, StackNode> threadData = new ConcurrentHashMap<>(); /** The worker pool for inserting stack nodes */ private ExecutorService workerPool; - /** The lock object */ - private final Object[] lock = new Object[0]; + /** The phaser used to track pending stack node data */ + private final Phaser phaser = new Phaser(); + /** A future to encapsulation the completion of this sampler instance */ private final CompletableFuture<Sampler> future = new CompletableFuture<>(); @@ -85,16 +83,14 @@ public class Sampler extends TimerTask { } private void insertData(QueuedThreadInfo data) { - synchronized (this.lock) { - try { - StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new); - node.log(data.stack, Sampler.this.interval); - this.pendingThreadData.remove(data); - } catch (Exception e) { - e.printStackTrace(); - // we need to remove the pending data even if the insert failed - this.pendingThreadData.remove(data); - } + try { + StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new); + node.log(data.stack, Sampler.this.interval); + } catch (Exception e) { + e.printStackTrace(); + } finally { + // countdown the phaser + this.phaser.arriveAndDeregister(); } } @@ -104,18 +100,14 @@ public class Sampler extends TimerTask { * @return the data */ public Map<String, StackNode> getData() { - if (this.pendingThreadData.isEmpty()) { - return this.threadData; - } - // wait for all pending data to be inserted - while (true) { - synchronized (this.lock) { - if (this.pendingThreadData.isEmpty()) { - return this.threadData; - } - } + try { + this.phaser.awaitAdvanceInterruptibly(this.phaser.getPhase(), 15, TimeUnit.SECONDS); + } catch (InterruptedException | TimeoutException e) { + e.printStackTrace(); } + + return this.threadData; } public long getStartTime() { @@ -151,8 +143,11 @@ public class Sampler extends TimerTask { continue; } + // form the queued data QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); - this.pendingThreadData.add(queuedData); + // mark that this data is pending + this.phaser.register(); + // schedule insertion of the data this.workerPool.execute(queuedData); } } catch (Throwable t) { |