From c29e96eb5d7981bbb1d9b54e92cbc9f22319efb8 Mon Sep 17 00:00:00 2001
From: Luck <git@lucko.me>
Date: Mon, 28 May 2018 15:35:53 +0100
Subject: Handle node insertion in separate worker threads, to reduce the work
 load on the sampling thread

and some other misc changes
---
 .../src/main/java/com/sk89q/warmroast/Sampler.java | 84 ++++++++++++++++------
 .../java/me/lucko/spark/common/CommandHandler.java | 37 ++++++----
 .../java/me/lucko/spark/common/SamplerBuilder.java |  9 +--
 3 files changed, 94 insertions(+), 36 deletions(-)

(limited to 'common/src/main/java')

diff --git a/common/src/main/java/com/sk89q/warmroast/Sampler.java b/common/src/main/java/com/sk89q/warmroast/Sampler.java
index 6c4f60c..a141f82 100644
--- a/common/src/main/java/com/sk89q/warmroast/Sampler.java
+++ b/common/src/main/java/com/sk89q/warmroast/Sampler.java
@@ -25,31 +25,36 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Main sampler class.
  */
 public class Sampler extends TimerTask {
 
-    /**
-     * The thread management interface for the current JVM
-     */
+    /** The thread management interface for the current JVM */
     private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
 
-    /**
-     * A map of root stack nodes for each thread with sampling data
-     */
+    /** A map of root stack nodes for each thread with sampling data */
     private final Map<String, StackNode> threadData = new HashMap<>();
 
-    /**
-     * A future to encapsulation the completion of this sampler instance
-     */
+    /** A set of recorded thread info that's yet to be inserted into the node structure */
+    private final Set<QueuedThreadInfo> pendingThreadData = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
+
+    /** The worker pool for inserting stack nodes */
+    private ExecutorService workerPool;
+    /** The lock object */
+    private final Object[] lock = new Object[0];
+    /** A future to encapsulation the completion of this sampler instance */
     private final CompletableFuture<Sampler> future = new CompletableFuture<>();
 
     /** The interval to wait between sampling, in milliseconds */
@@ -70,20 +75,47 @@ public class Sampler extends TimerTask {
     /**
      * Starts the sampler.
      *
-     * @param timer the timer to schedule the sampling on
+     * @param samplingThread the timer to schedule the sampling on
+     * @param workerPool the worker pool
      */
-    public synchronized void start(Timer timer) {
-        timer.scheduleAtFixedRate(this, 0, this.interval);
+    public void start(Timer samplingThread, ExecutorService workerPool) {
+        this.workerPool = workerPool;
+        samplingThread.scheduleAtFixedRate(this, 0, this.interval);
         this.startTime = System.currentTimeMillis();
     }
 
+    private void insertData(QueuedThreadInfo data) {
+        synchronized (this.lock) {
+            try {
+                StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new);
+                node.log(data.stack, Sampler.this.interval);
+                this.pendingThreadData.remove(data);
+            } catch (Exception e) {
+                e.printStackTrace();
+                // we need to remove the pending data even if the insert failed
+                this.pendingThreadData.remove(data);
+            }
+        }
+    }
+
     /**
      * Gets the sampling data recorded by this instance.
      *
      * @return the data
      */
     public Map<String, StackNode> getData() {
-        return this.threadData;
+        if (this.pendingThreadData.isEmpty()) {
+            return this.threadData;
+        }
+
+        // wait for all pending data to be inserted
+        while (true) {
+            synchronized (this.lock) {
+                if (this.pendingThreadData.isEmpty()) {
+                    return this.threadData;
+                }
+            }
+        }
     }
 
     public long getStartTime() {
@@ -101,12 +133,8 @@ public class Sampler extends TimerTask {
         return this.future;
     }
 
-    private StackNode getRootNode(String threadName) {
-        return this.threadData.computeIfAbsent(threadName, StackNode::new);
-    }
-
     @Override
-    public synchronized void run() {
+    public void run() {
         try {
             if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) {
                 this.future.complete(this);
@@ -123,8 +151,9 @@ public class Sampler extends TimerTask {
                     continue;
                 }
 
-                StackNode node = getRootNode(threadName);
-                node.log(stack, this.interval);
+                QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+                this.pendingThreadData.add(queuedData);
+                this.workerPool.execute(queuedData);
             }
         } catch (Throwable t) {
             this.future.completeExceptionally(t);
@@ -153,4 +182,19 @@ public class Sampler extends TimerTask {
         return out;
     }
 
+    private final class QueuedThreadInfo implements Runnable {
+        private final String threadName;
+        private final StackTraceElement[] stack;
+
+        private QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+            this.threadName = threadName;
+            this.stack = stack;
+        }
+
+        @Override
+        public void run() {
+            insertData(this);
+        }
+    }
+
 }
diff --git a/common/src/main/java/me/lucko/spark/common/CommandHandler.java b/common/src/main/java/me/lucko/spark/common/CommandHandler.java
index aa7e47d..c015cb3 100644
--- a/common/src/main/java/me/lucko/spark/common/CommandHandler.java
+++ b/common/src/main/java/me/lucko/spark/common/CommandHandler.java
@@ -1,8 +1,9 @@
 package me.lucko.spark.common;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gson.JsonObject;
-import com.sk89q.warmroast.ThreadDumper;
 import com.sk89q.warmroast.Sampler;
+import com.sk89q.warmroast.ThreadDumper;
 
 import me.lucko.spark.common.http.Bytebin;
 
@@ -14,6 +15,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -32,7 +35,12 @@ public abstract class CommandHandler<T> {
     /**
      * The {@link Timer} being used by the {@link #activeSampler}.
      */
-    private final Timer timer = new Timer("spark-sampling-thread", true);
+    private final Timer samplingThread = new Timer("spark-sampling-thread", true);
+
+    /**
+     * The worker {@link ExecutorService} being used by the {@link #activeSampler}.
+     */
+    private final ExecutorService workerPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("spark-worker-%d").build());
 
     /** Guards {@link #activeSampler} */
     private final Object[] activeSamplerMutex = new Object[0];
@@ -97,16 +105,16 @@ public abstract class CommandHandler<T> {
 
         int timeoutSeconds = parseInt(arguments, "timeout", "d");
         if (timeoutSeconds != -1 && timeoutSeconds <= 10) {
-            sendPrefixedMessage(sender, "&cThe specified timeout is not long enough for accurate results to be formed.");
+            sendPrefixedMessage(sender, "&cThe specified timeout is not long enough for accurate results to be formed. Please choose a value greater than 10.");
             return;
         }
 
-        if (timeoutSeconds != -1 && timeoutSeconds < 100) {
-            sendPrefixedMessage(sender, "&7The accuracy of the output will significantly improve when sampling is able to run for longer periods. Consider setting a value of timeout over 1-2 minutes.");
+        if (timeoutSeconds != -1 && timeoutSeconds < 30) {
+            sendPrefixedMessage(sender, "&7The accuracy of the output will significantly improve when sampling is able to run for longer periods. Consider setting a timeout value over 30 seconds.");
         }
 
         int intervalMillis = parseInt(arguments, "interval", "i");
-        if (intervalMillis == -1) {
+        if (intervalMillis <= 0) {
             intervalMillis = 10;
         }
 
@@ -128,7 +136,7 @@ public abstract class CommandHandler<T> {
                 return;
             }
 
-            sendPrefixedMessage(sender, "&7Starting a new sampler task...");
+            sendPrefixedMessage(sender, "&7Initializing a new profiler, please wait...");
 
             SamplerBuilder builder = new SamplerBuilder();
             builder.threadDumper(threadDumper);
@@ -136,15 +144,20 @@ public abstract class CommandHandler<T> {
                 builder.completeAfter(timeoutSeconds, TimeUnit.SECONDS);
             }
             builder.samplingInterval(intervalMillis);
-            sampler = this.activeSampler = builder.start(timer);
+            sampler = this.activeSampler = builder.start(this.samplingThread, this.workerPool);
 
-            sendPrefixedMessage(sender, "&bSampling has begun!");
+            sendPrefixedMessage(sender, "&bProfiler now active!");
+            if (timeoutSeconds == -1) {
+                sendPrefixedMessage(sender, "&7Use '/profiler stop' to stop profiling and upload the results.");
+            } else {
+                sendPrefixedMessage(sender, "&7The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds.");
+            }
         }
 
         CompletableFuture<Sampler> future = sampler.getFuture();
 
         // send message if profiling fails
-        future.whenComplete((s, throwable) -> {
+        future.whenCompleteAsync((s, throwable) -> {
             if (throwable != null) {
                 sendPrefixedMessage(sender, "&cSampling operation failed unexpectedly. Error: " + throwable.toString());
                 throwable.printStackTrace();
@@ -152,7 +165,7 @@ public abstract class CommandHandler<T> {
         });
 
         // set activeSampler to null when complete.
-        future.whenComplete((s, throwable) -> {
+        future.whenCompleteAsync((s, throwable) -> {
             synchronized (this.activeSamplerMutex) {
                 if (sampler == this.activeSampler) {
                     this.activeSampler = null;
@@ -218,7 +231,7 @@ public abstract class CommandHandler<T> {
             JsonObject output = sampler.formOutput();
             try {
                 String pasteId = Bytebin.postContent(output);
-                sendPrefixedMessage(sender, "&bSampling results can be viewed here: &7" + VIEWER_URL + pasteId);
+                sendPrefixedMessage(sender, "&bSampling results: &7" + VIEWER_URL + pasteId);
             } catch (IOException e) {
                 sendPrefixedMessage(sender, "&cAn error occurred whilst uploading the results.");
                 e.printStackTrace();
diff --git a/common/src/main/java/me/lucko/spark/common/SamplerBuilder.java b/common/src/main/java/me/lucko/spark/common/SamplerBuilder.java
index e6c0cf8..a15bcf6 100644
--- a/common/src/main/java/me/lucko/spark/common/SamplerBuilder.java
+++ b/common/src/main/java/me/lucko/spark/common/SamplerBuilder.java
@@ -1,10 +1,11 @@
 package me.lucko.spark.common;
 
 import com.google.common.base.Preconditions;
-import com.sk89q.warmroast.ThreadDumper;
 import com.sk89q.warmroast.Sampler;
+import com.sk89q.warmroast.ThreadDumper;
 
 import java.util.Timer;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -35,9 +36,9 @@ public class SamplerBuilder {
         return this;
     }
 
-    public Sampler start(Timer timer) {
-        Sampler sampler = new Sampler(samplingInterval, threadDumper, timeout);
-        sampler.start(timer);
+    public Sampler start(Timer samplingThread, ExecutorService workerPool) {
+        Sampler sampler = new Sampler(this.samplingInterval, this.threadDumper, this.timeout);
+        sampler.start(samplingThread, workerPool);
         return sampler;
     }
 
-- 
cgit