aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bukkit/src/main/java/me/lucko/spark/bukkit/BukkitTickCounter.java55
-rw-r--r--bukkit/src/main/java/me/lucko/spark/bukkit/SparkBukkitPlugin.java34
-rw-r--r--bungeecord/src/main/java/me/lucko/spark/bungeecord/SparkBungeeCordPlugin.java32
-rw-r--r--common/src/main/java/me/lucko/spark/common/CommandHandler.java96
-rw-r--r--common/src/main/java/me/lucko/spark/common/TickMonitor.java87
-rw-r--r--common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java77
-rw-r--r--common/src/main/java/me/lucko/spark/profiler/DataAggregator.java32
-rw-r--r--common/src/main/java/me/lucko/spark/profiler/Sampler.java80
-rw-r--r--common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java17
-rw-r--r--common/src/main/java/me/lucko/spark/profiler/TickCounter.java39
-rw-r--r--common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java147
-rw-r--r--sponge/src/main/java/me/lucko/spark/sponge/SparkSpongePlugin.java37
-rw-r--r--sponge/src/main/java/me/lucko/spark/sponge/SpongeTickCounter.java54
13 files changed, 697 insertions, 90 deletions
diff --git a/bukkit/src/main/java/me/lucko/spark/bukkit/BukkitTickCounter.java b/bukkit/src/main/java/me/lucko/spark/bukkit/BukkitTickCounter.java
new file mode 100644
index 0000000..61a7690
--- /dev/null
+++ b/bukkit/src/main/java/me/lucko/spark/bukkit/BukkitTickCounter.java
@@ -0,0 +1,55 @@
+package me.lucko.spark.bukkit;
+
+import me.lucko.spark.profiler.TickCounter;
+
+import org.bukkit.plugin.Plugin;
+import org.bukkit.scheduler.BukkitTask;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+
+public class BukkitTickCounter implements TickCounter, Runnable {
+ private final Plugin plugin;
+ private BukkitTask task;
+
+ private final Set<Runnable> tasks = new HashSet<>();
+ private final LongAdder tick = new LongAdder();
+
+ public BukkitTickCounter(Plugin plugin) {
+ this.plugin = plugin;
+ }
+
+ @Override
+ public void run() {
+ this.tick.increment();
+ for (Runnable r : this.tasks){
+ r.run();
+ }
+ }
+
+ @Override
+ public void start() {
+ this.task = this.plugin.getServer().getScheduler().runTaskTimer(this.plugin, this, 1, 1);
+ }
+
+ @Override
+ public void close() {
+ this.task.cancel();
+ }
+
+ @Override
+ public long getCurrentTick() {
+ return this.tick.longValue();
+ }
+
+ @Override
+ public void addTickTask(Runnable runnable) {
+ this.tasks.add(runnable);
+ }
+
+ @Override
+ public void removeTickTask(Runnable runnable) {
+ this.tasks.remove(runnable);
+ }
+}
diff --git a/bukkit/src/main/java/me/lucko/spark/bukkit/SparkBukkitPlugin.java b/bukkit/src/main/java/me/lucko/spark/bukkit/SparkBukkitPlugin.java
index 8b3ed28..dc432c5 100644
--- a/bukkit/src/main/java/me/lucko/spark/bukkit/SparkBukkitPlugin.java
+++ b/bukkit/src/main/java/me/lucko/spark/bukkit/SparkBukkitPlugin.java
@@ -2,23 +2,46 @@ package me.lucko.spark.bukkit;
import me.lucko.spark.common.CommandHandler;
import me.lucko.spark.profiler.ThreadDumper;
+import me.lucko.spark.profiler.TickCounter;
import org.bukkit.ChatColor;
import org.bukkit.command.Command;
import org.bukkit.command.CommandSender;
+import org.bukkit.entity.Player;
import org.bukkit.plugin.java.JavaPlugin;
public class SparkBukkitPlugin extends JavaPlugin {
private final CommandHandler<CommandSender> commandHandler = new CommandHandler<CommandSender>() {
+
+ private String colorize(String message) {
+ return ChatColor.translateAlternateColorCodes('&', message);
+ }
+
+ private void broadcast(String msg) {
+ getServer().getConsoleSender().sendMessage(msg);
+ for (Player player : getServer().getOnlinePlayers()) {
+ if (player.hasPermission("spark.profiler")) {
+ player.sendMessage(msg);
+ }
+ }
+ }
+
@Override
protected void sendMessage(CommandSender sender, String message) {
- sender.sendMessage(ChatColor.translateAlternateColorCodes('&', message));
+ sender.sendMessage(colorize(message));
}
@Override
- protected void sendLink(CommandSender sender, String url) {
- sendMessage(sender, "&7" + url);
+ protected void sendMessage(String message) {
+ String msg = colorize(message);
+ broadcast(msg);
+ }
+
+ @Override
+ protected void sendLink(String url) {
+ String msg = colorize("&7" + url);
+ broadcast(msg);
}
@Override
@@ -30,6 +53,11 @@ public class SparkBukkitPlugin extends JavaPlugin {
protected ThreadDumper getDefaultThreadDumper() {
return new ThreadDumper.Specific(new long[]{Thread.currentThread().getId()});
}
+
+ @Override
+ protected TickCounter newTickCounter() {
+ return new BukkitTickCounter(SparkBukkitPlugin.this);
+ }
};
@Override
diff --git a/bungeecord/src/main/java/me/lucko/spark/bungeecord/SparkBungeeCordPlugin.java b/bungeecord/src/main/java/me/lucko/spark/bungeecord/SparkBungeeCordPlugin.java
index 717497c..59bab67 100644
--- a/bungeecord/src/main/java/me/lucko/spark/bungeecord/SparkBungeeCordPlugin.java
+++ b/bungeecord/src/main/java/me/lucko/spark/bungeecord/SparkBungeeCordPlugin.java
@@ -2,28 +2,49 @@ package me.lucko.spark.bungeecord;
import me.lucko.spark.common.CommandHandler;
import me.lucko.spark.profiler.ThreadDumper;
+import me.lucko.spark.profiler.TickCounter;
import net.md_5.bungee.api.ChatColor;
import net.md_5.bungee.api.CommandSender;
+import net.md_5.bungee.api.chat.BaseComponent;
import net.md_5.bungee.api.chat.ClickEvent;
import net.md_5.bungee.api.chat.TextComponent;
+import net.md_5.bungee.api.connection.ProxiedPlayer;
import net.md_5.bungee.api.plugin.Command;
import net.md_5.bungee.api.plugin.Plugin;
public class SparkBungeeCordPlugin extends Plugin {
private final CommandHandler<CommandSender> commandHandler = new CommandHandler<CommandSender>() {
+ private BaseComponent[] colorize(String message) {
+ return TextComponent.fromLegacyText(ChatColor.translateAlternateColorCodes('&', message));
+ }
+
+ private void broadcast(BaseComponent... msg) {
+ getProxy().getConsole().sendMessage(msg);
+ for (ProxiedPlayer player : getProxy().getPlayers()) {
+ if (player.hasPermission("spark.profiler")) {
+ player.sendMessage(msg);
+ }
+ }
+ }
+
@Override
protected void sendMessage(CommandSender sender, String message) {
- sender.sendMessage(TextComponent.fromLegacyText(ChatColor.translateAlternateColorCodes('&', message)));
+ sender.sendMessage(colorize(message));
}
@Override
- protected void sendLink(CommandSender sender, String url) {
+ protected void sendMessage(String message) {
+ broadcast(colorize(message));
+ }
+
+ @Override
+ protected void sendLink(String url) {
TextComponent component = new TextComponent(url);
component.setColor(ChatColor.GRAY);
component.setClickEvent(new ClickEvent(ClickEvent.Action.OPEN_URL, url));
- sender.sendMessage(component);
+ broadcast(component);
}
@Override
@@ -35,6 +56,11 @@ public class SparkBungeeCordPlugin extends Plugin {
protected ThreadDumper getDefaultThreadDumper() {
return new ThreadDumper.All();
}
+
+ @Override
+ protected TickCounter newTickCounter() {
+ throw new UnsupportedOperationException();
+ }
};
@Override
diff --git a/common/src/main/java/me/lucko/spark/common/CommandHandler.java b/common/src/main/java/me/lucko/spark/common/CommandHandler.java
index 6664c18..a9bbe61 100644
--- a/common/src/main/java/me/lucko/spark/common/CommandHandler.java
+++ b/common/src/main/java/me/lucko/spark/common/CommandHandler.java
@@ -10,6 +10,7 @@ import me.lucko.spark.profiler.Sampler;
import me.lucko.spark.profiler.SamplerBuilder;
import me.lucko.spark.profiler.ThreadDumper;
import me.lucko.spark.profiler.ThreadGrouper;
+import me.lucko.spark.profiler.TickCounter;
import java.io.IOException;
import java.util.ArrayList;
@@ -33,6 +34,8 @@ public abstract class CommandHandler<T> {
/** The URL of the viewer frontend */
private static final String VIEWER_URL = "https://sparkprofiler.github.io/?";
+ /** The prefix used in all messages */
+ private static final String PREFIX = "&8[&fspark&8] &7";
/**
* The {@link Timer} being used by the {@link #activeSampler}.
@@ -43,17 +46,25 @@ public abstract class CommandHandler<T> {
private final Object[] activeSamplerMutex = new Object[0];
/** The WarmRoast instance currently running, if any */
private Sampler activeSampler = null;
+ /** The tick monitor instance currently running, if any */
+ private ReportingTickMonitor activeTickMonitor = null;
// abstract methods implemented by each platform
protected abstract void sendMessage(T sender, String message);
- protected abstract void sendLink(T sender, String url);
+ protected abstract void sendMessage(String message);
+ protected abstract void sendLink(String url);
protected abstract void runAsync(Runnable r);
protected abstract ThreadDumper getDefaultThreadDumper();
+ protected abstract TickCounter newTickCounter();
private void sendPrefixedMessage(T sender, String message) {
- sendMessage(sender, "&8[&fspark&8] &7" + message);
+ sendMessage(sender, PREFIX + message);
+ }
+
+ private void sendPrefixedMessage(String message) {
+ sendMessage(PREFIX + message);
}
public void handleCommand(T sender, String[] args) {
@@ -79,6 +90,9 @@ public abstract class CommandHandler<T> {
case "paste":
handleStop(sender);
break;
+ case "monitoring":
+ handleMonitoring(sender, arguments);
+ break;
default:
sendInfo(sender);
break;
@@ -95,9 +109,11 @@ public abstract class CommandHandler<T> {
sendMessage(sender, " &8[&7--thread&8 <thread name>]");
sendMessage(sender, " &8[&7--not-combined]");
sendMessage(sender, " &8[&7--interval&8 <interval millis>]");
+ sendMessage(sender, " &8[&7--only-ticks-over&8 <tick length millis>]");
sendMessage(sender, "&b&l> &7/profiler info");
sendMessage(sender, "&b&l> &7/profiler stop");
sendMessage(sender, "&b&l> &7/profiler cancel");
+ sendMessage(sender, "&b&l> &7/profiler monitoring");
}
private void handleStart(T sender, List<String> args) {
@@ -136,6 +152,17 @@ public abstract class CommandHandler<T> {
threadGrouper = ThreadGrouper.BY_POOL;
}
+ int ticksOver = parseInt(arguments, "only-ticks-over", "o");
+ TickCounter tickCounter = null;
+ if (ticksOver != -1) {
+ try {
+ tickCounter = newTickCounter();
+ } catch (UnsupportedOperationException e) {
+ sendPrefixedMessage(sender, "&cTick counting is not supported on BungeeCord!");
+ return;
+ }
+ }
+
Sampler sampler;
synchronized (this.activeSamplerMutex) {
if (this.activeSampler != null) {
@@ -143,7 +170,7 @@ public abstract class CommandHandler<T> {
return;
}
- sendPrefixedMessage(sender, "&7Initializing a new profiler, please wait...");
+ sendPrefixedMessage("&7Initializing a new profiler, please wait...");
SamplerBuilder builder = new SamplerBuilder();
builder.threadDumper(threadDumper);
@@ -152,13 +179,16 @@ public abstract class CommandHandler<T> {
builder.completeAfter(timeoutSeconds, TimeUnit.SECONDS);
}
builder.samplingInterval(intervalMillis);
+ if (ticksOver != -1) {
+ builder.ticksOver(ticksOver, tickCounter);
+ }
sampler = this.activeSampler = builder.start(this.samplingThread);
- sendPrefixedMessage(sender, "&bProfiler now active!");
+ sendPrefixedMessage("&bProfiler now active!");
if (timeoutSeconds == -1) {
- sendPrefixedMessage(sender, "&7Use '/profiler stop' to stop profiling and upload the results.");
+ sendPrefixedMessage("&7Use '/profiler stop' to stop profiling and upload the results.");
} else {
- sendPrefixedMessage(sender, "&7The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds.");
+ sendPrefixedMessage("&7The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds.");
}
}
@@ -167,7 +197,7 @@ public abstract class CommandHandler<T> {
// send message if profiling fails
future.whenCompleteAsync((s, throwable) -> {
if (throwable != null) {
- sendPrefixedMessage(sender, "&cSampling operation failed unexpectedly. Error: " + throwable.toString());
+ sendPrefixedMessage("&cSampling operation failed unexpectedly. Error: " + throwable.toString());
throwable.printStackTrace();
}
});
@@ -184,8 +214,8 @@ public abstract class CommandHandler<T> {
// await the result
if (timeoutSeconds != -1) {
future.thenAcceptAsync(s -> {
- sendPrefixedMessage(sender, "&7The active sampling operation has completed! Uploading results...");
- handleUpload(sender, s);
+ sendPrefixedMessage("&7The active sampling operation has completed! Uploading results...");
+ handleUpload(s);
});
}
}
@@ -215,8 +245,8 @@ public abstract class CommandHandler<T> {
sendPrefixedMessage(sender, "&7There isn't an active sampling task running.");
} else {
this.activeSampler.cancel();
- sendPrefixedMessage(sender, "&7The active sampling operation has been stopped! Uploading results...");
- handleUpload(sender, this.activeSampler);
+ sendPrefixedMessage("&7The active sampling operation has been stopped! Uploading results...");
+ handleUpload(this.activeSampler);
this.activeSampler = null;
}
}
@@ -229,25 +259,59 @@ public abstract class CommandHandler<T> {
} else {
this.activeSampler.cancel();
this.activeSampler = null;
- sendPrefixedMessage(sender, "&bThe active sampling task has been cancelled.");
+ sendPrefixedMessage("&bThe active sampling task has been cancelled.");
}
}
}
- private void handleUpload(T sender, Sampler sampler) {
+ private void handleUpload(Sampler sampler) {
runAsync(() -> {
JsonObject output = sampler.formOutput();
try {
String pasteId = Bytebin.postContent(output);
- sendPrefixedMessage(sender, "&bSampling results:");
- sendLink(sender, VIEWER_URL + pasteId);
+ sendPrefixedMessage("&bSampling results:");
+ sendLink(VIEWER_URL + pasteId);
} catch (IOException e) {
- sendPrefixedMessage(sender, "&cAn error occurred whilst uploading the results.");
+ sendPrefixedMessage("&cAn error occurred whilst uploading the results.");
e.printStackTrace();
}
});
}
+ private void handleMonitoring(T sender, List<String> args) {
+ SetMultimap<String, String> arguments = parseArguments(args);
+
+ if (this.activeTickMonitor == null) {
+
+ int threshold = parseInt(arguments, "threshold", "t");
+ if (threshold == -1) {
+ threshold = 100;
+ }
+
+ try {
+ TickCounter tickCounter = newTickCounter();
+ this.activeTickMonitor = new ReportingTickMonitor(tickCounter, threshold);
+ } catch (UnsupportedOperationException e) {
+ sendPrefixedMessage(sender, "&cNot supported on BungeeCord!");
+ }
+ } else {
+ this.activeTickMonitor.close();
+ this.activeTickMonitor = null;
+ sendPrefixedMessage("&7Tick monitor disabled.");
+ }
+ }
+
+ private class ReportingTickMonitor extends TickMonitor {
+ public ReportingTickMonitor(TickCounter tickCounter, int percentageChangeThreshold) {
+ super(tickCounter, percentageChangeThreshold);
+ }
+
+ @Override
+ protected void sendMessage(String message) {
+ sendPrefixedMessage(message);
+ }
+ }
+
private int parseInt(SetMultimap<String, String> arguments, String longArg, String shortArg) {
Iterator<String> it = Sets.union(arguments.get(longArg), arguments.get(shortArg)).iterator();
if (it.hasNext()) {
diff --git a/common/src/main/java/me/lucko/spark/common/TickMonitor.java b/common/src/main/java/me/lucko/spark/common/TickMonitor.java
new file mode 100644
index 0000000..e6342c4
--- /dev/null
+++ b/common/src/main/java/me/lucko/spark/common/TickMonitor.java
@@ -0,0 +1,87 @@
+package me.lucko.spark.common;
+
+import me.lucko.spark.profiler.TickCounter;
+
+import java.text.DecimalFormat;
+import java.util.DoubleSummaryStatistics;
+
+public abstract class TickMonitor implements Runnable {
+ private static final DecimalFormat df = new DecimalFormat("#.##");
+
+ private final TickCounter tickCounter;
+ private final int percentageChangeThreshold;
+
+ // data
+ private double lastTickTime = 0;
+ private State state = null;
+ private DoubleSummaryStatistics averageTickTime = new DoubleSummaryStatistics();
+
+ public TickMonitor(TickCounter tickCounter, int percentageChangeThreshold) {
+ this.tickCounter = tickCounter;
+ this.percentageChangeThreshold = percentageChangeThreshold;
+
+ this.tickCounter.start();
+ this.tickCounter.addTickTask(this);
+ }
+
+ protected abstract void sendMessage(String message);
+
+ public void close() {
+ this.tickCounter.close();
+ }
+
+ @Override
+ public void run() {
+ double now = ((double) System.nanoTime()) / 1000000d;
+
+ // init
+ if (this.state == null) {
+ this.state = State.SETUP;
+ this.lastTickTime = now;
+ sendMessage("Tick monitor started. Before the monitor becomes fully active, the server's " +
+ "average tick rate will be calculated over a period of 100 ticks (approx 5 seconds).");
+ return;
+ }
+
+ // find the diff
+ double diff = now - this.lastTickTime;
+ this.lastTickTime = now;
+
+ // form averages
+ if (this.state == State.SETUP) {
+ this.averageTickTime.accept(diff);
+
+ // move onto the next state
+ if (this.averageTickTime.getCount() >= 100) {
+
+ sendMessage("&bAnalysis is now complete.");
+ sendMessage("&f> &7Max: " + df.format(this.averageTickTime.getMax()) + "ms");
+ sendMessage("&f> &7Min: " + df.format(this.averageTickTime.getMin()) + "ms");
+ sendMessage("&f> &7Avg: " + df.format(this.averageTickTime.getAverage()) + "ms");
+ sendMessage("Starting now, any ticks with >" + this.percentageChangeThreshold + "% increase in " +
+ "duration compared to the average will be reported.");
+ this.state = State.MONITORING;
+ }
+ }
+
+ if (this.state == State.MONITORING) {
+ double avg = this.averageTickTime.getAverage();
+
+ double increase = diff - avg;
+ if (increase <= 0) {
+ return;
+ }
+
+ double percentageChange = (increase * 100d) / avg;
+ if (percentageChange > this.percentageChangeThreshold) {
+ sendMessage("&7Tick &8#" + this.tickCounter.getCurrentTick() + " &7lasted &b" + df.format(diff) + "&7 milliseconds. " +
+ "&7(a &b" + df.format(percentageChange) + "% &7increase from average)");
+ }
+ }
+ }
+
+ private enum State {
+ SETUP,
+ MONITORING
+ }
+}
diff --git a/common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java b/common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java
new file mode 100644
index 0000000..9a4090e
--- /dev/null
+++ b/common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java
@@ -0,0 +1,77 @@
+package me.lucko.spark.profiler;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link DataAggregator} that makes use of a "worker" thread pool for inserting
+ * data.
+ */
+public class AsyncDataAggregator implements DataAggregator {
+
+ /** A map of root stack nodes for each thread with sampling data */
+ private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
+
+ /** The worker pool for inserting stack nodes */
+ private final ExecutorService workerPool;
+
+ /** The instance used to group threads together */
+ private final ThreadGrouper threadGrouper;
+
+ /** The interval to wait between sampling, in milliseconds */
+ private final int interval;
+
+ public AsyncDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) {
+ this.workerPool = workerPool;
+ this.threadGrouper = threadGrouper;
+ this.interval = interval;
+ }
+
+ @Override
+ public void insertData(String threadName, StackTraceElement[] stack) {
+ // form the queued data
+ QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+ // schedule insertion of the data
+ this.workerPool.execute(queuedData);
+ }
+
+ @Override
+ public Map<String, StackNode> getData() {
+ // wait for all pending data to be inserted
+ this.workerPool.shutdown();
+ try {
+ this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return this.threadData;
+ }
+
+ void insertData(QueuedThreadInfo data) {
+ try {
+ String group = this.threadGrouper.getGroup(data.threadName);
+ StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
+ node.log(data.stack, this.interval);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private final class QueuedThreadInfo implements Runnable {
+ private final String threadName;
+ private final StackTraceElement[] stack;
+
+ QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+ this.threadName = threadName;
+ this.stack = stack;
+ }
+
+ @Override
+ public void run() {
+ insertData(this);
+ }
+ }
+}
diff --git a/common/src/main/java/me/lucko/spark/profiler/DataAggregator.java b/common/src/main/java/me/lucko/spark/profiler/DataAggregator.java
new file mode 100644
index 0000000..1afa52c
--- /dev/null
+++ b/common/src/main/java/me/lucko/spark/profiler/DataAggregator.java
@@ -0,0 +1,32 @@
+package me.lucko.spark.profiler;
+
+import java.util.Map;
+
+/**
+ * Aggregates sampling data.
+ */
+public interface DataAggregator {
+
+ /**
+ * Called before the sampler begins to insert data
+ */
+ default void start() {
+
+ }
+
+ /**
+ * Forms the output data
+ *
+ * @return the output data
+ */
+ Map<String, StackNode> getData();
+
+ /**
+ * Inserts sampling data into this aggregator
+ *
+ * @param threadName the name of the thread
+ * @param stack the call stack
+ */
+ void insertData(String threadName, StackTraceElement[] stack);
+
+}
diff --git a/common/src/main/java/me/lucko/spark/profiler/Sampler.java b/common/src/main/java/me/lucko/spark/profiler/Sampler.java
index d03b4b6..44cf445 100644
--- a/common/src/main/java/me/lucko/spark/profiler/Sampler.java
+++ b/common/src/main/java/me/lucko/spark/profiler/Sampler.java
@@ -31,10 +31,8 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -43,35 +41,39 @@ import java.util.concurrent.atomic.AtomicInteger;
public class Sampler extends TimerTask {
private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
- /** The thread management interface for the current JVM */
- private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
-
- /** A map of root stack nodes for each thread with sampling data */
- private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
-
/** The worker pool for inserting stack nodes */
private final ExecutorService workerPool = Executors.newFixedThreadPool(
6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement()).build()
);
+ /** The thread management interface for the current JVM */
+ private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+ /** The instance used to generate thread information for use in sampling */
+ private final ThreadDumper threadDumper;
+ /** Responsible for aggregating and then outputting collected sampling data */
+ private final DataAggregator dataAggregator;
+
/** A future to encapsulation the completion of this sampler instance */
private final CompletableFuture<Sampler> future = new CompletableFuture<>();
/** The interval to wait between sampling, in milliseconds */
private final int interval;
- /** The instance used to generate thread information for use in sampling */
- private final ThreadDumper threadDumper;
- /** The instance used to group threads together */
- private final ThreadGrouper threadGrouper;
/** The time when sampling first began */
private long startTime = -1;
/** The unix timestamp (in millis) when this sampler should automatically complete.*/
private final long endTime; // -1 for nothing
public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
+ this.threadDumper = threadDumper;
+ this.dataAggregator = new AsyncDataAggregator(this.workerPool, threadGrouper, interval);
this.interval = interval;
+ this.endTime = endTime;
+ }
+
+ public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, TickCounter tickCounter, int tickLengthThreshold) {
this.threadDumper = threadDumper;
- this.threadGrouper = threadGrouper;
+ this.dataAggregator = new TickedDataAggregator(this.workerPool, tickCounter, threadGrouper, interval, tickLengthThreshold);
+ this.interval = interval;
this.endTime = endTime;
}
@@ -81,35 +83,9 @@ public class Sampler extends TimerTask {
* @param samplingThread the timer to schedule the sampling on
*/
public void start(Timer samplingThread) {
- samplingThread.scheduleAtFixedRate(this, 0, this.interval);
this.startTime = System.currentTimeMillis();
- }
-
- private void insertData(QueuedThreadInfo data) {
- try {
- String group = this.threadGrouper.getGroup(data.threadName);
- StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
- node.log(data.stack, Sampler.this.interval);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * Gets the sampling data recorded by this instance.
- *
- * @return the data
- */
- public Map<String, StackNode> getData() {
- // wait for all pending data to be inserted
- this.workerPool.shutdown();
- try {
- this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- return this.threadData;
+ this.dataAggregator.start();
+ samplingThread.scheduleAtFixedRate(this, 0, this.interval);
}
public long getStartTime() {
@@ -145,10 +121,7 @@ public class Sampler extends TimerTask {
continue;
}
- // form the queued data
- QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
- // schedule insertion of the data
- this.workerPool.execute(queuedData);
+ this.dataAggregator.insertData(threadName, stack);
}
} catch (Throwable t) {
this.future.completeExceptionally(t);
@@ -161,7 +134,7 @@ public class Sampler extends TimerTask {
JsonArray threads = new JsonArray();
- List<Map.Entry<String, StackNode>> data = new ArrayList<>(getData().entrySet());
+ List<Map.Entry<String, StackNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
data.sort(Map.Entry.comparingByKey());
for (Map.Entry<String, StackNode> entry : data) {
@@ -177,19 +150,4 @@ public class Sampler extends TimerTask {
return out;
}
- private final class QueuedThreadInfo implements Runnable {
- private final String threadName;
- private final StackTraceElement[] stack;
-
- private QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
- this.threadName = threadName;
- this.stack = stack;
- }
-
- @Override
- public void run() {
- insertData(this);
- }
- }
-
}
diff --git a/common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java b/common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
index fa2898b..4c16d50 100644
--- a/common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
+++ b/common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
@@ -15,6 +15,9 @@ public class SamplerBuilder {
private ThreadDumper threadDumper = ThreadDumper.ALL;
private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME;
+ private int ticksOver = -1;
+ private TickCounter tickCounter = null;
+
public SamplerBuilder() {
}
@@ -39,8 +42,20 @@ public class SamplerBuilder {
return this;
}
+ public SamplerBuilder ticksOver(int ticksOver, TickCounter tickCounter) {
+ this.ticksOver = ticksOver;
+ this.tickCounter = tickCounter;
+ return this;
+ }
+
public Sampler start(Timer samplingThread) {
- Sampler sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout);
+ Sampler sampler;
+ if (this.ticksOver != -1 && this.tickCounter != null) {
+ sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout, this.tickCounter, this.ticksOver);
+ } else {
+ sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout);
+ }
+
sampler.start(samplingThread);
return sampler;
}
diff --git a/common/src/main/java/me/lucko/spark/profiler/TickCounter.java b/common/src/main/java/me/lucko/spark/profiler/TickCounter.java
new file mode 100644
index 0000000..53a9c27
--- /dev/null
+++ b/common/src/main/java/me/lucko/spark/profiler/TickCounter.java
@@ -0,0 +1,39 @@
+package me.lucko.spark.profiler;
+
+/**
+ * A hook with the game's "tick loop".
+ */
+public interface TickCounter {
+
+ /**
+ * Starts the counter
+ */
+ void start();
+
+ /**
+ * Stops the counter
+ */
+ void close();
+
+ /**
+ * Gets the current tick number
+ *
+ * @return the current tick
+ */
+ long getCurrentTick();
+
+ /**
+ * Adds a task to be called each time the tick increments
+ *
+ * @param runnable the task
+ */
+ void addTickTask(Runnable runnable);
+
+ /**
+ * Removes a tick task
+ *
+ * @param runnable the task
+ */
+ void removeTickTask(Runnable runnable);
+
+}
diff --git a/common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java b/common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
new file mode 100644
index 0000000..abca4b3
--- /dev/null
+++ b/common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
@@ -0,0 +1,147 @@
+package me.lucko.spark.profiler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
+ * which exceed a certain threshold in duration.
+ */
+public class TickedDataAggregator implements DataAggregator {
+
+ /** A map of root stack nodes for each thread with sampling data */
+ private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
+
+ /** The worker pool for inserting stack nodes */
+ private final ExecutorService workerPool;
+
+ /** Used to monitor the current "tick" of the server */
+ private final TickCounter tickCounter;
+
+ /** The instance used to group threads together */
+ private final ThreadGrouper threadGrouper;
+
+ /** The interval to wait between sampling, in milliseconds */
+ private final int interval;
+
+ /** Tick durations under this threshold will not be inserted */
+ private final int tickLengthThreshold;
+
+ /** The expected number of samples in each tick */
+ private final int expectedSize;
+
+ // state
+ private long currentTick = -1;
+ private TickList currentData = new TickList(0);
+
+ public TickedDataAggregator(ExecutorService workerPool, TickCounter tickCounter, ThreadGrouper threadGrouper, int interval, int tickLengthThreshold) {
+ this.workerPool = workerPool;
+ this.tickCounter = tickCounter;
+ this.threadGrouper = threadGrouper;
+ this.interval = interval;
+ this.tickLengthThreshold = tickLengthThreshold;
+ // 50 millis in a tick, plus 10 so we have a bit of room to go over
+ this.expectedSize = (50 / interval) + 10;
+ }
+
+ // this is effectively synchronized by the Timer instance in Sampler
+ @Override
+ public void insertData(String threadName, StackTraceElement[] stack) {
+ long tick = this.tickCounter.getCurrentTick();
+ if (this.currentTick != tick) {
+ pushCurrentTick();
+ this.currentTick = tick;
+ this.currentData = new TickList(this.expectedSize);
+ }
+
+ // form the queued data
+ QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+ // insert it
+ this.currentData.addData(queuedData);
+ }
+
+ private void pushCurrentTick() {
+ TickList currentData = this.currentData;
+
+ // approximate how long the tick lasted
+ int tickLengthMillis = currentData.getList().size() * this.interval;
+
+ // don't push data below the threshold
+ if (tickLengthMillis < this.tickLengthThreshold) {
+ return;
+ }
+
+ this.workerPool.submit(currentData);
+ }
+
+ @Override
+ public void start() {
+ this.tickCounter.start();
+ }
+
+ @Override
+ public Map<String, StackNode> getData() {
+ // push the current tick
+ pushCurrentTick();
+
+ // close the tick counter
+ this.tickCounter.close();
+
+ // wait for all pending data to be inserted
+ this.workerPool.shutdown();
+ try {
+ this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return this.threadData;
+ }
+
+ void insertData(List<QueuedThreadInfo> dataList) {
+ for (QueuedThreadInfo data : dataList) {
+ try {
+ String group = this.threadGrouper.getGroup(data.threadName);
+ StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
+ node.log(data.stack, this.interval);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private final class TickList implements Runnable {
+ private final List<QueuedThreadInfo> list;
+
+ TickList(int expectedSize) {
+ this.list = new ArrayList<>(expectedSize);
+ }
+
+ @Override
+ public void run() {
+ insertData(this.list);
+ }
+
+ public List<QueuedThreadInfo> getList() {
+ return this.list;
+ }
+
+ public void addData(QueuedThreadInfo data) {
+ this.list.add(data);
+ }
+ }
+
+ private static final class QueuedThreadInfo {
+ private final String threadName;
+ private final StackTraceElement[] stack;
+
+ QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+ this.threadName = threadName;
+ this.stack = stack;
+ }
+ }
+}
diff --git a/sponge/src/main/java/me/lucko/spark/sponge/SparkSpongePlugin.java b/sponge/src/main/java/me/lucko/spark/sponge/SparkSpongePlugin.java
index 9149cb0..b77b13c 100644
--- a/sponge/src/main/java/me/lucko/spark/sponge/SparkSpongePlugin.java
+++ b/sponge/src/main/java/me/lucko/spark/sponge/SparkSpongePlugin.java
@@ -4,18 +4,20 @@ import com.google.inject.Inject;
import me.lucko.spark.common.CommandHandler;
import me.lucko.spark.profiler.ThreadDumper;
+import me.lucko.spark.profiler.TickCounter;
import me.lucko.spark.sponge.utils.PomData;
import org.spongepowered.api.Game;
+import org.spongepowered.api.Sponge;
import org.spongepowered.api.command.CommandCallable;
import org.spongepowered.api.command.CommandResult;
import org.spongepowered.api.command.CommandSource;
+import org.spongepowered.api.entity.living.player.Player;
import org.spongepowered.api.event.Listener;
import org.spongepowered.api.event.game.state.GameStartedServerEvent;
import org.spongepowered.api.plugin.Plugin;
import org.spongepowered.api.scheduler.AsynchronousExecutor;
import org.spongepowered.api.scheduler.SpongeExecutorService;
-import org.spongepowered.api.text.LiteralText;
import org.spongepowered.api.text.Text;
import org.spongepowered.api.text.action.TextActions;
import org.spongepowered.api.text.format.TextColors;
@@ -41,20 +43,38 @@ import javax.annotation.Nullable;
public class SparkSpongePlugin implements CommandCallable {
private final CommandHandler<CommandSource> commandHandler = new CommandHandler<CommandSource>() {
+ private Text colorize(String message) {
+ return TextSerializers.FORMATTING_CODE.deserialize(message);
+ }
+
+ private void broadcast(Text msg) {
+ Sponge.getServer().getConsole().sendMessage(msg);
+ for (Player player : Sponge.getServer().getOnlinePlayers()) {
+ if (player.hasPermission("spark.profiler")) {
+ player.sendMessage(msg);
+ }
+ }
+ }
+
@Override
protected void sendMessage(CommandSource sender, String message) {
- sender.sendMessage(TextSerializers.FORMATTING_CODE.deserialize(message));
+ sender.sendMessage(colorize(message));
+ }
+
+ @Override
+ protected void sendMessage(String message) {
+ Text msg = colorize(message);
+ broadcast(msg);
}
@Override
- protected void sendLink(CommandSource sender, String url) {
+ protected void sendLink(String url) {
try {
- LiteralText text = Text.builder(url)
+ Text msg = Text.builder(url)
.color(TextColors.GRAY)
.onClick(TextActions.openUrl(new URL(url)))
.build();
-
- sender.sendMessage(text);
+ broadcast(msg);
} catch (MalformedURLException e) {
e.printStackTrace();
}
@@ -69,6 +89,11 @@ public class SparkSpongePlugin implements CommandCallable {
protected ThreadDumper getDefaultThreadDumper() {
return new ThreadDumper.Specific(new long[]{Thread.currentThread().getId()});
}
+
+ @Override
+ protected TickCounter newTickCounter() {
+ return new SpongeTickCounter(SparkSpongePlugin.this);
+ }
};
@Inject
diff --git a/sponge/src/main/java/me/lucko/spark/sponge/SpongeTickCounter.java b/sponge/src/main/java/me/lucko/spark/sponge/SpongeTickCounter.java
new file mode 100644
index 0000000..bda2a69
--- /dev/null
+++ b/sponge/src/main/java/me/lucko/spark/sponge/SpongeTickCounter.java
@@ -0,0 +1,54 @@
+package me.lucko.spark.sponge;
+
+import me.lucko.spark.profiler.TickCounter;
+
+import org.spongepowered.api.scheduler.Task;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+
+public class SpongeTickCounter implements TickCounter, Runnable {
+ private final SparkSpongePlugin plugin;
+ private Task task;
+
+ private final Set<Runnable> tasks = new HashSet<>();
+ private final LongAdder tick = new LongAdder();
+
+ public SpongeTickCounter(SparkSpongePlugin plugin) {
+ this.plugin = plugin;
+ }
+
+ @Override
+ public void run() {
+ this.tick.increment();
+ for (Runnable r : this.tasks){
+ r.run();
+ }
+ }
+
+ @Override
+ public void start() {
+ this.task = Task.builder().intervalTicks(1).name("spark-ticker").execute(this).submit(this.plugin);
+ }
+
+ @Override
+ public void close() {
+ this.task.cancel();
+ }
+
+ @Override
+ public long getCurrentTick() {
+ return this.tick.longValue();
+ }
+
+ @Override
+ public void addTickTask(Runnable runnable) {
+ this.tasks.add(runnable);
+ }
+
+ @Override
+ public void removeTickTask(Runnable runnable) {
+ this.tasks.remove(runnable);
+ }
+}