13 files changed, 697 insertions, 90 deletions
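In short: this change adds a TickCounter hook into the game's tick loop (implemented for Bukkit and Sponge, unsupported on BungeeCord), a TickMonitor that reports any tick whose duration exceeds the measured average by a configurable percentage (the new "/profiler monitoring" command, with --threshold/-t defaulting to 100), and a TickedDataAggregator behind a new "/profiler start --only-ticks-over <tick length millis>" option that discards samples from ticks shorter than the given threshold. Messages produced while a profiler or monitor is running are now broadcast to the console and to players with the spark.profiler permission, rather than sent only to the command sender.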
diff --git a/bukkit/src/main/java/me/lucko/spark/bukkit/BukkitTickCounter.java b/bukkit/src/main/java/me/lucko/spark/bukkit/BukkitTickCounter.java
new file mode 100644
index 0000000..61a7690
--- /dev/null
+++ b/bukkit/src/main/java/me/lucko/spark/bukkit/BukkitTickCounter.java
@@ -0,0 +1,55 @@
+package me.lucko.spark.bukkit;
+
+import me.lucko.spark.profiler.TickCounter;
+
+import org.bukkit.plugin.Plugin;
+import org.bukkit.scheduler.BukkitTask;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+
+public class BukkitTickCounter implements TickCounter, Runnable {
+    private final Plugin plugin;
+    private BukkitTask task;
+
+    private final Set<Runnable> tasks = new HashSet<>();
+    private final LongAdder tick = new LongAdder();
+
+    public BukkitTickCounter(Plugin plugin) {
+        this.plugin = plugin;
+    }
+
+    @Override
+    public void run() {
+        this.tick.increment();
+        for (Runnable r : this.tasks){
+            r.run();
+        }
+    }
+
+    @Override
+    public void start() {
+        this.task = this.plugin.getServer().getScheduler().runTaskTimer(this.plugin, this, 1, 1);
+    }
+
+    @Override
+    public void close() {
+        this.task.cancel();
+    }
+
+    @Override
+    public long getCurrentTick() {
+        return this.tick.longValue();
+    }
+
+    @Override
+    public void addTickTask(Runnable runnable) {
+        this.tasks.add(runnable);
+    }
+
+    @Override
+    public void removeTickTask(Runnable runnable) {
+        this.tasks.remove(runnable);
+    }
+}
diff --git a/bukkit/src/main/java/me/lucko/spark/bukkit/SparkBukkitPlugin.java b/bukkit/src/main/java/me/lucko/spark/bukkit/SparkBukkitPlugin.java
index 8b3ed28..dc432c5 100644
--- a/bukkit/src/main/java/me/lucko/spark/bukkit/SparkBukkitPlugin.java
+++ b/bukkit/src/main/java/me/lucko/spark/bukkit/SparkBukkitPlugin.java
@@ -2,23 +2,46 @@ package me.lucko.spark.bukkit;
 
 import me.lucko.spark.common.CommandHandler;
 import me.lucko.spark.profiler.ThreadDumper;
+import me.lucko.spark.profiler.TickCounter;
 
 import org.bukkit.ChatColor;
 import org.bukkit.command.Command;
 import org.bukkit.command.CommandSender;
+import org.bukkit.entity.Player;
 import org.bukkit.plugin.java.JavaPlugin;
 
 public class SparkBukkitPlugin extends JavaPlugin {
 
     private final CommandHandler<CommandSender> commandHandler = new CommandHandler<CommandSender>() {
+
+        private String colorize(String message) {
+            return ChatColor.translateAlternateColorCodes('&', message);
+        }
+
+        private void broadcast(String msg) {
+            getServer().getConsoleSender().sendMessage(msg);
+            for (Player player : getServer().getOnlinePlayers()) {
+                if (player.hasPermission("spark.profiler")) {
+                    player.sendMessage(msg);
+                }
+            }
+        }
+
         @Override
         protected void sendMessage(CommandSender sender, String message) {
-            sender.sendMessage(ChatColor.translateAlternateColorCodes('&', message));
+            sender.sendMessage(colorize(message));
         }
 
         @Override
-        protected void sendLink(CommandSender sender, String url) {
-            sendMessage(sender, "&7" + url);
+        protected void sendMessage(String message) {
+            String msg = colorize(message);
+            broadcast(msg);
+        }
+
+        @Override
+        protected void sendLink(String url) {
+            String msg = colorize("&7" + url);
+            broadcast(msg);
         }
 
         @Override
@@ -30,6 +53,11 @@ public class SparkBukkitPlugin extends JavaPlugin {
         protected ThreadDumper getDefaultThreadDumper() {
             return new ThreadDumper.Specific(new long[]{Thread.currentThread().getId()});
         }
+
+        @Override
+        protected TickCounter newTickCounter() {
+            return new BukkitTickCounter(SparkBukkitPlugin.this);
+        }
     };
 
     @Override
diff --git a/bungeecord/src/main/java/me/lucko/spark/bungeecord/SparkBungeeCordPlugin.java b/bungeecord/src/main/java/me/lucko/spark/bungeecord/SparkBungeeCordPlugin.java
index 717497c..59bab67 100644
--- a/bungeecord/src/main/java/me/lucko/spark/bungeecord/SparkBungeeCordPlugin.java
+++ b/bungeecord/src/main/java/me/lucko/spark/bungeecord/SparkBungeeCordPlugin.java
@@ -2,28 +2,49 @@ package me.lucko.spark.bungeecord;
 
 import me.lucko.spark.common.CommandHandler;
 import me.lucko.spark.profiler.ThreadDumper;
+import me.lucko.spark.profiler.TickCounter;
 
 import net.md_5.bungee.api.ChatColor;
 import net.md_5.bungee.api.CommandSender;
+import net.md_5.bungee.api.chat.BaseComponent;
 import net.md_5.bungee.api.chat.ClickEvent;
 import net.md_5.bungee.api.chat.TextComponent;
+import net.md_5.bungee.api.connection.ProxiedPlayer;
 import net.md_5.bungee.api.plugin.Command;
 import net.md_5.bungee.api.plugin.Plugin;
 
 public class SparkBungeeCordPlugin extends Plugin {
 
     private final CommandHandler<CommandSender> commandHandler = new CommandHandler<CommandSender>() {
+        private BaseComponent[] colorize(String message) {
+            return TextComponent.fromLegacyText(ChatColor.translateAlternateColorCodes('&', message));
+        }
+
+        private void broadcast(BaseComponent... msg) {
+            getProxy().getConsole().sendMessage(msg);
+            for (ProxiedPlayer player : getProxy().getPlayers()) {
+                if (player.hasPermission("spark.profiler")) {
+                    player.sendMessage(msg);
+                }
+            }
+        }
+
         @Override
         protected void sendMessage(CommandSender sender, String message) {
-            sender.sendMessage(TextComponent.fromLegacyText(ChatColor.translateAlternateColorCodes('&', message)));
+            sender.sendMessage(colorize(message));
         }
 
         @Override
-        protected void sendLink(CommandSender sender, String url) {
+        protected void sendMessage(String message) {
+            broadcast(colorize(message));
+        }
+
+        @Override
+        protected void sendLink(String url) {
             TextComponent component = new TextComponent(url);
             component.setColor(ChatColor.GRAY);
             component.setClickEvent(new ClickEvent(ClickEvent.Action.OPEN_URL, url));
-            sender.sendMessage(component);
+            broadcast(component);
         }
 
         @Override
@@ -35,6 +56,11 @@ public class SparkBungeeCordPlugin extends Plugin {
         protected ThreadDumper getDefaultThreadDumper() {
             return new ThreadDumper.All();
         }
+
+        @Override
+        protected TickCounter newTickCounter() {
+            throw new UnsupportedOperationException();
+        }
     };
 
     @Override
diff --git a/common/src/main/java/me/lucko/spark/common/CommandHandler.java b/common/src/main/java/me/lucko/spark/common/CommandHandler.java
index 6664c18..a9bbe61 100644
--- a/common/src/main/java/me/lucko/spark/common/CommandHandler.java
+++ b/common/src/main/java/me/lucko/spark/common/CommandHandler.java
@@ -10,6 +10,7 @@ import me.lucko.spark.profiler.Sampler;
 import me.lucko.spark.profiler.SamplerBuilder;
 import me.lucko.spark.profiler.ThreadDumper;
 import me.lucko.spark.profiler.ThreadGrouper;
+import me.lucko.spark.profiler.TickCounter;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,6 +34,8 @@ public abstract class CommandHandler<T> {
 
     /** The URL of the viewer frontend */
     private static final String VIEWER_URL = "https://sparkprofiler.github.io/?";
+    /** The prefix used in all messages */
+    private static final String PREFIX = "&8[&fspark&8] &7";
 
     /**
      * The {@link Timer} being used by the {@link #activeSampler}.
@@ -43,17 +46,25 @@ public abstract class CommandHandler<T> {
     private final Object[] activeSamplerMutex = new Object[0];
     /** The WarmRoast instance currently running, if any */
     private Sampler activeSampler = null;
+    /** The tick monitor instance currently running, if any */
+    private ReportingTickMonitor activeTickMonitor = null;
 
     // abstract methods implemented by each platform
     protected abstract void sendMessage(T sender, String message);
-    protected abstract void sendLink(T sender, String url);
+    protected abstract void sendMessage(String message);
+    protected abstract void sendLink(String url);
     protected abstract void runAsync(Runnable r);
     protected abstract ThreadDumper getDefaultThreadDumper();
+    protected abstract TickCounter newTickCounter();
 
     private void sendPrefixedMessage(T sender, String message) {
-        sendMessage(sender, "&8[&fspark&8] &7" + message);
+        sendMessage(sender, PREFIX + message);
+    }
+
+    private void sendPrefixedMessage(String message) {
+        sendMessage(PREFIX + message);
     }
 
     public void handleCommand(T sender, String[] args) {
@@ -79,6 +90,9 @@
             case "paste":
                 handleStop(sender);
                 break;
+            case "monitoring":
+                handleMonitoring(sender, arguments);
+                break;
             default:
                 sendInfo(sender);
                 break;
@@ -95,9 +109,11 @@
         sendMessage(sender, " &8[&7--thread&8 <thread name>]");
         sendMessage(sender, " &8[&7--not-combined]");
         sendMessage(sender, " &8[&7--interval&8 <interval millis>]");
+        sendMessage(sender, " &8[&7--only-ticks-over&8 <tick length millis>]");
         sendMessage(sender, "&b&l> &7/profiler info");
         sendMessage(sender, "&b&l> &7/profiler stop");
         sendMessage(sender, "&b&l> &7/profiler cancel");
+        sendMessage(sender, "&b&l> &7/profiler monitoring");
     }
 
     private void handleStart(T sender, List<String> args) {
@@ -136,6 +152,17 @@
             threadGrouper = ThreadGrouper.BY_POOL;
         }
 
+        int ticksOver = parseInt(arguments, "only-ticks-over", "o");
+        TickCounter tickCounter = null;
+        if (ticksOver != -1) {
+            try {
+                tickCounter = newTickCounter();
+            } catch (UnsupportedOperationException e) {
+                sendPrefixedMessage(sender, "&cTick counting is not supported on BungeeCord!");
+                return;
+            }
+        }
+
         Sampler sampler;
         synchronized (this.activeSamplerMutex) {
             if (this.activeSampler != null) {
@@ -143,7 +170,7 @@
                 return;
             }
 
-            sendPrefixedMessage(sender, "&7Initializing a new profiler, please wait...");
+            sendPrefixedMessage("&7Initializing a new profiler, please wait...");
 
             SamplerBuilder builder = new SamplerBuilder();
             builder.threadDumper(threadDumper);
@@ -152,13 +179,16 @@
                 builder.completeAfter(timeoutSeconds, TimeUnit.SECONDS);
             }
             builder.samplingInterval(intervalMillis);
+            if (ticksOver != -1) {
+                builder.ticksOver(ticksOver, tickCounter);
+            }
             sampler = this.activeSampler = builder.start(this.samplingThread);
 
-            sendPrefixedMessage(sender, "&bProfiler now active!");
+            sendPrefixedMessage("&bProfiler now active!");
             if (timeoutSeconds == -1) {
-                sendPrefixedMessage(sender, "&7Use '/profiler stop' to stop profiling and upload the results.");
+                sendPrefixedMessage("&7Use '/profiler stop' to stop profiling and upload the results.");
             } else {
-                sendPrefixedMessage(sender, "&7The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds.");
+                sendPrefixedMessage("&7The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds.");
             }
         }
 
@@ -167,7 +197,7 @@
         // send message if profiling fails
         future.whenCompleteAsync((s, throwable) -> {
             if (throwable != null) {
-                sendPrefixedMessage(sender, "&cSampling operation failed unexpectedly. Error: " + throwable.toString());
+                sendPrefixedMessage("&cSampling operation failed unexpectedly. Error: " + throwable.toString());
                 throwable.printStackTrace();
             }
         });
@@ -184,8 +214,8 @@
         // await the result
         if (timeoutSeconds != -1) {
            future.thenAcceptAsync(s -> {
-                sendPrefixedMessage(sender, "&7The active sampling operation has completed! Uploading results...");
-                handleUpload(sender, s);
+                sendPrefixedMessage("&7The active sampling operation has completed! Uploading results...");
+                handleUpload(s);
             });
         }
     }
@@ -215,8 +245,8 @@
             sendPrefixedMessage(sender, "&7There isn't an active sampling task running.");
         } else {
             this.activeSampler.cancel();
-            sendPrefixedMessage(sender, "&7The active sampling operation has been stopped! Uploading results...");
-            handleUpload(sender, this.activeSampler);
+            sendPrefixedMessage("&7The active sampling operation has been stopped! Uploading results...");
+            handleUpload(this.activeSampler);
             this.activeSampler = null;
         }
     }
@@ -229,25 +259,59 @@
             } else {
                 this.activeSampler.cancel();
                 this.activeSampler = null;
-                sendPrefixedMessage(sender, "&bThe active sampling task has been cancelled.");
+                sendPrefixedMessage("&bThe active sampling task has been cancelled.");
             }
         }
     }
 
-    private void handleUpload(T sender, Sampler sampler) {
+    private void handleUpload(Sampler sampler) {
         runAsync(() -> {
             JsonObject output = sampler.formOutput();
             try {
                 String pasteId = Bytebin.postContent(output);
-                sendPrefixedMessage(sender, "&bSampling results:");
-                sendLink(sender, VIEWER_URL + pasteId);
+                sendPrefixedMessage("&bSampling results:");
+                sendLink(VIEWER_URL + pasteId);
             } catch (IOException e) {
-                sendPrefixedMessage(sender, "&cAn error occurred whilst uploading the results.");
+                sendPrefixedMessage("&cAn error occurred whilst uploading the results.");
                 e.printStackTrace();
             }
         });
     }
 
+    private void handleMonitoring(T sender, List<String> args) {
+        SetMultimap<String, String> arguments = parseArguments(args);
+
+        if (this.activeTickMonitor == null) {
+
+            int threshold = parseInt(arguments, "threshold", "t");
+            if (threshold == -1) {
+                threshold = 100;
+            }
+
+            try {
+                TickCounter tickCounter = newTickCounter();
+                this.activeTickMonitor = new ReportingTickMonitor(tickCounter, threshold);
+            } catch (UnsupportedOperationException e) {
+                sendPrefixedMessage(sender, "&cNot supported on BungeeCord!");
+            }
+        } else {
+            this.activeTickMonitor.close();
+            this.activeTickMonitor = null;
+            sendPrefixedMessage("&7Tick monitor disabled.");
+        }
+    }
+
+    private class ReportingTickMonitor extends TickMonitor {
+        public ReportingTickMonitor(TickCounter tickCounter, int percentageChangeThreshold) {
+            super(tickCounter, percentageChangeThreshold);
+        }
+
+        @Override
+        protected void sendMessage(String message) {
+            sendPrefixedMessage(message);
+        }
+    }
+
     private int parseInt(SetMultimap<String, String> arguments, String longArg, String shortArg) {
         Iterator<String> it = Sets.union(arguments.get(longArg), arguments.get(shortArg)).iterator();
         if (it.hasNext()) {
diff --git a/common/src/main/java/me/lucko/spark/common/TickMonitor.java b/common/src/main/java/me/lucko/spark/common/TickMonitor.java
new file mode 100644
index 0000000..e6342c4
--- /dev/null
+++ b/common/src/main/java/me/lucko/spark/common/TickMonitor.java
@@ -0,0 +1,87 @@
+package me.lucko.spark.common;
+
+import me.lucko.spark.profiler.TickCounter;
+
+import java.text.DecimalFormat;
+import java.util.DoubleSummaryStatistics;
+
+public abstract class TickMonitor implements Runnable {
+    private static final DecimalFormat df = new DecimalFormat("#.##");
+
+    private final TickCounter tickCounter;
+    private final int percentageChangeThreshold;
+
+    // data
+    private double lastTickTime = 0;
+    private State state = null;
+    private DoubleSummaryStatistics averageTickTime = new DoubleSummaryStatistics();
+
+    public TickMonitor(TickCounter tickCounter, int percentageChangeThreshold) {
+        this.tickCounter = tickCounter;
+        this.percentageChangeThreshold = percentageChangeThreshold;
+
+        this.tickCounter.start();
+        this.tickCounter.addTickTask(this);
+    }
+
+    protected abstract void sendMessage(String message);
+
+    public void close() {
+        this.tickCounter.close();
+    }
+
+    @Override
+    public void run() {
+        double now = ((double) System.nanoTime()) / 1000000d;
+
+        // init
+        if (this.state == null) {
+            this.state = State.SETUP;
+            this.lastTickTime = now;
+            sendMessage("Tick monitor started. Before the monitor becomes fully active, the server's " +
+                    "average tick rate will be calculated over a period of 100 ticks (approx 5 seconds).");
+            return;
+        }
+
+        // find the diff
+        double diff = now - this.lastTickTime;
+        this.lastTickTime = now;
+
+        // form averages
+        if (this.state == State.SETUP) {
+            this.averageTickTime.accept(diff);
+
+            // move onto the next state
+            if (this.averageTickTime.getCount() >= 100) {
+
+                sendMessage("&bAnalysis is now complete.");
+                sendMessage("&f> &7Max: " + df.format(this.averageTickTime.getMax()) + "ms");
+                sendMessage("&f> &7Min: " + df.format(this.averageTickTime.getMin()) + "ms");
+                sendMessage("&f> &7Avg: " + df.format(this.averageTickTime.getAverage()) + "ms");
+                sendMessage("Starting now, any ticks with >" + this.percentageChangeThreshold + "% increase in " +
+                        "duration compared to the average will be reported.");
+                this.state = State.MONITORING;
+            }
+        }
+
+        if (this.state == State.MONITORING) {
+            double avg = this.averageTickTime.getAverage();
+
+            double increase = diff - avg;
+            if (increase <= 0) {
+                return;
+            }
+
+            double percentageChange = (increase * 100d) / avg;
+            if (percentageChange > this.percentageChangeThreshold) {
+                sendMessage("&7Tick &8#" + this.tickCounter.getCurrentTick() + " &7lasted &b" + df.format(diff) + "&7 milliseconds. " +
+                        "&7(a &b" + df.format(percentageChange) + "% &7increase from average)");
+            }
+        }
+    }
+
+    private enum State {
+        SETUP,
+        MONITORING
+    }
+}
diff --git a/common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java b/common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java
new file mode 100644
index 0000000..9a4090e
--- /dev/null
+++ b/common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java
@@ -0,0 +1,77 @@
+package me.lucko.spark.profiler;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link DataAggregator} that makes use of a "worker" thread pool for inserting
+ * data.
+ */
+public class AsyncDataAggregator implements DataAggregator {
+
+    /** A map of root stack nodes for each thread with sampling data */
+    private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
+
+    /** The worker pool for inserting stack nodes */
+    private final ExecutorService workerPool;
+
+    /** The instance used to group threads together */
+    private final ThreadGrouper threadGrouper;
+
+    /** The interval to wait between sampling, in milliseconds */
+    private final int interval;
+
+    public AsyncDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) {
+        this.workerPool = workerPool;
+        this.threadGrouper = threadGrouper;
+        this.interval = interval;
+    }
+
+    @Override
+    public void insertData(String threadName, StackTraceElement[] stack) {
+        // form the queued data
+        QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+        // schedule insertion of the data
+        this.workerPool.execute(queuedData);
+    }
+
+    @Override
+    public Map<String, StackNode> getData() {
+        // wait for all pending data to be inserted
+        this.workerPool.shutdown();
+        try {
+            this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        return this.threadData;
+    }
+
+    void insertData(QueuedThreadInfo data) {
+        try {
+            String group = this.threadGrouper.getGroup(data.threadName);
+            StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
+            node.log(data.stack, this.interval);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private final class QueuedThreadInfo implements Runnable {
+        private final String threadName;
+        private final StackTraceElement[] stack;
+
+        QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+            this.threadName = threadName;
+            this.stack = stack;
+        }
+
+        @Override
+        public void run() {
+            insertData(this);
+        }
+    }
+}
diff --git a/common/src/main/java/me/lucko/spark/profiler/DataAggregator.java b/common/src/main/java/me/lucko/spark/profiler/DataAggregator.java
new file mode 100644
index 0000000..1afa52c
--- /dev/null
+++ b/common/src/main/java/me/lucko/spark/profiler/DataAggregator.java
@@ -0,0 +1,32 @@
+package me.lucko.spark.profiler;
+
+import java.util.Map;
+
+/**
+ * Aggregates sampling data.
+ */
+public interface DataAggregator {
+
+    /**
+     * Called before the sampler begins to insert data
+     */
+    default void start() {
+
+    }
+
+    /**
+     * Forms the output data
+     *
+     * @return the output data
+     */
+    Map<String, StackNode> getData();
+
+    /**
+     * Inserts sampling data into this aggregator
+     *
+     * @param threadName the name of the thread
+     * @param stack the call stack
+     */
+    void insertData(String threadName, StackTraceElement[] stack);
+
+}
diff --git a/common/src/main/java/me/lucko/spark/profiler/Sampler.java b/common/src/main/java/me/lucko/spark/profiler/Sampler.java
index d03b4b6..44cf445 100644
--- a/common/src/main/java/me/lucko/spark/profiler/Sampler.java
+++ b/common/src/main/java/me/lucko/spark/profiler/Sampler.java
@@ -31,10 +31,8 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -43,35 +41,39 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class Sampler extends TimerTask {
     private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
 
-    /** The thread management interface for the current JVM */
-    private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
-
-    /** A map of root stack nodes for each thread with sampling data */
-    private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
-
     /** The worker pool for inserting stack nodes */
     private final ExecutorService workerPool = Executors.newFixedThreadPool(
             6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement()).build()
     );
 
+    /** The thread management interface for the current JVM */
+    private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+    /** The instance used to generate thread information for use in sampling */
+    private final ThreadDumper threadDumper;
+    /** Responsible for aggregating and then outputting collected sampling data */
+    private final DataAggregator dataAggregator;
+
     /** A future to encapsulation the completion of this sampler instance */
     private final CompletableFuture<Sampler> future = new CompletableFuture<>();
 
     /** The interval to wait between sampling, in milliseconds */
     private final int interval;
-    /** The instance used to generate thread information for use in sampling */
-    private final ThreadDumper threadDumper;
-    /** The instance used to group threads together */
-    private final ThreadGrouper threadGrouper;
     /** The time when sampling first began */
     private long startTime = -1;
     /** The unix timestamp (in millis) when this sampler should automatically complete.*/
     private final long endTime; // -1 for nothing
 
     public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
+        this.threadDumper = threadDumper;
+        this.dataAggregator = new AsyncDataAggregator(this.workerPool, threadGrouper, interval);
         this.interval = interval;
+        this.endTime = endTime;
+    }
+
+    public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, TickCounter tickCounter, int tickLengthThreshold) {
         this.threadDumper = threadDumper;
-        this.threadGrouper = threadGrouper;
+        this.dataAggregator = new TickedDataAggregator(this.workerPool, tickCounter, threadGrouper, interval, tickLengthThreshold);
+        this.interval = interval;
         this.endTime = endTime;
     }
 
@@ -81,35 +83,9 @@ public class Sampler extends TimerTask {
      * @param samplingThread the timer to schedule the sampling on
      */
     public void start(Timer samplingThread) {
-        samplingThread.scheduleAtFixedRate(this, 0, this.interval);
         this.startTime = System.currentTimeMillis();
-    }
-
-    private void insertData(QueuedThreadInfo data) {
-        try {
-            String group = this.threadGrouper.getGroup(data.threadName);
-            StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
-            node.log(data.stack, Sampler.this.interval);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * Gets the sampling data recorded by this instance.
-     *
-     * @return the data
-     */
-    public Map<String, StackNode> getData() {
-        // wait for all pending data to be inserted
-        this.workerPool.shutdown();
-        try {
-            this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        return this.threadData;
+        this.dataAggregator.start();
+        samplingThread.scheduleAtFixedRate(this, 0, this.interval);
     }
 
     public long getStartTime() {
@@ -145,10 +121,7 @@
                     continue;
                 }
 
-                // form the queued data
-                QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
-                // schedule insertion of the data
-                this.workerPool.execute(queuedData);
+                this.dataAggregator.insertData(threadName, stack);
             }
         } catch (Throwable t) {
             this.future.completeExceptionally(t);
@@ -161,7 +134,7 @@
         JsonArray threads = new JsonArray();
 
-        List<Map.Entry<String, StackNode>> data = new ArrayList<>(getData().entrySet());
+        List<Map.Entry<String, StackNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
         data.sort(Map.Entry.comparingByKey());
 
         for (Map.Entry<String, StackNode> entry : data) {
@@ -177,19 +150,4 @@
         return out;
     }
 
-    private final class QueuedThreadInfo implements Runnable {
-        private final String threadName;
-        private final StackTraceElement[] stack;
-
-        private QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
-            this.threadName = threadName;
-            this.stack = stack;
-        }
-
-        @Override
-        public void run() {
-            insertData(this);
-        }
-    }
-
 }
diff --git a/common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java b/common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
index fa2898b..4c16d50 100644
--- a/common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
+++ b/common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
@@ -15,6 +15,9 @@ public class SamplerBuilder {
     private ThreadDumper threadDumper = ThreadDumper.ALL;
     private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME;
 
+    private int ticksOver = -1;
+    private TickCounter tickCounter = null;
+
     public SamplerBuilder() {
     }
 
@@ -39,8 +42,20 @@
         return this;
     }
 
+    public SamplerBuilder ticksOver(int ticksOver, TickCounter tickCounter) {
+        this.ticksOver = ticksOver;
+        this.tickCounter = tickCounter;
+        return this;
+    }
+
     public Sampler start(Timer samplingThread) {
-        Sampler sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout);
+        Sampler sampler;
+        if (this.ticksOver != -1 && this.tickCounter != null) {
+            sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout, this.tickCounter, this.ticksOver);
+        } else {
+            sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout);
+        }
+
         sampler.start(samplingThread);
         return sampler;
     }
diff --git a/common/src/main/java/me/lucko/spark/profiler/TickCounter.java b/common/src/main/java/me/lucko/spark/profiler/TickCounter.java
new file mode 100644
index 0000000..53a9c27
--- /dev/null
+++ b/common/src/main/java/me/lucko/spark/profiler/TickCounter.java
@@ -0,0 +1,39 @@
+package me.lucko.spark.profiler;
+
+/**
+ * A hook with the game's "tick loop".
+ */
+public interface TickCounter {
+
+    /**
+     * Starts the counter
+     */
+    void start();
+
+    /**
+     * Stops the counter
+     */
+    void close();
+
+    /**
+     * Gets the current tick number
+     *
+     * @return the current tick
+     */
+    long getCurrentTick();
+
+    /**
+     * Adds a task to be called each time the tick increments
+     *
+     * @param runnable the task
+     */
+    void addTickTask(Runnable runnable);
+
+    /**
+     * Removes a tick task
+     *
+     * @param runnable the task
+     */
+    void removeTickTask(Runnable runnable);
+
+}
diff --git a/common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java b/common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
new file mode 100644
index 0000000..abca4b3
--- /dev/null
+++ b/common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
@@ -0,0 +1,147 @@
+package me.lucko.spark.profiler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
+ * which exceed a certain threshold in duration.
+ */
+public class TickedDataAggregator implements DataAggregator {
+
+    /** A map of root stack nodes for each thread with sampling data */
+    private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
+
+    /** The worker pool for inserting stack nodes */
+    private final ExecutorService workerPool;
+
+    /** Used to monitor the current "tick" of the server */
+    private final TickCounter tickCounter;
+
+    /** The instance used to group threads together */
+    private final ThreadGrouper threadGrouper;
+
+    /** The interval to wait between sampling, in milliseconds */
+    private final int interval;
+
+    /** Tick durations under this threshold will not be inserted */
+    private final int tickLengthThreshold;
+
+    /** The expected number of samples in each tick */
+    private final int expectedSize;
+
+    // state
+    private long currentTick = -1;
+    private TickList currentData = new TickList(0);
+
+    public TickedDataAggregator(ExecutorService workerPool, TickCounter tickCounter, ThreadGrouper threadGrouper, int interval, int tickLengthThreshold) {
+        this.workerPool = workerPool;
+        this.tickCounter = tickCounter;
+        this.threadGrouper = threadGrouper;
+        this.interval = interval;
+        this.tickLengthThreshold = tickLengthThreshold;
+        // 50 millis in a tick, plus 10 so we have a bit of room to go over
+        this.expectedSize = (50 / interval) + 10;
+    }
+
+    // this is effectively synchronized by the Timer instance in Sampler
+    @Override
+    public void insertData(String threadName, StackTraceElement[] stack) {
+        long tick = this.tickCounter.getCurrentTick();
+        if (this.currentTick != tick) {
+            pushCurrentTick();
+            this.currentTick = tick;
+            this.currentData = new TickList(this.expectedSize);
+        }
+
+        // form the queued data
+        QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+        // insert it
+        this.currentData.addData(queuedData);
+    }
+
+    private void pushCurrentTick() {
+        TickList currentData = this.currentData;
+
+        // approximate how long the tick lasted
+        int tickLengthMillis = currentData.getList().size() * this.interval;
+
+        // don't push data below the threshold
+        if (tickLengthMillis < this.tickLengthThreshold) {
+            return;
+        }
+
+        this.workerPool.submit(currentData);
+    }
+
+    @Override
+    public void start() {
+        this.tickCounter.start();
+    }
+
+    @Override
+    public Map<String, StackNode> getData() {
+        // push the current tick
+        pushCurrentTick();
+
+        // close the tick counter
+        this.tickCounter.close();
+
+        // wait for all pending data to be inserted
+        this.workerPool.shutdown();
+        try {
+            this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        return this.threadData;
+    }
+
+    void insertData(List<QueuedThreadInfo> dataList) {
+        for (QueuedThreadInfo data : dataList) {
+            try {
+                String group = this.threadGrouper.getGroup(data.threadName);
+                StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
+                node.log(data.stack, this.interval);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private final class TickList implements Runnable {
+        private final List<QueuedThreadInfo> list;
+
+        TickList(int expectedSize) {
+            this.list = new ArrayList<>(expectedSize);
+        }
+
+        @Override
+        public void run() {
+            insertData(this.list);
+        }
+
+        public List<QueuedThreadInfo> getList() {
+            return this.list;
+        }
+
+        public void addData(QueuedThreadInfo data) {
+            this.list.add(data);
+        }
+    }
+
+    private static final class QueuedThreadInfo {
+        private final String threadName;
+        private final StackTraceElement[] stack;
+
+        QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+            this.threadName = threadName;
+            this.stack = stack;
+        }
+    }
+}
diff --git a/sponge/src/main/java/me/lucko/spark/sponge/SparkSpongePlugin.java b/sponge/src/main/java/me/lucko/spark/sponge/SparkSpongePlugin.java
index 9149cb0..b77b13c 100644
--- a/sponge/src/main/java/me/lucko/spark/sponge/SparkSpongePlugin.java
+++ b/sponge/src/main/java/me/lucko/spark/sponge/SparkSpongePlugin.java
@@ -4,18 +4,20 @@ import com.google.inject.Inject;
 
 import me.lucko.spark.common.CommandHandler;
 import me.lucko.spark.profiler.ThreadDumper;
+import me.lucko.spark.profiler.TickCounter;
 import me.lucko.spark.sponge.utils.PomData;
 
 import org.spongepowered.api.Game;
+import org.spongepowered.api.Sponge;
 import org.spongepowered.api.command.CommandCallable;
 import org.spongepowered.api.command.CommandResult;
 import org.spongepowered.api.command.CommandSource;
+import org.spongepowered.api.entity.living.player.Player;
 import org.spongepowered.api.event.Listener;
 import org.spongepowered.api.event.game.state.GameStartedServerEvent;
 import org.spongepowered.api.plugin.Plugin;
 import org.spongepowered.api.scheduler.AsynchronousExecutor;
 import org.spongepowered.api.scheduler.SpongeExecutorService;
-import org.spongepowered.api.text.LiteralText;
 import org.spongepowered.api.text.Text;
 import org.spongepowered.api.text.action.TextActions;
 import org.spongepowered.api.text.format.TextColors;
@@ -41,20 +43,38 @@ import javax.annotation.Nullable;
 public class SparkSpongePlugin implements CommandCallable {
 
     private final CommandHandler<CommandSource> commandHandler = new CommandHandler<CommandSource>() {
+        private Text colorize(String message) {
+            return TextSerializers.FORMATTING_CODE.deserialize(message);
+        }
+
+        private void broadcast(Text msg) {
+            Sponge.getServer().getConsole().sendMessage(msg);
+            for (Player player : Sponge.getServer().getOnlinePlayers()) {
+                if (player.hasPermission("spark.profiler")) {
+                    player.sendMessage(msg);
+                }
+            }
+        }
+
         @Override
         protected void sendMessage(CommandSource sender, String message) {
-            sender.sendMessage(TextSerializers.FORMATTING_CODE.deserialize(message));
+            sender.sendMessage(colorize(message));
+        }
+
+        @Override
+        protected void sendMessage(String message) {
+            Text msg = colorize(message);
+            broadcast(msg);
         }
 
         @Override
-        protected void sendLink(CommandSource sender, String url) {
+        protected void sendLink(String url) {
             try {
-                LiteralText text = Text.builder(url)
+                Text msg = Text.builder(url)
                         .color(TextColors.GRAY)
                         .onClick(TextActions.openUrl(new URL(url)))
                         .build();
-
-                sender.sendMessage(text);
+                broadcast(msg);
             } catch (MalformedURLException e) {
                 e.printStackTrace();
             }
@@ -69,6 +89,11 @@ public class SparkSpongePlugin implements CommandCallable {
         protected ThreadDumper getDefaultThreadDumper() {
             return new ThreadDumper.Specific(new long[]{Thread.currentThread().getId()});
         }
+
+        @Override
+        protected TickCounter newTickCounter() {
+            return new SpongeTickCounter(SparkSpongePlugin.this);
+        }
     };
 
     @Inject
diff --git a/sponge/src/main/java/me/lucko/spark/sponge/SpongeTickCounter.java b/sponge/src/main/java/me/lucko/spark/sponge/SpongeTickCounter.java
new file mode 100644
index 0000000..bda2a69
--- /dev/null
+++ b/sponge/src/main/java/me/lucko/spark/sponge/SpongeTickCounter.java
@@ -0,0 +1,54 @@
+package me.lucko.spark.sponge;
+
+import me.lucko.spark.profiler.TickCounter;
+
+import org.spongepowered.api.scheduler.Task;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+
+public class SpongeTickCounter implements TickCounter, Runnable {
+    private final SparkSpongePlugin plugin;
+    private Task task;
+
+    private final Set<Runnable> tasks = new HashSet<>();
+    private final LongAdder tick = new LongAdder();
+
+    public SpongeTickCounter(SparkSpongePlugin plugin) {
+        this.plugin = plugin;
+    }
+
+    @Override
+    public void run() {
+        this.tick.increment();
+        for (Runnable r : this.tasks){
+            r.run();
+        }
+    }
+
+    @Override
+    public void start() {
+        this.task = Task.builder().intervalTicks(1).name("spark-ticker").execute(this).submit(this.plugin);
+    }
+
+    @Override
+    public void close() {
+        this.task.cancel();
+    }
+
+    @Override
+    public long getCurrentTick() {
+        return this.tick.longValue();
+    }
+
+    @Override
+    public void addTickTask(Runnable runnable) {
+        this.tasks.add(runnable);
+    }
+
+    @Override
+    public void removeTickTask(Runnable runnable) {
+        this.tasks.remove(runnable);
+    }
+}
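A note on how the new pieces fit together. The sketch below is an illustration only and is not part of this commit: the package name, the ScheduledTickCounter class and the 50ms schedule are hypothetical stand-ins for a platform scheduler. Any TickCounter implementation can drive a TickMonitor; here a ScheduledExecutorService plays the role of the server's main loop and the reports are printed to stdout.

    package me.lucko.spark.example; // hypothetical package, not part of this commit

    import me.lucko.spark.common.TickMonitor;
    import me.lucko.spark.profiler.TickCounter;

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.LongAdder;

    public class TickMonitorExample {

        /** A TickCounter driven by a ScheduledExecutorService instead of a server scheduler. */
        static class ScheduledTickCounter implements TickCounter, Runnable {
            private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            private final Set<Runnable> tasks = new HashSet<>();
            private final LongAdder tick = new LongAdder();
            private ScheduledFuture<?> future;

            @Override
            public void run() {
                // one "tick": bump the counter, then notify listeners (e.g. the TickMonitor)
                this.tick.increment();
                for (Runnable r : this.tasks) {
                    r.run();
                }
            }

            @Override
            public void start() {
                this.future = this.scheduler.scheduleAtFixedRate(this, 50, 50, TimeUnit.MILLISECONDS);
            }

            @Override
            public void close() {
                this.future.cancel(false);
                this.scheduler.shutdown();
            }

            @Override
            public long getCurrentTick() {
                return this.tick.longValue();
            }

            @Override
            public void addTickTask(Runnable runnable) {
                this.tasks.add(runnable);
            }

            @Override
            public void removeTickTask(Runnable runnable) {
                this.tasks.remove(runnable);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            // TickMonitor starts the counter and registers itself as a tick task in its
            // constructor, so creating it is enough to begin the 100-tick calibration phase
            TickMonitor monitor = new TickMonitor(new ScheduledTickCounter(), 100) {
                @Override
                protected void sendMessage(String message) {
                    System.out.println(message); // a platform would relay this to permitted players
                }
            };

            Thread.sleep(TimeUnit.SECONDS.toMillis(30));
            monitor.close();
        }
    }

The detail worth noting is that TickMonitor wires itself up in its constructor (it calls start() and addTickTask(this) on the counter), so a caller only creates it and later calls close(), which is exactly how CommandHandler.handleMonitoring uses it above.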