diff options
Diffstat (limited to 'common/src/main')
3 files changed, 94 insertions, 36 deletions
diff --git a/common/src/main/java/com/sk89q/warmroast/Sampler.java b/common/src/main/java/com/sk89q/warmroast/Sampler.java index 6c4f60c..a141f82 100644 --- a/common/src/main/java/com/sk89q/warmroast/Sampler.java +++ b/common/src/main/java/com/sk89q/warmroast/Sampler.java @@ -25,31 +25,36 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; /** * Main sampler class. */ public class Sampler extends TimerTask { - /** - * The thread management interface for the current JVM - */ + /** The thread management interface for the current JVM */ private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - /** - * A map of root stack nodes for each thread with sampling data - */ + /** A map of root stack nodes for each thread with sampling data */ private final Map<String, StackNode> threadData = new HashMap<>(); - /** - * A future to encapsulation the completion of this sampler instance - */ + /** A set of recorded thread info that's yet to be inserted into the node structure */ + private final Set<QueuedThreadInfo> pendingThreadData = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>())); + + /** The worker pool for inserting stack nodes */ + private ExecutorService workerPool; + /** The lock object */ + private final Object[] lock = new Object[0]; + /** A future to encapsulation the completion of this sampler instance */ private final CompletableFuture<Sampler> future = new CompletableFuture<>(); /** The interval to wait between sampling, in milliseconds */ @@ -70,20 +75,47 @@ public class Sampler extends TimerTask { /** * Starts the sampler. * - * @param timer the timer to schedule the sampling on + * @param samplingThread the timer to schedule the sampling on + * @param workerPool the worker pool */ - public synchronized void start(Timer timer) { - timer.scheduleAtFixedRate(this, 0, this.interval); + public void start(Timer samplingThread, ExecutorService workerPool) { + this.workerPool = workerPool; + samplingThread.scheduleAtFixedRate(this, 0, this.interval); this.startTime = System.currentTimeMillis(); } + private void insertData(QueuedThreadInfo data) { + synchronized (this.lock) { + try { + StackNode node = this.threadData.computeIfAbsent(data.threadName, StackNode::new); + node.log(data.stack, Sampler.this.interval); + this.pendingThreadData.remove(data); + } catch (Exception e) { + e.printStackTrace(); + // we need to remove the pending data even if the insert failed + this.pendingThreadData.remove(data); + } + } + } + /** * Gets the sampling data recorded by this instance. * * @return the data */ public Map<String, StackNode> getData() { - return this.threadData; + if (this.pendingThreadData.isEmpty()) { + return this.threadData; + } + + // wait for all pending data to be inserted + while (true) { + synchronized (this.lock) { + if (this.pendingThreadData.isEmpty()) { + return this.threadData; + } + } + } } public long getStartTime() { @@ -101,12 +133,8 @@ public class Sampler extends TimerTask { return this.future; } - private StackNode getRootNode(String threadName) { - return this.threadData.computeIfAbsent(threadName, StackNode::new); - } - @Override - public synchronized void run() { + public void run() { try { if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) { this.future.complete(this); @@ -123,8 +151,9 @@ public class Sampler extends TimerTask { continue; } - StackNode node = getRootNode(threadName); - node.log(stack, this.interval); + QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); + this.pendingThreadData.add(queuedData); + this.workerPool.execute(queuedData); } } catch (Throwable t) { this.future.completeExceptionally(t); @@ -153,4 +182,19 @@ public class Sampler extends TimerTask { return out; } + private final class QueuedThreadInfo implements Runnable { + private final String threadName; + private final StackTraceElement[] stack; + + private QueuedThreadInfo(String threadName, StackTraceElement[] stack) { + this.threadName = threadName; + this.stack = stack; + } + + @Override + public void run() { + insertData(this); + } + } + } diff --git a/common/src/main/java/me/lucko/spark/common/CommandHandler.java b/common/src/main/java/me/lucko/spark/common/CommandHandler.java index aa7e47d..c015cb3 100644 --- a/common/src/main/java/me/lucko/spark/common/CommandHandler.java +++ b/common/src/main/java/me/lucko/spark/common/CommandHandler.java @@ -1,8 +1,9 @@ package me.lucko.spark.common; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.JsonObject; -import com.sk89q.warmroast.ThreadDumper; import com.sk89q.warmroast.Sampler; +import com.sk89q.warmroast.ThreadDumper; import me.lucko.spark.common.http.Bytebin; @@ -14,6 +15,8 @@ import java.util.List; import java.util.Map; import java.util.Timer; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -32,7 +35,12 @@ public abstract class CommandHandler<T> { /** * The {@link Timer} being used by the {@link #activeSampler}. */ - private final Timer timer = new Timer("spark-sampling-thread", true); + private final Timer samplingThread = new Timer("spark-sampling-thread", true); + + /** + * The worker {@link ExecutorService} being used by the {@link #activeSampler}. + */ + private final ExecutorService workerPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("spark-worker-%d").build()); /** Guards {@link #activeSampler} */ private final Object[] activeSamplerMutex = new Object[0]; @@ -97,16 +105,16 @@ public abstract class CommandHandler<T> { int timeoutSeconds = parseInt(arguments, "timeout", "d"); if (timeoutSeconds != -1 && timeoutSeconds <= 10) { - sendPrefixedMessage(sender, "&cThe specified timeout is not long enough for accurate results to be formed."); + sendPrefixedMessage(sender, "&cThe specified timeout is not long enough for accurate results to be formed. Please choose a value greater than 10."); return; } - if (timeoutSeconds != -1 && timeoutSeconds < 100) { - sendPrefixedMessage(sender, "&7The accuracy of the output will significantly improve when sampling is able to run for longer periods. Consider setting a value of timeout over 1-2 minutes."); + if (timeoutSeconds != -1 && timeoutSeconds < 30) { + sendPrefixedMessage(sender, "&7The accuracy of the output will significantly improve when sampling is able to run for longer periods. Consider setting a timeout value over 30 seconds."); } int intervalMillis = parseInt(arguments, "interval", "i"); - if (intervalMillis == -1) { + if (intervalMillis <= 0) { intervalMillis = 10; } @@ -128,7 +136,7 @@ public abstract class CommandHandler<T> { return; } - sendPrefixedMessage(sender, "&7Starting a new sampler task..."); + sendPrefixedMessage(sender, "&7Initializing a new profiler, please wait..."); SamplerBuilder builder = new SamplerBuilder(); builder.threadDumper(threadDumper); @@ -136,15 +144,20 @@ public abstract class CommandHandler<T> { builder.completeAfter(timeoutSeconds, TimeUnit.SECONDS); } builder.samplingInterval(intervalMillis); - sampler = this.activeSampler = builder.start(timer); + sampler = this.activeSampler = builder.start(this.samplingThread, this.workerPool); - sendPrefixedMessage(sender, "&bSampling has begun!"); + sendPrefixedMessage(sender, "&bProfiler now active!"); + if (timeoutSeconds == -1) { + sendPrefixedMessage(sender, "&7Use '/profiler stop' to stop profiling and upload the results."); + } else { + sendPrefixedMessage(sender, "&7The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds."); + } } CompletableFuture<Sampler> future = sampler.getFuture(); // send message if profiling fails - future.whenComplete((s, throwable) -> { + future.whenCompleteAsync((s, throwable) -> { if (throwable != null) { sendPrefixedMessage(sender, "&cSampling operation failed unexpectedly. Error: " + throwable.toString()); throwable.printStackTrace(); @@ -152,7 +165,7 @@ public abstract class CommandHandler<T> { }); // set activeSampler to null when complete. - future.whenComplete((s, throwable) -> { + future.whenCompleteAsync((s, throwable) -> { synchronized (this.activeSamplerMutex) { if (sampler == this.activeSampler) { this.activeSampler = null; @@ -218,7 +231,7 @@ public abstract class CommandHandler<T> { JsonObject output = sampler.formOutput(); try { String pasteId = Bytebin.postContent(output); - sendPrefixedMessage(sender, "&bSampling results can be viewed here: &7" + VIEWER_URL + pasteId); + sendPrefixedMessage(sender, "&bSampling results: &7" + VIEWER_URL + pasteId); } catch (IOException e) { sendPrefixedMessage(sender, "&cAn error occurred whilst uploading the results."); e.printStackTrace(); diff --git a/common/src/main/java/me/lucko/spark/common/SamplerBuilder.java b/common/src/main/java/me/lucko/spark/common/SamplerBuilder.java index e6c0cf8..a15bcf6 100644 --- a/common/src/main/java/me/lucko/spark/common/SamplerBuilder.java +++ b/common/src/main/java/me/lucko/spark/common/SamplerBuilder.java @@ -1,10 +1,11 @@ package me.lucko.spark.common; import com.google.common.base.Preconditions; -import com.sk89q.warmroast.ThreadDumper; import com.sk89q.warmroast.Sampler; +import com.sk89q.warmroast.ThreadDumper; import java.util.Timer; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; /** @@ -35,9 +36,9 @@ public class SamplerBuilder { return this; } - public Sampler start(Timer timer) { - Sampler sampler = new Sampler(samplingInterval, threadDumper, timeout); - sampler.start(timer); + public Sampler start(Timer samplingThread, ExecutorService workerPool) { + Sampler sampler = new Sampler(this.samplingInterval, this.threadDumper, this.timeout); + sampler.start(samplingThread, workerPool); return sampler; } |