From 46b5064045f631ff66ed4a144bed6919e069bbbd Mon Sep 17 00:00:00 2001
From: Luck
Date: Wed, 30 May 2018 15:56:37 +0100
Subject: Set a max size for the worker pool, replace synchronization with concurrent collections & a phaser

---
 .../java/me/lucko/spark/common/CommandHandler.java | 10 +++-
 .../main/java/me/lucko/spark/profiler/Sampler.java | 57 ++++++++++------------
 .../java/me/lucko/spark/profiler/StackNode.java    | 11 +++--
 3 files changed, 40 insertions(+), 38 deletions(-)

(limited to 'common')

diff --git a/common/src/main/java/me/lucko/spark/common/CommandHandler.java b/common/src/main/java/me/lucko/spark/common/CommandHandler.java
index 43b2694..11f0e46 100644
--- a/common/src/main/java/me/lucko/spark/common/CommandHandler.java
+++ b/common/src/main/java/me/lucko/spark/common/CommandHandler.java
@@ -20,7 +20,8 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -44,7 +45,12 @@ public abstract class CommandHandler<T> {
     /**
      * The worker {@link ExecutorService} being used by the {@link #activeSampler}.
      */
-    private final ExecutorService workerPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("spark-worker-%d").build());
+    private final ExecutorService workerPool = new ThreadPoolExecutor(
+            1, 6,
+            30L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new ThreadFactoryBuilder().setNameFormat("spark-worker-%d").build()
+    );
 
     /** Guards {@link #activeSampler} */
     private final Object[] activeSamplerMutex = new Object[0];
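A note on the pool settings above: a ThreadPoolExecutor built with a core size of 1, a maximum of 6 and a SynchronousQueue never buffers tasks. It starts extra threads on demand up to six, lets the surplus threads die after 30 seconds idle, and with the default AbortPolicy rejects any submission made while six tasks are already running, instead of growing without bound like the cached pool it replaces. The sketch below is illustrative only and not part of the patch; the class name BoundedPoolDemo is invented.

    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class BoundedPoolDemo {
        public static void main(String[] args) throws InterruptedException {
            // Same shape as the spark-worker pool: 1 core thread, at most 6
            // threads, a 30s idle timeout, and a SynchronousQueue so tasks are
            // handed straight to a thread rather than queued.
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 6,
                    30L, TimeUnit.SECONDS,
                    new SynchronousQueue<>()
            );

            for (int i = 0; i < 8; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            Thread.sleep(1000); // stand-in for a long-running task
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Submissions made while all six threads are busy land here:
                    // the default AbortPolicy refuses them rather than queueing.
                    System.out.println("task " + i + " rejected, pool saturated");
                }
            }

            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }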
diff --git a/common/src/main/java/me/lucko/spark/profiler/Sampler.java b/common/src/main/java/me/lucko/spark/profiler/Sampler.java
index bfad8d1..5db978c 100644
--- a/common/src/main/java/me/lucko/spark/profiler/Sampler.java
+++ b/common/src/main/java/me/lucko/spark/profiler/Sampler.java
@@ -25,16 +25,16 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Main sampler class.
@@ -45,15 +45,13 @@ public class Sampler extends TimerTask {
     private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
 
     /** A map of root stack nodes for each thread with sampling data */
-    private final Map<String, StackNode> threadData = new HashMap<>();
-
-    /** A set of recorded thread info that's yet to be inserted into the node structure */
-    private final Set<QueuedThreadInfo> pendingThreadData = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
+    private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
 
     /** The worker pool for inserting stack nodes */
     private ExecutorService workerPool;
 
-    /** The lock object */
-    private final Object[] lock = new Object[0];
+    /** The phaser used to track pending stack node data */
+    private final Phaser phaser = new Phaser();
+
     /** A future to encapsulation the completion of this sampler instance */
     private final CompletableFuture future = new CompletableFuture<>();
@@ -85,16 +83,14 @@ public class Sampler extends TimerTask {
     }
 
     private void insertData(QueuedThreadInfo data) {
-        synchronized (this.lock) {
-            try {
-                StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new);
-                node.log(data.stack, Sampler.this.interval);
-                this.pendingThreadData.remove(data);
-            } catch (Exception e) {
-                e.printStackTrace();
-                // we need to remove the pending data even if the insert failed
-                this.pendingThreadData.remove(data);
-            }
+        try {
+            StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new);
+            node.log(data.stack, Sampler.this.interval);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            // countdown the phaser
+            this.phaser.arriveAndDeregister();
         }
     }
 
@@ -104,18 +100,14 @@ public class Sampler extends TimerTask {
      * @return the data
      */
     public Map<String, StackNode> getData() {
-        if (this.pendingThreadData.isEmpty()) {
-            return this.threadData;
-        }
 
-        // wait for all pending data to be inserted
-        while (true) {
-            synchronized (this.lock) {
-                if (this.pendingThreadData.isEmpty()) {
-                    return this.threadData;
-                }
-            }
+        try {
+            this.phaser.awaitAdvanceInterruptibly(this.phaser.getPhase(), 15, TimeUnit.SECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            e.printStackTrace();
         }
+
+        return this.threadData;
     }
 
     public long getStartTime() {
@@ -151,8 +143,11 @@ public class Sampler extends TimerTask {
                    continue;
                }
 
+                // form the queued data
                QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
-                this.pendingThreadData.add(queuedData);
+                // mark that this data is pending
+                this.phaser.register();
+                // schedule insertion of the data
                this.workerPool.execute(queuedData);
            }
        } catch (Throwable t) {
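The Phaser in Sampler.java replaces the old busy-wait on pendingThreadData: run() registers one party per queued QueuedThreadInfo, every worker arrives and deregisters in a finally block whether or not the insert succeeded, and getData() waits for the current phase to advance, meaning all registered parties have arrived, with a 15 second cap. A rough standalone sketch of that register / arriveAndDeregister / awaitAdvanceInterruptibly pattern follows; the names PhaserDemo and pending are invented for the example.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class PhaserDemo {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(4);
            Phaser pending = new Phaser();

            for (int i = 0; i < 10; i++) {
                pending.register(); // one party per queued unit of work
                pool.execute(() -> {
                    try {
                        Thread.sleep(100); // stand-in for inserting stack node data
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        pending.arriveAndDeregister(); // always count the work down
                    }
                });
            }

            try {
                // Returns once every registered party has arrived, i.e. all queued
                // work has drained, or throws after the timeout. (If nothing is
                // registered the phase cannot advance, so this would just wait
                // out the timeout.)
                pending.awaitAdvanceInterruptibly(pending.getPhase(), 15, TimeUnit.SECONDS);
            } catch (InterruptedException | TimeoutException e) {
                e.printStackTrace();
            }

            pool.shutdown();
        }
    }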
diff --git a/common/src/main/java/me/lucko/spark/profiler/StackNode.java b/common/src/main/java/me/lucko/spark/profiler/StackNode.java
index 1e1ea63..b2b6f88 100644
--- a/common/src/main/java/me/lucko/spark/profiler/StackNode.java
+++ b/common/src/main/java/me/lucko/spark/profiler/StackNode.java
@@ -24,9 +24,10 @@ import com.google.gson.JsonObject;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
 
 /**
  * Represents a node in the overall sampling stack.
@@ -46,12 +47,12 @@ public class StackNode implements Comparable<StackNode> {
     /**
      * A map of this nodes children
      */
-    private final Map<String, StackNode> children = new HashMap<>();
+    private final Map<String, StackNode> children = new ConcurrentHashMap<>();
 
     /**
      * The accumulated sample time for this node
      */
-    private long totalTime = 0;
+    private final LongAdder totalTime = new LongAdder();
 
     public StackNode(String name) {
         this.name = name;
@@ -80,11 +81,11 @@ public class StackNode implements Comparable<StackNode> {
     }
 
     public long getTotalTime() {
-        return this.totalTime;
+        return this.totalTime.longValue();
     }
 
     public void accumulateTime(long time) {
-        this.totalTime += time;
+        this.totalTime.add(time);
     }
 
     private void log(StackTraceElement[] elements, int skip, long time) {
--
cgit
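On the StackNode side, ConcurrentHashMap.computeIfAbsent gives an atomic get-or-create per key, so concurrent workers logging the same child name cannot create duplicates, and LongAdder turns the plain long counter into an accumulator that stays correct under contention. A small illustrative sketch of the same two ideas, with class and field names invented for the example:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    public class TimeAccumulatorDemo {
        // Accumulated time keyed by name, created on demand; safe to use from many threads.
        private final Map<String, LongAdder> timesByName = new ConcurrentHashMap<>();

        public void accumulate(String name, long time) {
            // computeIfAbsent is atomic per key, so two workers recording the same
            // name never create duplicate entries or lose an update.
            this.timesByName.computeIfAbsent(name, k -> new LongAdder()).add(time);
        }

        public long totalFor(String name) {
            LongAdder adder = this.timesByName.get(name);
            // longValue() is a snapshot; it may miss additions still in flight.
            return adder == null ? 0 : adder.longValue();
        }

        public static void main(String[] args) throws InterruptedException {
            TimeAccumulatorDemo demo = new TimeAccumulatorDemo();
            Thread a = new Thread(() -> demo.accumulate("tick", 4));
            Thread b = new Thread(() -> demo.accumulate("tick", 4));
            a.start();
            b.start();
            a.join();
            b.join();
            System.out.println(demo.totalFor("tick")); // prints 8
        }
    }

Compared with a single AtomicLong, a LongAdder spreads writes across internal cells and only sums them on read, which suits a counter like totalTime that is written on every sample but read only when a report is built.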