aboutsummaryrefslogtreecommitdiff
path: root/common/src/main/java/me/lucko/spark/profiler/Sampler.java
diff options
context:
space:
mode:
authorLuck <git@lucko.me>2018-05-30 15:56:37 +0100
committerLuck <git@lucko.me>2018-05-30 15:56:37 +0100
commit46b5064045f631ff66ed4a144bed6919e069bbbd (patch)
treed5aa10b4d7dd3289ba45abd74303ed6bcb4b46a4 /common/src/main/java/me/lucko/spark/profiler/Sampler.java
parentf7c84b3ced3b079f33517f360414b406658a2a72 (diff)
downloadspark-46b5064045f631ff66ed4a144bed6919e069bbbd.tar.gz
spark-46b5064045f631ff66ed4a144bed6919e069bbbd.tar.bz2
spark-46b5064045f631ff66ed4a144bed6919e069bbbd.zip
Set a max size for the worker pool, replace synchronization with concurrent collections & a phaser
Diffstat (limited to 'common/src/main/java/me/lucko/spark/profiler/Sampler.java')
-rw-r--r--common/src/main/java/me/lucko/spark/profiler/Sampler.java57
1 files changed, 26 insertions, 31 deletions
diff --git a/common/src/main/java/me/lucko/spark/profiler/Sampler.java b/common/src/main/java/me/lucko/spark/profiler/Sampler.java
index bfad8d1..5db978c 100644
--- a/common/src/main/java/me/lucko/spark/profiler/Sampler.java
+++ b/common/src/main/java/me/lucko/spark/profiler/Sampler.java
@@ -25,16 +25,16 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Main sampler class.
@@ -45,15 +45,13 @@ public class Sampler extends TimerTask {
private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
/** A map of root stack nodes for each thread with sampling data */
- private final Map<String, StackNode> threadData = new HashMap<>();
-
- /** A set of recorded thread info that's yet to be inserted into the node structure */
- private final Set<QueuedThreadInfo> pendingThreadData = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
+ private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
/** The worker pool for inserting stack nodes */
private ExecutorService workerPool;
- /** The lock object */
- private final Object[] lock = new Object[0];
+ /** The phaser used to track pending stack node data */
+ private final Phaser phaser = new Phaser();
+
/** A future to encapsulation the completion of this sampler instance */
private final CompletableFuture<Sampler> future = new CompletableFuture<>();
@@ -85,16 +83,14 @@ public class Sampler extends TimerTask {
}
private void insertData(QueuedThreadInfo data) {
- synchronized (this.lock) {
- try {
- StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new);
- node.log(data.stack, Sampler.this.interval);
- this.pendingThreadData.remove(data);
- } catch (Exception e) {
- e.printStackTrace();
- // we need to remove the pending data even if the insert failed
- this.pendingThreadData.remove(data);
- }
+ try {
+ StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new);
+ node.log(data.stack, Sampler.this.interval);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ // countdown the phaser
+ this.phaser.arriveAndDeregister();
}
}
@@ -104,18 +100,14 @@ public class Sampler extends TimerTask {
* @return the data
*/
public Map<String, StackNode> getData() {
- if (this.pendingThreadData.isEmpty()) {
- return this.threadData;
- }
-
// wait for all pending data to be inserted
- while (true) {
- synchronized (this.lock) {
- if (this.pendingThreadData.isEmpty()) {
- return this.threadData;
- }
- }
+ try {
+ this.phaser.awaitAdvanceInterruptibly(this.phaser.getPhase(), 15, TimeUnit.SECONDS);
+ } catch (InterruptedException | TimeoutException e) {
+ e.printStackTrace();
}
+
+ return this.threadData;
}
public long getStartTime() {
@@ -151,8 +143,11 @@ public class Sampler extends TimerTask {
continue;
}
+ // form the queued data
QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
- this.pendingThreadData.add(queuedData);
+ // mark that this data is pending
+ this.phaser.register();
+ // schedule insertion of the data
this.workerPool.execute(queuedData);
}
} catch (Throwable t) {