path: root/common/src/main/java/me/lucko/spark
author      Luck <git@lucko.me>                          2018-05-30 15:56:37 +0100
committer   Luck <git@lucko.me>                          2018-05-30 15:56:37 +0100
commit      46b5064045f631ff66ed4a144bed6919e069bbbd (patch)
tree        d5aa10b4d7dd3289ba45abd74303ed6bcb4b46a4 /common/src/main/java/me/lucko/spark
parent      f7c84b3ced3b079f33517f360414b406658a2a72 (diff)
download    spark-46b5064045f631ff66ed4a144bed6919e069bbbd.tar.gz
spark-46b5064045f631ff66ed4a144bed6919e069bbbd.tar.bz2
spark-46b5064045f631ff66ed4a144bed6919e069bbbd.zip
Set a max size for the worker pool, replace synchronization with concurrent collections & a phaser
Diffstat (limited to 'common/src/main/java/me/lucko/spark')
-rw-r--r--   common/src/main/java/me/lucko/spark/common/CommandHandler.java   10
-rw-r--r--   common/src/main/java/me/lucko/spark/profiler/Sampler.java        57
-rw-r--r--   common/src/main/java/me/lucko/spark/profiler/StackNode.java      11
3 files changed, 40 insertions, 38 deletions
diff --git a/common/src/main/java/me/lucko/spark/common/CommandHandler.java b/common/src/main/java/me/lucko/spark/common/CommandHandler.java
index 43b2694..11f0e46 100644
--- a/common/src/main/java/me/lucko/spark/common/CommandHandler.java
+++ b/common/src/main/java/me/lucko/spark/common/CommandHandler.java
@@ -20,7 +20,8 @@ import java.util.Set;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -44,7 +45,12 @@ public abstract class CommandHandler<T> {
/**
* The worker {@link ExecutorService} being used by the {@link #activeSampler}.
*/
- private final ExecutorService workerPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("spark-worker-%d").build());
+ private final ExecutorService workerPool = new ThreadPoolExecutor(
+ 1, 6,
+ 30L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new ThreadFactoryBuilder().setNameFormat("spark-worker-%d").build()
+ );
/** Guards {@link #activeSampler} */
private final Object[] activeSamplerMutex = new Object[0];
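
The hunk above swaps Executors.newCachedThreadPool (unbounded) for an explicitly bounded ThreadPoolExecutor. Below is a minimal standalone sketch of that configuration, for illustration only: the class name is hypothetical and the Guava ThreadFactoryBuilder used in the diff is omitted to keep it self-contained. It uses one core thread, at most six threads with a 30-second keep-alive, and a SynchronousQueue, so tasks are handed straight to a worker rather than buffered; with the default rejection policy, a task submitted while all six workers are busy is rejected with RejectedExecutionException.

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolSketch {
    public static void main(String[] args) {
        // Mirrors the pool above: 1 core thread, at most 6 threads, idle extras
        // reclaimed after 30 seconds, and no task buffering (SynchronousQueue).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 6,
                30L, TimeUnit.SECONDS,
                new SynchronousQueue<>());

        for (int i = 0; i < 4; i++) {
            int id = i;
            pool.execute(() -> System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}
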
diff --git a/common/src/main/java/me/lucko/spark/profiler/Sampler.java b/common/src/main/java/me/lucko/spark/profiler/Sampler.java
index bfad8d1..5db978c 100644
--- a/common/src/main/java/me/lucko/spark/profiler/Sampler.java
+++ b/common/src/main/java/me/lucko/spark/profiler/Sampler.java
@@ -25,16 +25,16 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Main sampler class.
@@ -45,15 +45,13 @@ public class Sampler extends TimerTask {
private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
/** A map of root stack nodes for each thread with sampling data */
- private final Map<String, StackNode> threadData = new HashMap<>();
-
- /** A set of recorded thread info that's yet to be inserted into the node structure */
- private final Set<QueuedThreadInfo> pendingThreadData = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
+ private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
/** The worker pool for inserting stack nodes */
private ExecutorService workerPool;
- /** The lock object */
- private final Object[] lock = new Object[0];
+ /** The phaser used to track pending stack node data */
+ private final Phaser phaser = new Phaser();
+
/** A future to encapsulation the completion of this sampler instance */
private final CompletableFuture<Sampler> future = new CompletableFuture<>();
@@ -85,16 +83,14 @@ public class Sampler extends TimerTask {
}
private void insertData(QueuedThreadInfo data) {
- synchronized (this.lock) {
- try {
- StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new);
- node.log(data.stack, Sampler.this.interval);
- this.pendingThreadData.remove(data);
- } catch (Exception e) {
- e.printStackTrace();
- // we need to remove the pending data even if the insert failed
- this.pendingThreadData.remove(data);
- }
+ try {
+ StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new);
+ node.log(data.stack, Sampler.this.interval);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ // countdown the phaser
+ this.phaser.arriveAndDeregister();
}
}
@@ -104,18 +100,14 @@ public class Sampler extends TimerTask {
* @return the data
*/
public Map<String, StackNode> getData() {
- if (this.pendingThreadData.isEmpty()) {
- return this.threadData;
- }
-
// wait for all pending data to be inserted
- while (true) {
- synchronized (this.lock) {
- if (this.pendingThreadData.isEmpty()) {
- return this.threadData;
- }
- }
+ try {
+ this.phaser.awaitAdvanceInterruptibly(this.phaser.getPhase(), 15, TimeUnit.SECONDS);
+ } catch (InterruptedException | TimeoutException e) {
+ e.printStackTrace();
}
+
+ return this.threadData;
}
public long getStartTime() {
@@ -151,8 +143,11 @@ public class Sampler extends TimerTask {
continue;
}
+ // form the queued data
QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
- this.pendingThreadData.add(queuedData);
+ // mark that this data is pending
+ this.phaser.register();
+ // schedule insertion of the data
this.workerPool.execute(queuedData);
}
} catch (Throwable t) {
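
The Sampler changes above replace the pendingThreadData set and lock object with a Phaser: each QueuedThreadInfo registers a party before it is submitted to the worker pool, arrives-and-deregisters in a finally block once inserted (even if the insert fails), and getData() waits for the current phase to advance with a 15-second timeout. Below is a minimal standalone sketch of that hand-off pattern with hypothetical class and method names and a plain fixed-size pool standing in for spark's worker pool. One difference from the diff: the sketch overrides onAdvance to return false so the phaser is not terminated when the registered-party count drops back to zero between batches.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PhaserDrainSketch {
    // never terminate, even when the registered-party count reaches zero
    private final Phaser phaser = new Phaser() {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            return false;
        }
    };
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    void submit(Runnable work) {
        this.phaser.register(); // mark one unit of pending work
        this.pool.execute(() -> {
            try {
                work.run();
            } finally {
                this.phaser.arriveAndDeregister(); // countdown, even if the work failed
            }
        });
    }

    void awaitPending() {
        try {
            // block until the current phase advances (all registered parties have
            // arrived), or give up after 15 seconds
            this.phaser.awaitAdvanceInterruptibly(this.phaser.getPhase(), 15, TimeUnit.SECONDS);
        } catch (InterruptedException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
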
diff --git a/common/src/main/java/me/lucko/spark/profiler/StackNode.java b/common/src/main/java/me/lucko/spark/profiler/StackNode.java
index 1e1ea63..b2b6f88 100644
--- a/common/src/main/java/me/lucko/spark/profiler/StackNode.java
+++ b/common/src/main/java/me/lucko/spark/profiler/StackNode.java
@@ -24,9 +24,10 @@ import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
/**
* Represents a node in the overall sampling stack.
@@ -46,12 +47,12 @@ public class StackNode implements Comparable<StackNode> {
/**
* A map of this nodes children
*/
- private final Map<String, StackNode> children = new HashMap<>();
+ private final Map<String, StackNode> children = new ConcurrentHashMap<>();
/**
* The accumulated sample time for this node
*/
- private long totalTime = 0;
+ private final LongAdder totalTime = new LongAdder();
public StackNode(String name) {
this.name = name;
@@ -80,11 +81,11 @@ public class StackNode implements Comparable<StackNode> {
}
public long getTotalTime() {
- return this.totalTime;
+ return this.totalTime.longValue();
}
public void accumulateTime(long time) {
- this.totalTime += time;
+ this.totalTime.add(time);
}
private void log(StackTraceElement[] elements, int skip, long time) {
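
The StackNode changes above drop the HashMap and plain long counter in favour of ConcurrentHashMap and LongAdder, so worker threads can insert into the same tree without holding a lock. Below is a minimal standalone sketch of that accumulation pattern; the class name is hypothetical, and spark's StackNode also tracks the node name and child ordering, which are omitted here.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class ConcurrentNodeSketch {
    private final Map<String, ConcurrentNodeSketch> children = new ConcurrentHashMap<>();
    private final LongAdder totalTime = new LongAdder();

    ConcurrentNodeSketch child(String name) {
        // atomic get-or-create: safe when called from several worker threads at once
        return this.children.computeIfAbsent(name, n -> new ConcurrentNodeSketch());
    }

    void accumulateTime(long time) {
        this.totalTime.add(time); // striped counter: cheap to update under contention
    }

    long getTotalTime() {
        return this.totalTime.longValue(); // sums the stripes at read time
    }
}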