Diffstat (limited to 'spark-common')
15 files changed, 1502 insertions, 0 deletions
diff --git a/spark-common/build.gradle b/spark-common/build.gradle new file mode 100644 index 0000000..d59ce24 --- /dev/null +++ b/spark-common/build.gradle @@ -0,0 +1,6 @@ +dependencies { + compile 'com.squareup.okhttp3:okhttp:3.10.0' + compile 'com.squareup.okio:okio:1.14.0' + compileOnly 'com.google.code.gson:gson:2.7' + compileOnly 'com.google.guava:guava:19.0' +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java b/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java new file mode 100644 index 0000000..898bba7 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/CommandHandler.java @@ -0,0 +1,372 @@ +package me.lucko.spark.common; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; + +import me.lucko.spark.common.http.Bytebin; +import me.lucko.spark.profiler.Sampler; +import me.lucko.spark.profiler.SamplerBuilder; +import me.lucko.spark.profiler.ThreadDumper; +import me.lucko.spark.profiler.ThreadGrouper; +import me.lucko.spark.profiler.TickCounter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.Timer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Abstract command handling class used by all platforms. + * + * @param <T> the sender (e.g. CommandSender) type used by the platform + */ +public abstract class CommandHandler<T> { + + /** The URL of the viewer frontend */ + private static final String VIEWER_URL = "https://sparkprofiler.github.io/?"; + /** The prefix used in all messages */ + private static final String PREFIX = "&8[&fspark&8] &7"; + + /** + * The {@link Timer} being used by the {@link #activeSampler}. 
+ */ + private final Timer samplingThread = new Timer("spark-sampling-thread", true); + + /** Guards {@link #activeSampler} */ + private final Object[] activeSamplerMutex = new Object[0]; + /** The WarmRoast instance currently running, if any */ + private Sampler activeSampler = null; + /** The tick monitor instance currently running, if any */ + private ReportingTickMonitor activeTickMonitor = null; + + + // abstract methods implemented by each platform + + protected abstract void sendMessage(T sender, String message); + protected abstract void sendMessage(String message); + protected abstract void sendLink(String url); + protected abstract void runAsync(Runnable r); + protected abstract ThreadDumper getDefaultThreadDumper(); + protected abstract TickCounter newTickCounter(); + + private void sendPrefixedMessage(T sender, String message) { + sendMessage(sender, PREFIX + message); + } + + private void sendPrefixedMessage(String message) { + sendMessage(PREFIX + message); + } + + public void handleCommand(T sender, String[] args) { + try { + if (args.length == 0) { + sendInfo(sender); + return; + } + + List<String> arguments = new ArrayList<>(Arrays.asList(args)); + switch (arguments.remove(0).toLowerCase()) { + case "start": + handleStart(sender, arguments); + break; + case "info": + handleInfo(sender); + break; + case "cancel": + handleCancel(sender); + break; + case "stop": + case "upload": + case "paste": + handleStop(sender); + break; + case "monitoring": + handleMonitoring(sender, arguments); + break; + default: + sendInfo(sender); + break; + } + } catch (IllegalArgumentException e) { + sendMessage(sender, "&c" + e.getMessage()); + } + } + + private void sendInfo(T sender) { + sendPrefixedMessage(sender, "&fspark profiler &7v1.0"); + sendMessage(sender, "&b&l> &7/profiler start"); + sendMessage(sender, " &8[&7--timeout&8 <timeout seconds>]"); + sendMessage(sender, " &8[&7--thread&8 <thread name>]"); + sendMessage(sender, " &8[&7--not-combined]"); + sendMessage(sender, " &8[&7--interval&8 <interval millis>]"); + sendMessage(sender, " &8[&7--only-ticks-over&8 <tick length millis>]"); + sendMessage(sender, "&b&l> &7/profiler info"); + sendMessage(sender, "&b&l> &7/profiler stop"); + sendMessage(sender, "&b&l> &7/profiler cancel"); + sendMessage(sender, "&b&l> &7/profiler monitoring"); + sendMessage(sender, " &8[&7--threshold&8 <percentage increase>]"); + } + + private void handleStart(T sender, List<String> args) { + SetMultimap<String, String> arguments = parseArguments(args); + + int timeoutSeconds = parseInt(arguments, "timeout", "d"); + if (timeoutSeconds != -1 && timeoutSeconds <= 10) { + sendPrefixedMessage(sender, "&cThe specified timeout is not long enough for accurate results to be formed. Please choose a value greater than 10."); + return; + } + + if (timeoutSeconds != -1 && timeoutSeconds < 30) { + sendPrefixedMessage(sender, "&7The accuracy of the output will significantly improve when sampling is able to run for longer periods. 
Consider setting a timeout value over 30 seconds."); + } + + int intervalMillis = parseInt(arguments, "interval", "i"); + if (intervalMillis <= 0) { + intervalMillis = 4; + } + + Set<String> threads = Sets.union(arguments.get("thread"), arguments.get("t")); + ThreadDumper threadDumper; + if (threads.isEmpty()) { + // use the server thread + threadDumper = getDefaultThreadDumper(); + } else if (threads.contains("*")) { + threadDumper = ThreadDumper.ALL; + } else { + threadDumper = new ThreadDumper.Specific(threads); + } + + ThreadGrouper threadGrouper; + if (arguments.containsKey("not-combined")) { + threadGrouper = ThreadGrouper.BY_NAME; + } else { + threadGrouper = ThreadGrouper.BY_POOL; + } + + int ticksOver = parseInt(arguments, "only-ticks-over", "o"); + TickCounter tickCounter = null; + if (ticksOver != -1) { + try { + tickCounter = newTickCounter(); + } catch (UnsupportedOperationException e) { + sendPrefixedMessage(sender, "&cTick counting is not supported on BungeeCord!"); + return; + } + } + + Sampler sampler; + synchronized (this.activeSamplerMutex) { + if (this.activeSampler != null) { + sendPrefixedMessage(sender, "&7An active sampler is already running."); + return; + } + + sendPrefixedMessage("&7Initializing a new profiler, please wait..."); + + SamplerBuilder builder = new SamplerBuilder(); + builder.threadDumper(threadDumper); + builder.threadGrouper(threadGrouper); + if (timeoutSeconds != -1) { + builder.completeAfter(timeoutSeconds, TimeUnit.SECONDS); + } + builder.samplingInterval(intervalMillis); + if (ticksOver != -1) { + builder.ticksOver(ticksOver, tickCounter); + } + sampler = this.activeSampler = builder.start(this.samplingThread); + + sendPrefixedMessage("&bProfiler now active!"); + if (timeoutSeconds == -1) { + sendPrefixedMessage("&7Use '/profiler stop' to stop profiling and upload the results."); + } else { + sendPrefixedMessage("&7The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds."); + } + } + + CompletableFuture<Sampler> future = sampler.getFuture(); + + // send message if profiling fails + future.whenCompleteAsync((s, throwable) -> { + if (throwable != null) { + sendPrefixedMessage("&cSampling operation failed unexpectedly. Error: " + throwable.toString()); + throwable.printStackTrace(); + } + }); + + // set activeSampler to null when complete. + future.whenCompleteAsync((s, throwable) -> { + synchronized (this.activeSamplerMutex) { + if (sampler == this.activeSampler) { + this.activeSampler = null; + } + } + }); + + // await the result + if (timeoutSeconds != -1) { + future.thenAcceptAsync(s -> { + sendPrefixedMessage("&7The active sampling operation has completed! 
Uploading results..."); + handleUpload(s); + }); + } + } + + private void handleInfo(T sender) { + synchronized (this.activeSamplerMutex) { + if (this.activeSampler == null) { + sendPrefixedMessage(sender, "&7There isn't an active sampling task running."); + } else { + long timeout = this.activeSampler.getEndTime(); + if (timeout == -1) { + sendPrefixedMessage(sender, "&7There is an active sampler currently running, with no defined timeout."); + } else { + long timeoutDiff = (timeout - System.currentTimeMillis()) / 1000L; + sendPrefixedMessage(sender, "&7There is an active sampler currently running, due to timeout in " + timeoutDiff + " seconds."); + } + + long runningTime = (System.currentTimeMillis() - this.activeSampler.getStartTime()) / 1000L; + sendPrefixedMessage(sender, "&7It has been sampling for " + runningTime + " seconds so far."); + } + } + } + + private void handleStop(T sender) { + synchronized (this.activeSamplerMutex) { + if (this.activeSampler == null) { + sendPrefixedMessage(sender, "&7There isn't an active sampling task running."); + } else { + this.activeSampler.cancel(); + sendPrefixedMessage("&7The active sampling operation has been stopped! Uploading results..."); + handleUpload(this.activeSampler); + this.activeSampler = null; + } + } + } + + private void handleCancel(T sender) { + synchronized (this.activeSamplerMutex) { + if (this.activeSampler == null) { + sendPrefixedMessage(sender, "&7There isn't an active sampling task running."); + } else { + this.activeSampler.cancel(); + this.activeSampler = null; + sendPrefixedMessage("&bThe active sampling task has been cancelled."); + } + } + } + + private void handleUpload(Sampler sampler) { + runAsync(() -> { + byte[] output = sampler.formCompressedDataPayload(); + try { + String pasteId = Bytebin.postCompressedContent(output); + sendPrefixedMessage("&bSampling results:"); + sendLink(VIEWER_URL + pasteId); + } catch (IOException e) { + sendPrefixedMessage("&cAn error occurred whilst uploading the results."); + e.printStackTrace(); + } + }); + } + + private void handleMonitoring(T sender, List<String> args) { + SetMultimap<String, String> arguments = parseArguments(args); + + if (this.activeTickMonitor == null) { + + int threshold = parseInt(arguments, "threshold", "t"); + if (threshold == -1) { + threshold = 100; + } + + try { + TickCounter tickCounter = newTickCounter(); + this.activeTickMonitor = new ReportingTickMonitor(tickCounter, threshold); + } catch (UnsupportedOperationException e) { + sendPrefixedMessage(sender, "&cNot supported on BungeeCord!"); + } + } else { + this.activeTickMonitor.close(); + this.activeTickMonitor = null; + sendPrefixedMessage("&7Tick monitor disabled."); + } + } + + private class ReportingTickMonitor extends TickMonitor { + public ReportingTickMonitor(TickCounter tickCounter, int percentageChangeThreshold) { + super(tickCounter, percentageChangeThreshold); + } + + @Override + protected void sendMessage(String message) { + sendPrefixedMessage(message); + } + } + + private int parseInt(SetMultimap<String, String> arguments, String longArg, String shortArg) { + Iterator<String> it = Sets.union(arguments.get(longArg), arguments.get(shortArg)).iterator(); + if (it.hasNext()) { + try { + return Math.abs(Integer.parseInt(it.next())); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid input for '" + longArg + "' argument. 
Please specify a number!"); + } + } + return -1; // undefined + } + + private static final Pattern FLAG_REGEX = Pattern.compile("--(.+)$|-([a-zA-z])$"); + + private static SetMultimap<String, String> parseArguments(List<String> args) { + SetMultimap<String, String> arguments = HashMultimap.create(); + + String flag = null; + List<String> value = null; + + for (int i = 0; i < args.size(); i++) { + String arg = args.get(i); + + Matcher matcher = FLAG_REGEX.matcher(arg); + boolean matches = matcher.matches(); + + if (flag == null || matches) { + if (!matches) { + throw new IllegalArgumentException("Expected flag at position " + i + " but got '" + arg + "' instead!"); + } + + String match = matcher.group(1); + if (match == null) { + match = matcher.group(2); + } + + // store existing value, if present + if (flag != null) { + arguments.put(flag, value.stream().collect(Collectors.joining(" "))); + } + + flag = match.toLowerCase(); + value = new ArrayList<>(); + } else { + // part of a value + value.add(arg); + } + } + + // store remaining value, if present + if (flag != null) { + arguments.put(flag, value.stream().collect(Collectors.joining(" "))); + } + + return arguments; + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/TickMonitor.java b/spark-common/src/main/java/me/lucko/spark/common/TickMonitor.java new file mode 100644 index 0000000..a30a4db --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/TickMonitor.java @@ -0,0 +1,88 @@ +package me.lucko.spark.common; + +import me.lucko.spark.profiler.TickCounter; + +import java.text.DecimalFormat; +import java.util.DoubleSummaryStatistics; + +public abstract class TickMonitor implements Runnable { + private static final DecimalFormat df = new DecimalFormat("#.##"); + + private final TickCounter tickCounter; + private final int percentageChangeThreshold; + + // data + private double lastTickTime = 0; + private State state = null; + private DoubleSummaryStatistics averageTickTime = new DoubleSummaryStatistics(); + private double avg; + + public TickMonitor(TickCounter tickCounter, int percentageChangeThreshold) { + this.tickCounter = tickCounter; + this.percentageChangeThreshold = percentageChangeThreshold; + + this.tickCounter.start(); + this.tickCounter.addTickTask(this); + } + + protected abstract void sendMessage(String message); + + public void close() { + this.tickCounter.close(); + } + + @Override + public void run() { + double now = ((double) System.nanoTime()) / 1000000d; + + // init + if (this.state == null) { + this.state = State.SETUP; + this.lastTickTime = now; + sendMessage("Tick monitor started. 
Before the monitor becomes fully active, the server's " + + "average tick rate will be calculated over a period of 120 ticks (approx 6 seconds)."); + return; + } + + // find the diff + double diff = now - this.lastTickTime; + this.lastTickTime = now; + + // form averages + if (this.state == State.SETUP) { + this.averageTickTime.accept(diff); + + // move onto the next state + if (this.averageTickTime.getCount() >= 120) { + + sendMessage("&bAnalysis is now complete."); + sendMessage("&f> &7Max: " + df.format(this.averageTickTime.getMax()) + "ms"); + sendMessage("&f> &7Min: " + df.format(this.averageTickTime.getMin()) + "ms"); + sendMessage("&f> &7Avg: " + df.format(this.averageTickTime.getAverage()) + "ms"); + sendMessage("Starting now, any ticks with >" + this.percentageChangeThreshold + "% increase in " + + "duration compared to the average will be reported."); + + this.avg = this.averageTickTime.getAverage(); + this.state = State.MONITORING; + } + } + + if (this.state == State.MONITORING) { + double increase = diff - this.avg; + if (increase <= 0) { + return; + } + + double percentageChange = (increase * 100d) / this.avg; + if (percentageChange > this.percentageChangeThreshold) { + sendMessage("&7Tick &8#" + this.tickCounter.getCurrentTick() + " &7lasted &b" + df.format(diff) + "&7 milliseconds. " + + "&7(&b" + df.format(percentageChange) + "% &7increase from average)"); + } + } + } + + private enum State { + SETUP, + MONITORING + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/http/Bytebin.java b/spark-common/src/main/java/me/lucko/spark/common/http/Bytebin.java new file mode 100644 index 0000000..3cd5e4c --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/http/Bytebin.java @@ -0,0 +1,54 @@ +package me.lucko.spark.common.http; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; + +import okhttp3.MediaType; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.ResponseBody; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + +/** + * Utility for uploading JSON data to bytebin. 
+ */ +public final class Bytebin { + + /** Media type for JSON data */ + private static final MediaType JSON_TYPE = MediaType.parse("application/json; charset=utf-8"); + /** The URL used to upload sampling data */ + private static final String UPLOAD_ENDPOINT = "https://bytebin.lucko.me/post"; + + public static String postCompressedContent(byte[] buf) throws IOException { + RequestBody body = RequestBody.create(JSON_TYPE, buf); + + Request.Builder requestBuilder = new Request.Builder() + .url(UPLOAD_ENDPOINT) + .header("Content-Encoding", "gzip") + .post(body); + + Request request = requestBuilder.build(); + try (Response response = HttpClient.makeCall(request)) { + try (ResponseBody responseBody = response.body()) { + if (responseBody == null) { + throw new RuntimeException("No response"); + } + + try (InputStream inputStream = responseBody.byteStream()) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + JsonObject object = new Gson().fromJson(reader, JsonObject.class); + return object.get("key").getAsString(); + } + } + } + } + } + + private Bytebin() {} +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/http/HttpClient.java b/spark-common/src/main/java/me/lucko/spark/common/http/HttpClient.java new file mode 100644 index 0000000..61db597 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/http/HttpClient.java @@ -0,0 +1,113 @@ +/* + * This file is part of LuckPerms, licensed under the MIT License. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package me.lucko.spark.common.http; + +import okhttp3.Interceptor; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; + +import java.io.IOException; +import java.net.Proxy; +import java.net.ProxySelector; +import java.net.SocketAddress; +import java.net.URI; +import java.util.Collections; +import java.util.List; + +/** + * Utility class for making http requests. 
+ */ +public final class HttpClient { + private static OkHttpClient client = null; + + private static synchronized OkHttpClient getClient() { + if (client == null) { + client = new OkHttpClient.Builder() + .proxySelector(new NullSafeProxySelector()) + .addInterceptor(new UserAgentInterceptor()) + .build(); + } + return client; + } + + public static Response makeCall(Request request) throws IOException { + Response response = getClient().newCall(request).execute(); + if (!response.isSuccessful()) { + throw exceptionForUnsuccessfulResponse(response); + } + return response; + } + + private static RuntimeException exceptionForUnsuccessfulResponse(Response response) { + String msg = ""; + try (ResponseBody responseBody = response.body()) { + if (responseBody != null) { + msg = responseBody.string(); + } + } catch (IOException e) { + // ignore + } + return new RuntimeException("Got response: " + response.code() + " - " + response.message() + " - " + msg); + } + + private static final class UserAgentInterceptor implements Interceptor { + @Override + public Response intercept(Chain chain) throws IOException { + Request orig = chain.request(); + Request modified = orig.newBuilder() + .header("User-Agent", "spark-plugin") + .build(); + + return chain.proceed(modified); + } + } + + // sometimes ProxySelector#getDefault returns null, and okhttp doesn't like that + private static final class NullSafeProxySelector extends ProxySelector { + private static final List<Proxy> DIRECT = Collections.singletonList(Proxy.NO_PROXY); + + @Override + public List<Proxy> select(URI uri) { + ProxySelector def = ProxySelector.getDefault(); + if (def == null) { + return DIRECT; + } + return def.select(uri); + } + + @Override + public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { + ProxySelector def = ProxySelector.getDefault(); + if (def != null) { + def.connectFailed(uri, sa, ioe); + } + } + } + + private HttpClient() {} +}
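Taken together, HttpClient and Bytebin form the upload path driven by CommandHandler#handleUpload: the sampler's gzip-compressed JSON payload is POSTed to bytebin with a "Content-Encoding: gzip" header, and the returned key is appended to the viewer URL. A minimal sketch of that flow, assuming a pre-gzipped payload (the JSON literal below is a hypothetical stand-in for real sampler output):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import me.lucko.spark.common.http.Bytebin;

public class UploadExample {
    public static void main(String[] args) throws IOException {
        // bytebin expects a gzip-compressed body, matching the
        // "Content-Encoding: gzip" header set in Bytebin#postCompressedContent
        byte[] payload = gzip("{\"threads\":[]}"); // hypothetical payload

        String key = Bytebin.postCompressedContent(payload);
        // the viewer URL is formed by appending the returned key
        System.out.println("https://sparkprofiler.github.io/?" + key);
    }

    private static byte[] gzip(String json) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        return bytes.toByteArray();
    }
}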
\ No newline at end of file diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java new file mode 100644 index 0000000..9a4090e --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/profiler/AsyncDataAggregator.java @@ -0,0 +1,77 @@ +package me.lucko.spark.profiler; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of {@link DataAggregator} that makes use of a "worker" thread pool for inserting + * data. + */ +public class AsyncDataAggregator implements DataAggregator { + + /** A map of root stack nodes for each thread with sampling data */ + private final Map<String, StackNode> threadData = new ConcurrentHashMap<>(); + + /** The worker pool for inserting stack nodes */ + private final ExecutorService workerPool; + + /** The instance used to group threads together */ + private final ThreadGrouper threadGrouper; + + /** The interval to wait between sampling, in milliseconds */ + private final int interval; + + public AsyncDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) { + this.workerPool = workerPool; + this.threadGrouper = threadGrouper; + this.interval = interval; + } + + @Override + public void insertData(String threadName, StackTraceElement[] stack) { + // form the queued data + QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); + // schedule insertion of the data + this.workerPool.execute(queuedData); + } + + @Override + public Map<String, StackNode> getData() { + // wait for all pending data to be inserted + this.workerPool.shutdown(); + try { + this.workerPool.awaitTermination(15, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + return this.threadData; + } + + void insertData(QueuedThreadInfo data) { + try { + String group = this.threadGrouper.getGroup(data.threadName); + StackNode node = this.threadData.computeIfAbsent(group, StackNode::new); + node.log(data.stack, this.interval); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private final class QueuedThreadInfo implements Runnable { + private final String threadName; + private final StackTraceElement[] stack; + + QueuedThreadInfo(String threadName, StackTraceElement[] stack) { + this.threadName = threadName; + this.stack = stack; + } + + @Override + public void run() { + insertData(this); + } + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java new file mode 100644 index 0000000..1afa52c --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java @@ -0,0 +1,32 @@ +package me.lucko.spark.profiler; + +import java.util.Map; + +/** + * Aggregates sampling data. 
+ */ +public interface DataAggregator { + + /** + * Called before the sampler begins to insert data + */ + default void start() { + + } + + /** + * Forms the output data + * + * @return the output data + */ + Map<String, StackNode> getData(); + + /** + * Inserts sampling data into this aggregator + * + * @param threadName the name of the thread + * @param stack the call stack + */ + void insertData(String threadName, StackTraceElement[] stack); + +} diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java new file mode 100644 index 0000000..3476f03 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java @@ -0,0 +1,170 @@ +/* + * WarmRoast + * Copyright (C) 2013 Albert Pham <http://www.sk89q.com> + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package me.lucko.spark.profiler; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.gson.stream.JsonWriter; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.GZIPOutputStream; + +/** + * Main sampler class. 
+ */ +public class Sampler extends TimerTask { + private static final AtomicInteger THREAD_ID = new AtomicInteger(0); + + /** The worker pool for inserting stack nodes */ + private final ExecutorService workerPool = Executors.newFixedThreadPool( + 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement()).build() + ); + + /** The thread management interface for the current JVM */ + private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + /** The instance used to generate thread information for use in sampling */ + private final ThreadDumper threadDumper; + /** Responsible for aggregating and then outputting collected sampling data */ + private final DataAggregator dataAggregator; + + /** A future to encapsulation the completion of this sampler instance */ + private final CompletableFuture<Sampler> future = new CompletableFuture<>(); + + /** The interval to wait between sampling, in milliseconds */ + private final int interval; + /** The time when sampling first began */ + private long startTime = -1; + /** The unix timestamp (in millis) when this sampler should automatically complete.*/ + private final long endTime; // -1 for nothing + + public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) { + this.threadDumper = threadDumper; + this.dataAggregator = new AsyncDataAggregator(this.workerPool, threadGrouper, interval); + this.interval = interval; + this.endTime = endTime; + } + + public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, TickCounter tickCounter, int tickLengthThreshold) { + this.threadDumper = threadDumper; + this.dataAggregator = new TickedDataAggregator(this.workerPool, tickCounter, threadGrouper, interval, tickLengthThreshold); + this.interval = interval; + this.endTime = endTime; + } + + /** + * Starts the sampler. + * + * @param samplingThread the timer to schedule the sampling o |

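A note on the flag grammar accepted by CommandHandler#parseArguments above: a token matching --(long flag) or -(single letter) opens a new flag, and every following non-flag token is joined into that flag's value, so multi-word values need no quoting. (The short-flag character class in the patch is written a-zA-z, which by ASCII ordering also matches the punctuation between 'Z' and 'a'; a-zA-Z is presumably intended.) A standalone sketch of the same loop, with that class corrected and the leading-token error check omitted:

import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlagParseExample {
    // same shape as CommandHandler#FLAG_REGEX, short-flag class corrected
    private static final Pattern FLAG = Pattern.compile("--(.+)$|-([a-zA-Z])$");

    public static void main(String[] args) {
        // "/profiler start --timeout 60 --thread Server thread" arrives as:
        List<String> in = Arrays.asList("--timeout", "60", "--thread", "Server", "thread");

        SetMultimap<String, String> parsed = HashMultimap.create();
        String flag = null;
        List<String> value = new ArrayList<>();

        for (String arg : in) {
            Matcher m = FLAG.matcher(arg);
            if (m.matches()) {
                if (flag != null) {
                    parsed.put(flag, String.join(" ", value)); // store the previous flag
                }
                flag = (m.group(1) != null ? m.group(1) : m.group(2)).toLowerCase();
                value = new ArrayList<>();
            } else {
                value.add(arg); // non-flag tokens extend the current flag's value
            }
        }
        if (flag != null) {
            parsed.put(flag, String.join(" ", value)); // store the final flag
        }

        // prints something like: {timeout=[60], thread=[Server thread]}
        System.out.println(parsed);
    }
}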
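The reporting condition in TickMonitor is relative, not absolute: a tick is reported when (diff - avg) * 100 / avg exceeds the configured threshold, so with the default threshold of 100 and a settled average of 50ms, only ticks longer than 100ms are flagged. A worked example with illustrative numbers:

public class ThresholdExample {
    public static void main(String[] args) {
        double avg = 50.0;   // settled average tick duration (ms), illustrative
        double diff = 120.0; // duration of the tick being tested (ms), illustrative
        int threshold = 100; // default --threshold value in handleMonitoring

        double increase = diff - avg;                       // 70ms over the average
        double percentageChange = (increase * 100d) / avg;  // 140.0
        if (percentageChange > threshold) {
            // matches the report produced in TickMonitor#run
            System.out.println("Tick lasted " + diff + " milliseconds. ("
                    + percentageChange + "% increase from average)");
        }
    }
}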
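One structural point in AsyncDataAggregator: insertData only enqueues a QueuedThreadInfo on the worker pool, so getData() must shut the pool down and await termination before reading threadData, otherwise in-flight insertions could be missed. A minimal illustration of that shutdown-then-read handoff (the map and task below are illustrative, not part of the patch):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AggregatorHandoffExample {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Map<String, Integer> counts = new ConcurrentHashMap<>();

        // producers enqueue work, as Sampler#run does via insertData(...)
        for (int i = 0; i < 1000; i++) {
            pool.execute(() -> counts.merge("Server thread", 1, Integer::sum));
        }

        // the shutdown + awaitTermination pair guarantees all queued
        // insertions have completed before the map is read, mirroring getData()
        pool.shutdown();
        pool.awaitTermination(15, TimeUnit.SECONDS);

        System.out.println(counts); // {Server thread=1000}
    }
}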