aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko
diff options
context:
space:
mode:
Diffstat (limited to 'spark-common/src/main/java/me/lucko')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java372
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/TickMonitor.java88
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/http/Bytebin.java54
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/http/HttpClient.java113
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java77
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java32
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java170
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java63
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java141
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java71
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/ThreadDumper.java77
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/ThreadGrouper.java52
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/TickCounter.java39
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java147
14 files changed, 1496 insertions, 0 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java b/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java
new file mode 100644
index 0000000..898bba7
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java
@@ -0,0 +1,372 @@
+package me.lucko.spark.common;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+
+import me.lucko.spark.common.http.Bytebin;
+import me.lucko.spark.profiler.Sampler;
+import me.lucko.spark.profiler.SamplerBuilder;
+import me.lucko.spark.profiler.ThreadDumper;
+import me.lucko.spark.profiler.ThreadGrouper;
+import me.lucko.spark.profiler.TickCounter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract command handling class used by all platforms.
+ *
+ * @param <T> the sender (e.g. CommandSender) type used by the platform
+ */
+public abstract class CommandHandler<T> {
+
+ /** The URL of the viewer frontend */
+ private static final String VIEWER_URL = "https://sparkprofiler.github.io/?";
+ /** The prefix used in all messages */
+ private static final String PREFIX = "&8[&fspark&8] &7";
+
+ /**
+ * The {@link Timer} being used by the {@link #activeSampler}.
+ */
+ private final Timer samplingThread = new Timer("spark-sampling-thread", true);
+
+ /** Guards {@link #activeSampler} */
+ private final Object[] activeSamplerMutex = new Object[0];
+ /** The WarmRoast instance currently running, if any */
+ private Sampler activeSampler = null;
+ /** The tick monitor instance currently running, if any */
+ private ReportingTickMonitor activeTickMonitor = null;
+
+
+ // abstract methods implemented by each platform
+
+ protected abstract void sendMessage(T sender, String message);
+ protected abstract void sendMessage(String message);
+ protected abstract void sendLink(String url);
+ protected abstract void runAsync(Runnable r);
+ protected abstract ThreadDumper getDefaultThreadDumper();
+ protected abstract TickCounter newTickCounter();
+
+ private void sendPrefixedMessage(T sender, String message) {
+ sendMessage(sender, PREFIX + message);
+ }
+
+ private void sendPrefixedMessage(String message) {
+ sendMessage(PREFIX + message);
+ }
+
+ public void handleCommand(T sender, String[] args) {
+ try {
+ if (args.length == 0) {
+ sendInfo(sender);
+ return;
+ }
+
+ List<String> arguments = new ArrayList<>(Arrays.asList(args));
+ switch (arguments.remove(0).toLowerCase()) {
+ case "start":
+ handleStart(sender, arguments);
+ break;
+ case "info":
+ handleInfo(sender);
+ break;
+ case "cancel":
+ handleCancel(sender);
+ break;
+ case "stop":
+ case "upload":
+ case "paste":
+ handleStop(sender);
+ break;
+ case "monitoring":
+ handleMonitoring(sender, arguments);
+ break;
+ default:
+ sendInfo(sender);
+ break;
+ }
+ } catch (IllegalArgumentException e) {
+ sendMessage(sender, "&c" + e.getMessage());
+ }
+ }
+
+ private void sendInfo(T sender) {
+ sendPrefixedMessage(sender, "&fspark profiler &7v1.0");
+ sendMessage(sender, "&b&l> &7/profiler start");
+ sendMessage(sender, " &8[&7--timeout&8 <timeout seconds>]");
+ sendMessage(sender, " &8[&7--thread&8 <thread name>]");
+ sendMessage(sender, " &8[&7--not-combined]");
+ sendMessage(sender, " &8[&7--interval&8 <interval millis>]");
+ sendMessage(sender, " &8[&7--only-ticks-over&8 <tick length millis>]");
+ sendMessage(sender, "&b&l> &7/profiler info");
+ sendMessage(sender, "&b&l> &7/profiler stop");
+ sendMessage(sender, "&b&l> &7/profiler cancel");
+ sendMessage(sender, "&b&l> &7/profiler monitoring");
+ sendMessage(sender, " &8[&7--threshold&8 <percentage increase>]");
+ }
+
+ private void handleStart(T sender, List<String> args) {
+ SetMultimap<String, String> arguments = parseArguments(args);
+
+ int timeoutSeconds = parseInt(arguments, "timeout", "d");
+ if (timeoutSeconds != -1 && timeoutSeconds <= 10) {
+ sendPrefixedMessage(sender, "&cThe specified timeout is not long enough for accurate results to be formed. Please choose a value greater than 10.");
+ return;
+ }
+
+ if (timeoutSeconds != -1 && timeoutSeconds < 30) {
+ sendPrefixedMessage(sender, "&7The accuracy of the output will significantly improve when sampling is able to run for longer periods. Consider setting a timeout value over 30 seconds.");
+ }
+
+ int intervalMillis = parseInt(arguments, "interval", "i");
+ if (intervalMillis <= 0) {
+ intervalMillis = 4;
+ }
+
+ Set<String> threads = Sets.union(arguments.get("thread"), arguments.get("t"));
+ ThreadDumper threadDumper;
+ if (threads.isEmpty()) {
+ // use the server thread
+ threadDumper = getDefaultThreadDumper();
+ } else if (threads.contains("*")) {
+ threadDumper = ThreadDumper.ALL;
+ } else {
+ threadDumper = new ThreadDumper.Specific(threads);
+ }
+
+ ThreadGrouper threadGrouper;
+ if (arguments.containsKey("not-combined")) {
+ threadGrouper = ThreadGrouper.BY_NAME;
+ } else {
+ threadGrouper = ThreadGrouper.BY_POOL;
+ }
+
+ int ticksOver = parseInt(arguments, "only-ticks-over", "o");
+ TickCounter tickCounter = null;
+ if (ticksOver != -1) {
+ try {
+ tickCounter = newTickCounter();
+ } catch (UnsupportedOperationException e) {
+ sendPrefixedMessage(sender, "&cTick counting is not supported on BungeeCord!");
+ return;
+ }
+ }
+
+ Sampler sampler;
+ synchronized (this.activeSamplerMutex) {
+ if (this.activeSampler != null) {
+ sendPrefixedMessage(sender, "&7An active sampler is already running.");
+ return;
+ }
+
+ sendPrefixedMessage("&7Initializing a new profiler, please wait...");
+
+ SamplerBuilder builder = new SamplerBuilder();
+ builder.threadDumper(threadDumper);
+ builder.threadGrouper(threadGrouper);
+ if (timeoutSeconds != -1) {
+ builder.completeAfter(timeoutSeconds, TimeUnit.SECONDS);
+ }
+ builder.samplingInterval(intervalMillis);
+ if (ticksOver != -1) {
+ builder.ticksOver(ticksOver, tickCounter);
+ }
+ sampler = this.activeSampler = builder.start(this.samplingThread);
+
+ sendPrefixedMessage("&bProfiler now active!");
+ if (timeoutSeconds == -1) {
+ sendPrefixedMessage("&7Use '/profiler stop' to stop profiling and upload the results.");
+ } else {
+ sendPrefixedMessage("&7The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds.");
+ }
+ }
+
+ CompletableFuture<Sampler> future = sampler.getFuture();
+
+ // send message if profiling fails
+ future.whenCompleteAsync((s, throwable) -> {
+ if (throwable != null) {
+ sendPrefixedMessage("&cSampling operation failed unexpectedly. Error: " + throwable.toString());
+ throwable.printStackTrace();
+ }
+ });
+
+ // set activeSampler to null when complete.
+ future.whenCompleteAsync((s, throwable) -> {
+ synchronized (this.activeSamplerMutex) {
+ if (sampler == this.activeSampler) {
+ this.activeSampler = null;
+ }
+ }
+ });
+
+ // await the result
+ if (timeoutSeconds != -1) {
+ future.thenAcceptAsync(s -> {
+ sendPrefixedMessage("&7The active sampling operation has completed! Uploading results...");
+ handleUpload(s);
+ });
+ }
+ }
+
+ private void handleInfo(T sender) {
+ synchronized (this.activeSamplerMutex) {
+ if (this.activeSampler == null) {
+ sendPrefixedMessage(sender, "&7There isn't an active sampling task running.");
+ } else {
+ long timeout = this.activeSampler.getEndTime();
+ if (timeout == -1) {
+ sendPrefixedMessage(sender, "&7There is an active sampler currently running, with no defined timeout.");
+ } else {
+ long timeoutDiff = (timeout - System.currentTimeMillis()) / 1000L;
+ sendPrefixedMessage(sender, "&7There is an active sampler currently running, due to timeout in " + timeoutDiff + " seconds.");
+ }
+
+ long runningTime = (System.currentTimeMillis() - this.activeSampler.getStartTime()) / 1000L;
+ sendPrefixedMessage(sender, "&7It has been sampling for " + runningTime + " seconds so far.");
+ }
+ }
+ }
+
+ private void handleStop(T sender) {
+ synchronized (this.activeSamplerMutex) {
+ if (this.activeSampler == null) {
+ sendPrefixedMessage(sender, "&7There isn't an active sampling task running.");
+ } else {
+ this.activeSampler.cancel();
+ sendPrefixedMessage("&7The active sampling operation has been stopped! Uploading results...");
+ handleUpload(this.activeSampler);
+ this.activeSampler = null;
+ }
+ }
+ }
+
+ private void handleCancel(T sender) {
+ synchronized (this.activeSamplerMutex) {
+ if (this.activeSampler == null) {
+ sendPrefixedMessage(sender, "&7There isn't an active sampling task running.");
+ } else {
+ this.activeSampler.cancel();
+ this.activeSampler = null;
+ sendPrefixedMessage("&bThe active sampling task has been cancelled.");
+ }
+ }
+ }
+
+ private void handleUpload(Sampler sampler) {
+ runAsync(() -> {
+ byte[] output = sampler.formCompressedDataPayload();
+ try {
+ String pasteId = Bytebin.postCompressedContent(output);
+ sendPrefixedMessage("&bSampling results:");
+ sendLink(VIEWER_URL + pasteId);
+ } catch (IOException e) {
+ sendPrefixedMessage("&cAn error occurred whilst uploading the results.");
+ e.printStackTrace();
+ }
+ });
+ }
+
+ private void handleMonitoring(T sender, List<String> args) {
+ SetMultimap<String, String> arguments = parseArguments(args);
+
+ if (this.activeTickMonitor == null) {
+
+ int threshold = parseInt(arguments, "threshold", "t");
+ if (threshold == -1) {
+ threshold = 100;
+ }
+
+ try {
+ TickCounter tickCounter = newTickCounter();
+ this.activeTickMonitor = new ReportingTickMonitor(tickCounter, threshold);
+ } catch (UnsupportedOperationException e) {
+ sendPrefixedMessage(sender, "&cNot supported on BungeeCord!");
+ }
+ } else {
+ this.activeTickMonitor.close();
+ this.activeTickMonitor = null;
+ sendPrefixedMessage("&7Tick monitor disabled.");
+ }
+ }
+
+ private class ReportingTickMonitor extends TickMonitor {
+ public ReportingTickMonitor(TickCounter tickCounter, int percentageChangeThreshold) {
+ super(tickCounter, percentageChangeThreshold);
+ }
+
+ @Override
+ protected void sendMessage(String message) {
+ sendPrefixedMessage(message);
+ }
+ }
+
+ private int parseInt(SetMultimap<String, String> arguments, String longArg, String shortArg) {
+ Iterator<String> it = Sets.union(arguments.get(longArg), arguments.get(shortArg)).iterator();
+ if (it.hasNext()) {
+ try {
+ return Math.abs(Integer.parseInt(it.next()));
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid input for '" + longArg + "' argument. Please specify a number!");
+ }
+ }
+ return -1; // undefined
+ }
+
+ private static final Pattern FLAG_REGEX = Pattern.compile("--(.+)$|-([a-zA-z])$");
+
+ private static SetMultimap<String, String> parseArguments(List<String> args) {
+ SetMultimap<String, String> arguments = HashMultimap.create();
+
+ String flag = null;
+ List<String> value = null;
+
+ for (int i = 0; i < args.size(); i++) {
+ String arg = args.get(i);
+
+ Matcher matcher = FLAG_REGEX.matcher(arg);
+ boolean matches = matcher.matches();
+
+ if (flag == null || matches) {
+ if (!matches) {
+ throw new IllegalArgumentException("Expected flag at position " + i + " but got '" + arg + "' instead!");
+ }
+
+ String match = matcher.group(1);
+ if (match == null) {
+ match = matcher.group(2);
+ }
+
+ // store existing value, if present
+ if (flag != null) {
+ arguments.put(flag, value.stream().collect(Collectors.joining(" ")));
+ }
+
+ flag = match.toLowerCase();
+ value = new ArrayList<>();
+ } else {
+ // part of a value
+ value.add(arg);
+ }
+ }
+
+ // store remaining value, if present
+ if (flag != null) {
+ arguments.put(flag, value.stream().collect(Collectors.joining(" ")));
+ }
+
+ return arguments;
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/TickMonitor.java b/spark-common/src/main/java/me/lucko/spark/common/TickMonitor.java
new file mode 100644
index 0000000..a30a4db
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/TickMonitor.java
@@ -0,0 +1,88 @@
+package me.lucko.spark.common;
+
+import me.lucko.spark.profiler.TickCounter;
+
+import java.text.DecimalFormat;
+import java.util.DoubleSummaryStatistics;
+
+public abstract class TickMonitor implements Runnable {
+ private static final DecimalFormat df = new DecimalFormat("#.##");
+
+ private final TickCounter tickCounter;
+ private final int percentageChangeThreshold;
+
+ // data
+ private double lastTickTime = 0;
+ private State state = null;
+ private DoubleSummaryStatistics averageTickTime = new DoubleSummaryStatistics();
+ private double avg;
+
+ public TickMonitor(TickCounter tickCounter, int percentageChangeThreshold) {
+ this.tickCounter = tickCounter;
+ this.percentageChangeThreshold = percentageChangeThreshold;
+
+ this.tickCounter.start();
+ this.tickCounter.addTickTask(this);
+ }
+
+ protected abstract void sendMessage(String message);
+
+ public void close() {
+ this.tickCounter.close();
+ }
+
+ @Override
+ public void run() {
+ double now = ((double) System.nanoTime()) / 1000000d;
+
+ // init
+ if (this.state == null) {
+ this.state = State.SETUP;
+ this.lastTickTime = now;
+ sendMessage("Tick monitor started. Before the monitor becomes fully active, the server's " +
+ "average tick rate will be calculated over a period of 120 ticks (approx 6 seconds).");
+ return;
+ }
+
+ // find the diff
+ double diff = now - this.lastTickTime;
+ this.lastTickTime = now;
+
+ // form averages
+ if (this.state == State.SETUP) {
+ this.averageTickTime.accept(diff);
+
+ // move onto the next state
+ if (this.averageTickTime.getCount() >= 120) {
+
+ sendMessage("&bAnalysis is now complete.");
+ sendMessage("&f> &7Max: " + df.format(this.averageTickTime.getMax()) + "ms");
+ sendMessage("&f> &7Min: " + df.format(this.averageTickTime.getMin()) + "ms");
+ sendMessage("&f> &7Avg: " + df.format(this.averageTickTime.getAverage()) + "ms");
+ sendMessage("Starting now, any ticks with >" + this.percentageChangeThreshold + "% increase in " +
+ "duration compared to the average will be reported.");
+
+ this.avg = this.averageTickTime.getAverage();
+ this.state = State.MONITORING;
+ }
+ }
+
+ if (this.state == State.MONITORING) {
+ double increase = diff - this.avg;
+ if (increase <= 0) {
+ return;
+ }
+
+ double percentageChange = (increase * 100d) / this.avg;
+ if (percentageChange > this.percentageChangeThreshold) {
+ sendMessage("&7Tick &8#" + this.tickCounter.getCurrentTick() + " &7lasted &b" + df.format(diff) + "&7 milliseconds. " +
+ "&7(&b" + df.format(percentageChange) + "% &7increase from average)");
+ }
+ }
+ }
+
+ private enum State {
+ SETUP,
+ MONITORING
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/http/Bytebin.java b/spark-common/src/main/java/me/lucko/spark/common/http/Bytebin.java
new file mode 100644
index 0000000..3cd5e4c
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/http/Bytebin.java
@@ -0,0 +1,54 @@
+package me.lucko.spark.common.http;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import okhttp3.MediaType;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Utility for uploading JSON data to bytebin.
+ */
+public final class Bytebin {
+
+ /** Media type for JSON data */
+ private static final MediaType JSON_TYPE = MediaType.parse("application/json; charset=utf-8");
+ /** The URL used to upload sampling data */
+ private static final String UPLOAD_ENDPOINT = "https://bytebin.lucko.me/post";
+
+ public static String postCompressedContent(byte[] buf) throws IOException {
+ RequestBody body = RequestBody.create(JSON_TYPE, buf);
+
+ Request.Builder requestBuilder = new Request.Builder()
+ .url(UPLOAD_ENDPOINT)
+ .header("Content-Encoding", "gzip")
+ .post(body);
+
+ Request request = requestBuilder.build();
+ try (Response response = HttpClient.makeCall(request)) {
+ try (ResponseBody responseBody = response.body()) {
+ if (responseBody == null) {
+ throw new RuntimeException("No response");
+ }
+
+ try (InputStream inputStream = responseBody.byteStream()) {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+ JsonObject object = new Gson().fromJson(reader, JsonObject.class);
+ return object.get("key").getAsString();
+ }
+ }
+ }
+ }
+ }
+
+ private Bytebin() {}
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/http/HttpClient.java b/spark-common/src/main/java/me/lucko/spark/common/http/HttpClient.java
new file mode 100644
index 0000000..61db597
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/http/HttpClient.java
@@ -0,0 +1,113 @@
+/*
+ * This file is part of LuckPerms, licensed under the MIT License.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package me.lucko.spark.common.http;
+
+import okhttp3.Interceptor;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+
+import java.io.IOException;
+import java.net.Proxy;
+import java.net.ProxySelector;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Utility class for making http requests.
+ */
+public final class HttpClient {
+ private static OkHttpClient client = null;
+
+ private static synchronized OkHttpClient getClient() {
+ if (client == null) {
+ client = new OkHttpClient.Builder()
+ .proxySelector(new NullSafeProxySelector())
+ .addInterceptor(new UserAgentInterceptor())
+ .build();
+ }
+ return client;
+ }
+
+ public static Response makeCall(Request request) throws IOException {
+ Response response = getClient().newCall(request).execute();
+ if (!response.isSuccessful()) {
+ throw exceptionForUnsuccessfulResponse(response);
+ }
+ return response;
+ }
+
+ private static RuntimeException exceptionForUnsuccessfulResponse(Response response) {
+ String msg = "";
+ try (ResponseBody responseBody = response.body()) {
+ if (responseBody != null) {
+ msg = responseBody.string();
+ }
+ } catch (IOException e) {
+ // ignore
+ }
+ return new RuntimeException("Got response: " + response.code() + " - " + response.message() + " - " + msg);
+ }
+
+ private static final class UserAgentInterceptor implements Interceptor {
+ @Override
+ public Response intercept(Chain chain) throws IOException {
+ Request orig = chain.request();
+ Request modified = orig.newBuilder()
+ .header("User-Agent", "spark-plugin")
+ .build();
+
+ return chain.proceed(modified);
+ }
+ }
+
+ // sometimes ProxySelector#getDefault returns null, and okhttp doesn't like that
+ private static final class NullSafeProxySelector extends ProxySelector {
+ private static final List<Proxy> DIRECT = Collections.singletonList(Proxy.NO_PROXY);
+
+ @Override
+ public List<Proxy> select(URI uri) {
+ ProxySelector def = ProxySelector.getDefault();
+ if (def == null) {
+ return DIRECT;
+ }
+ return def.select(uri);
+ }
+
+ @Override
+ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
+ ProxySelector def = ProxySelector.getDefault();
+ if (def != null) {
+ def.connectFailed(uri, sa, ioe);
+ }
+ }
+ }
+
+ private HttpClient() {}
+} \ No newline at end of file
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java
new file mode 100644
index 0000000..9a4090e
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java
@@ -0,0 +1,77 @@
+package me.lucko.spark.profiler;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link DataAggregator} that makes use of a "worker" thread pool for inserting
+ * data.
+ */
+public class AsyncDataAggregator implements DataAggregator {
+
+ /** A map of root stack nodes for each thread with sampling data */
+ private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
+
+ /** The worker pool for inserting stack nodes */
+ private final ExecutorService workerPool;
+
+ /** The instance used to group threads together */
+ private final ThreadGrouper threadGrouper;
+
+ /** The interval to wait between sampling, in milliseconds */
+ private final int interval;
+
+ public AsyncDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) {
+ this.workerPool = workerPool;
+ this.threadGrouper = threadGrouper;
+ this.interval = interval;
+ }
+
+ @Override
+ public void insertData(String threadName, StackTraceElement[] stack) {
+ // form the queued data
+ QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+ // schedule insertion of the data
+ this.workerPool.execute(queuedData);
+ }
+
+ @Override
+ public Map<String, StackNode> getData() {
+ // wait for all pending data to be inserted
+ this.workerPool.shutdown();
+ try {
+ this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return this.threadData;
+ }
+
+ void insertData(QueuedThreadInfo data) {
+ try {
+ String group = this.threadGrouper.getGroup(data.threadName);
+ StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
+ node.log(data.stack, this.interval);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private final class QueuedThreadInfo implements Runnable {
+ private final String threadName;
+ private final StackTraceElement[] stack;
+
+ QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+ this.threadName = threadName;
+ this.stack = stack;
+ }
+
+ @Override
+ public void run() {
+ insertData(this);
+ }
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java
new file mode 100644
index 0000000..1afa52c
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java
@@ -0,0 +1,32 @@
+package me.lucko.spark.profiler;
+
+import java.util.Map;
+
+/**
+ * Aggregates sampling data.
+ */
+public interface DataAggregator {
+
+ /**
+ * Called before the sampler begins to insert data
+ */
+ default void start() {
+
+ }
+
+ /**
+ * Forms the output data
+ *
+ * @return the output data
+ */
+ Map<String, StackNode> getData();
+
+ /**
+ * Inserts sampling data into this aggregator
+ *
+ * @param threadName the name of the thread
+ * @param stack the call stack
+ */
+ void insertData(String threadName, StackTraceElement[] stack);
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
new file mode 100644
index 0000000..3476f03
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
@@ -0,0 +1,170 @@
+/*
+ * WarmRoast
+ * Copyright (C) 2013 Albert Pham <http://www.sk89q.com>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package me.lucko.spark.profiler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.stream.JsonWriter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Main sampler class.
+ */
+public class Sampler extends TimerTask {
+ private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
+
+ /** The worker pool for inserting stack nodes */
+ private final ExecutorService workerPool = Executors.newFixedThreadPool(
+ 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement()).build()
+ );
+
+ /** The thread management interface for the current JVM */
+ private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+ /** The instance used to generate thread information for use in sampling */
+ private final ThreadDumper threadDumper;
+ /** Responsible for aggregating and then outputting collected sampling data */
+ private final DataAggregator dataAggregator;
+
+ /** A future to encapsulation the completion of this sampler instance */
+ private final CompletableFuture<Sampler> future = new CompletableFuture<>();
+
+ /** The interval to wait between sampling, in milliseconds */
+ private final int interval;
+ /** The time when sampling first began */
+ private long startTime = -1;
+ /** The unix timestamp (in millis) when this sampler should automatically complete.*/
+ private final long endTime; // -1 for nothing
+
+ public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
+ this.threadDumper = threadDumper;
+ this.dataAggregator = new AsyncDataAggregator(this.workerPool, threadGrouper, interval);
+ this.interval = interval;
+ this.endTime = endTime;
+ }
+
+ public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, TickCounter tickCounter, int tickLengthThreshold) {
+ this.threadDumper = threadDumper;
+ this.dataAggregator = new TickedDataAggregator(this.workerPool, tickCounter, threadGrouper, interval, tickLengthThreshold);
+ this.interval = interval;
+ this.endTime = endTime;
+ }
+
+ /**
+ * Starts the sampler.
+ *
+ * @param samplingThread the timer to schedule the sampling on
+ */
+ public void start(Timer samplingThread) {
+ this.startTime = System.currentTimeMillis();
+ this.dataAggregator.start();
+ samplingThread.scheduleAtFixedRate(this, 0, this.interval);
+ }
+
+ public long getStartTime() {
+ if (this.startTime == -1) {
+ throw new IllegalStateException("Not yet started");
+ }
+ return this.startTime;
+ }
+
+ public long getEndTime() {
+ return this.endTime;
+ }
+
+ public CompletableFuture<Sampler> getFuture() {
+ return this.future;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) {
+ this.future.complete(this);
+ cancel();
+ return;
+ }
+
+ ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean);
+ for (ThreadInfo threadInfo : threadDumps) {
+ String threadName = threadInfo.getThreadName();
+ StackTraceElement[] stack = threadInfo.getStackTrace();
+
+ if (threadName == null || stack == null) {
+ continue;
+ }
+
+ this.dataAggregator.insertData(threadName, stack);
+ }
+ } catch (Throwable t) {
+ this.future.completeExceptionally(t);
+ cancel();
+ }
+ }
+
+ private void writeOutput(JsonWriter writer) throws IOException {
+ writer.beginObject();
+
+ writer.name("threads").beginArray();
+
+ List<Map.Entry<String, StackNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
+ data.sort(Map.Entry.comparingByKey());
+
+ for (Map.Entry<String, StackNode> entry : data) {
+ writer.beginObject();
+ writer.name("threadName").value(entry.getKey());
+ writer.name("totalTime").value(entry.getValue().getTotalTime());
+ writer.name("rootNode");
+ entry.getValue().serializeTo(writer);
+ writer.endObject();
+ }
+
+ writer.endArray();
+ writer.endObject();
+ }
+
+ public byte[] formCompressedDataPayload() {
+ ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+ try (Writer writer = new OutputStreamWriter(new GZIPOutputStream(byteOut), StandardCharsets.UTF_8)) {
+ try (JsonWriter jsonWriter = new JsonWriter(writer)) {
+ writeOutput(jsonWriter);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return byteOut.toByteArray();
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
new file mode 100644
index 0000000..7db0515
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/SamplerBuilder.java
@@ -0,0 +1,63 @@
+package me.lucko.spark.profiler;
+
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Builds {@link Sampler} instances.
+ */
+public class SamplerBuilder {
+
+ private int samplingInterval = 4;
+ private long timeout = -1;
+ private ThreadDumper threadDumper = ThreadDumper.ALL;
+ private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME;
+
+ private int ticksOver = -1;
+ private TickCounter tickCounter = null;
+
+ public SamplerBuilder() {
+ }
+
+ public SamplerBuilder samplingInterval(int samplingInterval) {
+ this.samplingInterval = samplingInterval;
+ return this;
+ }
+
+ public SamplerBuilder completeAfter(long timeout, TimeUnit unit) {
+ if (timeout <= 0) {
+ throw new IllegalArgumentException("timeout > 0");
+ }
+ this.timeout = System.currentTimeMillis() + unit.toMillis(timeout);
+ return this;
+ }
+
+ public SamplerBuilder threadDumper(ThreadDumper threadDumper) {
+ this.threadDumper = threadDumper;
+ return this;
+ }
+
+ public SamplerBuilder threadGrouper(ThreadGrouper threadGrouper) {
+ this.threadGrouper = threadGrouper;
+ return this;
+ }
+
+ public SamplerBuilder ticksOver(int ticksOver, TickCounter tickCounter) {
+ this.ticksOver = ticksOver;
+ this.tickCounter = tickCounter;
+ return this;
+ }
+
+ public Sampler start(Timer samplingThread) {
+ Sampler sampler;
+ if (this.ticksOver != -1 && this.tickCounter != null) {
+ sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout, this.tickCounter, this.ticksOver);
+ } else {
+ sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout);
+ }
+
+ sampler.start(samplingThread);
+ return sampler;
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java
new file mode 100644
index 0000000..575400a
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java
@@ -0,0 +1,141 @@
+/*
+ * WarmRoast
+ * Copyright (C) 2013 Albert Pham <http://www.sk89q.com>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package me.lucko.spark.profiler;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Represents a node in the overall sampling stack.
+ *
+ * <p>The base implementation of this class is only used for the root of node structures. The
+ * {@link StackTraceNode} class is used for representing method calls in the structure.</p>
+ */
+public class StackNode implements Comparable<StackNode> {
+
+ private static final int MAX_STACK_DEPTH = 300;
+
+ /**
+ * The name of this node
+ */
+ private final String name;
+
+ /**
+ * A map of this nodes children
+ */
+ private final Map<String, StackNode> children = new ConcurrentHashMap<>();
+
+ /**
+ * The accumulated sample time for this node
+ */
+ private final LongAdder totalTime = new LongAdder();
+
+ public StackNode(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public Collection<StackNode> getChildren() {
+ if (this.children.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<StackNode> list = new ArrayList<>(this.children.values());
+ list.sort(null);
+ return list;
+ }
+
+ private StackNode resolveChild(String name) {
+ return this.children.computeIfAbsent(name, StackNode::new);
+ }
+
+ private StackNode resolveChild(String className, String methodName) {
+ return this.children.computeIfAbsent(StackTraceNode.formName(className, methodName), name -> new StackTraceNode(className, methodName));
+ }
+
+ public long getTotalTime() {
+ return this.totalTime.longValue();
+ }
+
+ public void accumulateTime(long time) {
+ this.totalTime.add(time);
+ }
+
+ private void log(StackTraceElement[] elements, int skip, long time) {
+ accumulateTime(time);
+
+ if (skip >= MAX_STACK_DEPTH) {
+ return;
+ }
+
+ if (elements.length - skip == 0) {
+ return;
+ }
+
+ StackTraceElement bottom = elements[elements.length - (skip + 1)];
+ resolveChild(bottom.getClassName(), bottom.getMethodName()).log(elements, skip + 1, time);
+ }
+
+ public void log(StackTraceElement[] elements, long time) {
+ log(elements, 0, time);
+ }
+
+ @Override
+ public int compareTo(StackNode o) {
+ return getName().compareTo(o.getName());
+ }
+
+ public void serializeTo(JsonWriter writer) throws IOException {
+ writer.beginObject();
+
+ // append metadata about this node
+ appendMetadata(writer);
+
+ // include the total time recorded for this node
+ writer.name("totalTime").value(getTotalTime());
+
+ // append child nodes, if any are present
+ Collection<StackNode> childNodes = getChildren();
+ if (!childNodes.isEmpty()) {
+ writer.name("children").beginArray();
+ for (StackNode child : childNodes) {
+ child.serializeTo(writer);
+ }
+ writer.endArray();
+ }
+
+ writer.endObject();
+ }
+
+ protected void appendMetadata(JsonWriter writer) throws IOException {
+ writer.name("name").value(getName());
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java
new file mode 100644
index 0000000..d46a547
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java
@@ -0,0 +1,71 @@
+/*
+ * WarmRoast
+ * Copyright (C) 2013 Albert Pham <http://www.sk89q.com>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package me.lucko.spark.profiler;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+
+/**
+ * Represents a {@link StackNode node} for a method call.
+ */
+public class StackTraceNode extends StackNode {
+
+ /**
+ * Forms the {@link StackNode#getName()} for a {@link StackTraceNode}.
+ *
+ * @param className the name of the class
+ * @param methodName the name of the method
+ * @return the name
+ */
+ static String formName(String className, String methodName) {
+ return className + "." + methodName + "()";
+ }
+
+ /** The name of the class */
+ private final String className;
+ /** The name of the method */
+ private final String methodName;
+
+ public StackTraceNode(String className, String methodName) {
+ super(formName(className, methodName));
+ this.className = className;
+ this.methodName = methodName;
+ }
+
+ public String getClassName() {
+ return this.className;
+ }
+
+ public String getMethodName() {
+ return this.methodName;
+ }
+
+ @Override
+ protected void appendMetadata(JsonWriter writer) throws IOException {
+ writer.name("className").value(this.className);
+ writer.name("methodName").value(this.methodName);
+ }
+
+ @Override
+ public int compareTo(StackNode that) {
+ return Long.compare(that.getTotalTime(), this.getTotalTime());
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/profiler/ThreadDumper.java
new file mode 100644
index 0000000..68d7dc9
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/ThreadDumper.java
@@ -0,0 +1,77 @@
+/*
+ * WarmRoast
+ * Copyright (C) 2013 Albert Pham <http://www.sk89q.com>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.profiler;
+
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Uses the {@link ThreadMXBean} to generate {@link ThreadInfo} instances for the threads being
+ * sampled.
+ */
+@FunctionalInterface
+public interface ThreadDumper {
+
+ /**
+ * Generates {@link ThreadInfo} data for the sampled threads.
+ *
+ * @param threadBean the thread bean instance to obtain the data from
+ * @return an array of generated thread info instances
+ */
+ ThreadInfo[] dumpThreads(ThreadMXBean threadBean);
+
+ /**
+ * Implementation of {@link ThreadDumper} that generates data for all threads.
+ */
+ ThreadDumper ALL = new All();
+
+ final class All implements ThreadDumper {
+ @Override
+ public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
+ return threadBean.dumpAllThreads(false, false);
+ }
+ }
+
+ /**
+ * Implementation of {@link ThreadDumper} that generates data for a specific set of threads.
+ */
+ final class Specific implements ThreadDumper {
+ private final long[] ids;
+
+ public Specific(long[] ids) {
+ this.ids = ids;
+ }
+
+ public Specific(Set<String> names) {
+ Set<String> threadNamesLower = names.stream().map(String::toLowerCase).collect(Collectors.toSet());
+ this.ids = Thread.getAllStackTraces().keySet().stream()
+ .filter(t -> threadNamesLower.contains(t.getName().toLowerCase()))
+ .mapToLong(Thread::getId)
+ .toArray();
+ }
+
+ @Override
+ public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
+ return threadBean.getThreadInfo(this.ids, Integer.MAX_VALUE);
+ }
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/profiler/ThreadGrouper.java
new file mode 100644
index 0000000..56a6cc4
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/ThreadGrouper.java
@@ -0,0 +1,52 @@
+package me.lucko.spark.profiler;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Function for grouping threads together
+ */
+@FunctionalInterface
+public interface ThreadGrouper {
+
+ /**
+ * Gets the group for the given thread.
+ *
+ * @param threadName the name of the thread
+ * @return the group
+ */
+ String getGroup(String threadName);
+
+ /**
+ * Implementation of {@link ThreadGrouper} that just groups by thread name.
+ */
+ ThreadGrouper BY_NAME = new ByName();
+
+ final class ByName implements ThreadGrouper {
+ @Override
+ public String getGroup(String threadName) {
+ return threadName;
+ }
+ }
+
+ /**
+ * Implementation of {@link ThreadGrouper} that attempts to group by the name of the pool
+ * the thread originated from.
+ */
+ ThreadGrouper BY_POOL = new ByPool();
+
+ final class ByPool implements ThreadGrouper {
+ private static final Pattern THREAD_POOL_PATTERN = Pattern.compile("^(.*)[-#] ?\\d+$");
+
+ @Override
+ public String getGroup(String threadName) {
+ Matcher matcher = THREAD_POOL_PATTERN.matcher(threadName);
+ if (!matcher.matches()) {
+ return threadName;
+ }
+
+ return matcher.group(1).trim() + " (Combined)";
+ }
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/TickCounter.java b/spark-common/src/main/java/me/lucko/spark/profiler/TickCounter.java
new file mode 100644
index 0000000..53a9c27
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/TickCounter.java
@@ -0,0 +1,39 @@
+package me.lucko.spark.profiler;
+
+/**
+ * A hook with the game's "tick loop".
+ */
+public interface TickCounter {
+
+ /**
+ * Starts the counter
+ */
+ void start();
+
+ /**
+ * Stops the counter
+ */
+ void close();
+
+ /**
+ * Gets the current tick number
+ *
+ * @return the current tick
+ */
+ long getCurrentTick();
+
+ /**
+ * Adds a task to be called each time the tick increments
+ *
+ * @param runnable the task
+ */
+ void addTickTask(Runnable runnable);
+
+ /**
+ * Removes a tick task
+ *
+ * @param runnable the task
+ */
+ void removeTickTask(Runnable runnable);
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
new file mode 100644
index 0000000..abca4b3
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
@@ -0,0 +1,147 @@
+package me.lucko.spark.profiler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
+ * which exceed a certain threshold in duration.
+ */
+public class TickedDataAggregator implements DataAggregator {
+
+ /** A map of root stack nodes for each thread with sampling data */
+ private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
+
+ /** The worker pool for inserting stack nodes */
+ private final ExecutorService workerPool;
+
+ /** Used to monitor the current "tick" of the server */
+ private final TickCounter tickCounter;
+
+ /** The instance used to group threads together */
+ private final ThreadGrouper threadGrouper;
+
+ /** The interval to wait between sampling, in milliseconds */
+ private final int interval;
+
+ /** Tick durations under this threshold will not be inserted */
+ private final int tickLengthThreshold;
+
+ /** The expected number of samples in each tick */
+ private final int expectedSize;
+
+ // state
+ private long currentTick = -1;
+ private TickList currentData = new TickList(0);
+
+ public TickedDataAggregator(ExecutorService workerPool, TickCounter tickCounter, ThreadGrouper threadGrouper, int interval, int tickLengthThreshold) {
+ this.workerPool = workerPool;
+ this.tickCounter = tickCounter;
+ this.threadGrouper = threadGrouper;
+ this.interval = interval;
+ this.tickLengthThreshold = tickLengthThreshold;
+ // 50 millis in a tick, plus 10 so we have a bit of room to go over
+ this.expectedSize = (50 / interval) + 10;
+ }
+
+ // this is effectively synchronized by the Timer instance in Sampler
+ @Override
+ public void insertData(String threadName, StackTraceElement[] stack) {
+ long tick = this.tickCounter.getCurrentTick();
+ if (this.currentTick != tick) {
+ pushCurrentTick();
+ this.currentTick = tick;
+ this.currentData = new TickList(this.expectedSize);
+ }
+
+ // form the queued data
+ QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+ // insert it
+ this.currentData.addData(queuedData);
+ }
+
+ private void pushCurrentTick() {
+ TickList currentData = this.currentData;
+
+ // approximate how long the tick lasted
+ int tickLengthMillis = currentData.getList().size() * this.interval;
+
+ // don't push data below the threshold
+ if (tickLengthMillis < this.tickLengthThreshold) {
+ return;
+ }
+
+ this.workerPool.submit(currentData);
+ }
+
+ @Override
+ public void start() {
+ this.tickCounter.start();
+ }
+
+ @Override
+ public Map<String, StackNode> getData() {
+ // push the current tick
+ pushCurrentTick();
+
+ // close the tick counter
+ this.tickCounter.close();
+
+ // wait for all pending data to be inserted
+ this.workerPool.shutdown();
+ try {
+ this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return this.threadData;
+ }
+
+ void insertData(List<QueuedThreadInfo> dataList) {
+ for (QueuedThreadInfo data : dataList) {
+ try {
+ String group = this.threadGrouper.getGroup(data.threadName);
+ StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
+ node.log(data.stack, this.interval);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private final class TickList implements Runnable {
+ private final List<QueuedThreadInfo> list;
+
+ TickList(int expectedSize) {
+ this.list = new ArrayList<>(expectedSize);
+ }
+
+ @Override
+ public void run() {
+ insertData(this.list);
+ }
+
+ public List<QueuedThreadInfo> getList() {
+ return this.list;
+ }
+
+ public void addData(QueuedThreadInfo data) {
+ this.list.add(data);
+ }
+ }
+
+ private static final class QueuedThreadInfo {
+ private final String threadName;
+ private final StackTraceElement[] stack;
+
+ QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+ this.threadName = threadName;
+ this.stack = stack;
+ }
+ }
+}