diff options
author | Luck <git@lucko.me> | 2018-05-30 15:56:37 +0100 |
---|---|---|
committer | Luck <git@lucko.me> | 2018-05-30 15:56:37 +0100 |
commit | 46b5064045f631ff66ed4a144bed6919e069bbbd (patch) | |
tree | d5aa10b4d7dd3289ba45abd74303ed6bcb4b46a4 /common/src/main/java/me/lucko/spark | |
parent | f7c84b3ced3b079f33517f360414b406658a2a72 (diff) | |
download | spark-46b5064045f631ff66ed4a144bed6919e069bbbd.tar.gz spark-46b5064045f631ff66ed4a144bed6919e069bbbd.tar.bz2 spark-46b5064045f631ff66ed4a144bed6919e069bbbd.zip |
Set a max size for the worker pool, replace synchronization with concurrent collections & a phaser
Diffstat (limited to 'common/src/main/java/me/lucko/spark')
3 files changed, 40 insertions, 38 deletions
diff --git a/common/src/main/java/me/lucko/spark/common/CommandHandler.java b/common/src/main/java/me/lucko/spark/common/CommandHandler.java index 43b2694..11f0e46 100644 --- a/common/src/main/java/me/lucko/spark/common/CommandHandler.java +++ b/common/src/main/java/me/lucko/spark/common/CommandHandler.java @@ -20,7 +20,8 @@ import java.util.Set; import java.util.Timer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -44,7 +45,12 @@ public abstract class CommandHandler<T> { /** * The worker {@link ExecutorService} being used by the {@link #activeSampler}. */ - private final ExecutorService workerPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("spark-worker-%d").build()); + private final ExecutorService workerPool = new ThreadPoolExecutor( + 1, 6, + 30L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new ThreadFactoryBuilder().setNameFormat("spark-worker-%d").build() + ); /** Guards {@link #activeSampler} */ private final Object[] activeSamplerMutex = new Object[0]; diff --git a/common/src/main/java/me/lucko/spark/profiler/Sampler.java b/common/src/main/java/me/lucko/spark/profiler/Sampler.java index bfad8d1..5db978c 100644 --- a/common/src/main/java/me/lucko/spark/profiler/Sampler.java +++ b/common/src/main/java/me/lucko/spark/profiler/Sampler.java @@ -25,16 +25,16 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Main sampler class. @@ -45,15 +45,13 @@ public class Sampler extends TimerTask { private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); /** A map of root stack nodes for each thread with sampling data */ - private final Map<String, StackNode> threadData = new HashMap<>(); - - /** A set of recorded thread info that's yet to be inserted into the node structure */ - private final Set<QueuedThreadInfo> pendingThreadData = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>())); + private final Map<String, StackNode> threadData = new ConcurrentHashMap<>(); /** The worker pool for inserting stack nodes */ private ExecutorService workerPool; - /** The lock object */ - private final Object[] lock = new Object[0]; + /** The phaser used to track pending stack node data */ + private final Phaser phaser = new Phaser(); + /** A future to encapsulation the completion of this sampler instance */ private final CompletableFuture<Sampler> future = new CompletableFuture<>(); @@ -85,16 +83,14 @@ public class Sampler extends TimerTask { } private void insertData(QueuedThreadInfo data) { - synchronized (this.lock) { - try { - StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new); - node.log(data.stack, Sampler.this.interval); - this.pendingThreadData.remove(data); - } catch (Exception e) { - e.printStackTrace(); - // we need to remove the pending data even if the insert failed - this.pendingThreadData.remove(data); - } + try { + StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new); + node.log(data.stack, Sampler.this.interval); + } catch (Exception e) { + e.printStackTrace(); + } finally { + // countdown the phaser + this.phaser.arriveAndDeregister(); } } @@ -104,18 +100,14 @@ public class Sampler extends TimerTask { * @return the data */ public Map<String, StackNode> getData() { - if (this.pendingThreadData.isEmpty()) { - return this.threadData; - } - // wait for all pending data to be inserted - while (true) { - synchronized (this.lock) { - if (this.pendingThreadData.isEmpty()) { - return this.threadData; - } - } + try { + this.phaser.awaitAdvanceInterruptibly(this.phaser.getPhase(), 15, TimeUnit.SECONDS); + } catch (InterruptedException | TimeoutException e) { + e.printStackTrace(); } + + return this.threadData; } public long getStartTime() { @@ -151,8 +143,11 @@ public class Sampler extends TimerTask { continue; } + // form the queued data QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); - this.pendingThreadData.add(queuedData); + // mark that this data is pending + this.phaser.register(); + // schedule insertion of the data this.workerPool.execute(queuedData); } } catch (Throwable t) { diff --git a/common/src/main/java/me/lucko/spark/profiler/StackNode.java b/common/src/main/java/me/lucko/spark/profiler/StackNode.java index 1e1ea63..b2b6f88 100644 --- a/common/src/main/java/me/lucko/spark/profiler/StackNode.java +++ b/common/src/main/java/me/lucko/spark/profiler/StackNode.java @@ -24,9 +24,10 @@ import com.google.gson.JsonObject; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.LongAdder; /** * Represents a node in the overall sampling stack. @@ -46,12 +47,12 @@ public class StackNode implements Comparable<StackNode> { /** * A map of this nodes children */ - private final Map<String, StackNode> children = new HashMap<>(); + private final Map<String, StackNode> children = new ConcurrentHashMap<>(); /** * The accumulated sample time for this node */ - private long totalTime = 0; + private final LongAdder totalTime = new LongAdder(); public StackNode(String name) { this.name = name; @@ -80,11 +81,11 @@ public class StackNode implements Comparable<StackNode> { } public long getTotalTime() { - return this.totalTime; + return this.totalTime.longValue(); } public void accumulateTime(long time) { - this.totalTime += time; + this.totalTime.add(time); } private void log(StackTraceElement[] elements, int skip, long time) { |