aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko/spark/common/sampler
diff options
context:
space:
mode:
authorLuck <git@lucko.me>2019-04-16 21:37:59 +0100
committerLuck <git@lucko.me>2019-04-16 21:37:59 +0100
commitecd4cec8545460a4fc4ca65b911c2503a00cd8e7 (patch)
tree62067383a1044abc3a09724e89c6e7c619e87ec0 /spark-common/src/main/java/me/lucko/spark/common/sampler
parent8a61b404848ed8e3c27f06eb73239d37d4273240 (diff)
downloadspark-ecd4cec8545460a4fc4ca65b911c2503a00cd8e7.tar.gz
spark-ecd4cec8545460a4fc4ca65b911c2503a00cd8e7.tar.bz2
spark-ecd4cec8545460a4fc4ca65b911c2503a00cd8e7.zip
Lots of refactoring, add tps command
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java200
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java90
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java126
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java72
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/TickCounter.java64
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java54
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java82
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java184
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java143
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java86
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java44
11 files changed, 1145 insertions, 0 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
new file mode 100644
index 0000000..d504247
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -0,0 +1,200 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (C) Albert Pham <http://www.sk89q.com>
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.stream.JsonWriter;
+import me.lucko.spark.common.sampler.aggregator.DataAggregator;
+import me.lucko.spark.common.sampler.aggregator.SimpleDataAggregator;
+import me.lucko.spark.common.sampler.aggregator.TickedDataAggregator;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Main sampler class.
+ */
+public class Sampler implements Runnable {
+ private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
+
+ /** The worker pool for inserting stack nodes */
+ private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
+ 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
+ );
+
+ /** The main sampling task */
+ private ScheduledFuture<?> task;
+
+ /** The thread management interface for the current JVM */
+ private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+ /** The instance used to generate thread information for use in sampling */
+ private final ThreadDumper threadDumper;
+ /** Responsible for aggregating and then outputting collected sampling data */
+ private final DataAggregator dataAggregator;
+
+ /** A future to encapsulation the completion of this sampler instance */
+ private final CompletableFuture<Sampler> future = new CompletableFuture<>();
+
+ /** The interval to wait between sampling, in microseconds */
+ private final int interval;
+ /** The time when sampling first began */
+ private long startTime = -1;
+ /** The unix timestamp (in millis) when this sampler should automatically complete.*/
+ private final long endTime; // -1 for nothing
+
+ public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers) {
+ this.threadDumper = threadDumper;
+ this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, includeLineNumbers);
+ this.interval = interval;
+ this.endTime = endTime;
+ }
+
+ public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers, TickCounter tickCounter, int tickLengthThreshold) {
+ this.threadDumper = threadDumper;
+ this.dataAggregator = new TickedDataAggregator(this.workerPool, tickCounter, threadGrouper, interval, includeLineNumbers, tickLengthThreshold);
+ this.interval = interval;
+ this.endTime = endTime;
+ }
+
+ /**
+ * Starts the sampler.
+ */
+ public void start() {
+ this.startTime = System.currentTimeMillis();
+ this.dataAggregator.start();
+ this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS);
+ }
+
+ public long getStartTime() {
+ if (this.startTime == -1) {
+ throw new IllegalStateException("Not yet started");
+ }
+ return this.startTime;
+ }
+
+ public long getEndTime() {
+ return this.endTime;
+ }
+
+ public CompletableFuture<Sampler> getFuture() {
+ return this.future;
+ }
+
+ public void cancel() {
+ this.task.cancel(false);
+ }
+
+ @Override
+ public void run() {
+ // this is effectively synchronized, the worker pool will not allow this task
+ // to concurrently execute.
+ try {
+ if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) {
+ this.future.complete(this);
+ cancel();
+ return;
+ }
+
+ ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean);
+ this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps));
+ } catch (Throwable t) {
+ this.future.completeExceptionally(t);
+ cancel();
+ }
+ }
+
+ private static final class InsertDataTask implements Runnable {
+ private final DataAggregator dataAggregator;
+ private final ThreadInfo[] threadDumps;
+
+ InsertDataTask(DataAggregator dataAggregator, ThreadInfo[] threadDumps) {
+ this.dataAggregator = dataAggregator;
+ this.threadDumps = threadDumps;
+ }
+
+ @Override
+ public void run() {
+ for (ThreadInfo threadInfo : this.threadDumps) {
+ String threadName = threadInfo.getThreadName();
+ StackTraceElement[] stack = threadInfo.getStackTrace();
+
+ if (threadName == null || stack == null) {
+ continue;
+ }
+
+ this.dataAggregator.insertData(threadName, stack);
+ }
+ }
+ }
+
+ private void writeOutput(JsonWriter writer) throws IOException {
+ writer.beginObject();
+
+ writer.name("type").value("sampler");
+ writer.name("threads").beginArray();
+
+ List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
+ data.sort(Map.Entry.comparingByKey());
+
+ for (Map.Entry<String, ThreadNode> entry : data) {
+ writer.beginObject();
+ writer.name("threadName").value(entry.getKey());
+ writer.name("totalTime").value(entry.getValue().getTotalTime());
+ writer.name("rootNode");
+ entry.getValue().serializeTo(writer);
+ writer.endObject();
+ }
+
+ writer.endArray();
+ writer.endObject();
+ }
+
+ public byte[] formCompressedDataPayload() {
+ ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+ try (Writer writer = new OutputStreamWriter(new GZIPOutputStream(byteOut), StandardCharsets.UTF_8)) {
+ try (JsonWriter jsonWriter = new JsonWriter(writer)) {
+ writeOutput(jsonWriter);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return byteOut.toByteArray();
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
new file mode 100644
index 0000000..4ce69df
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -0,0 +1,90 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Builds {@link Sampler} instances.
+ */
+public class SamplerBuilder {
+
+ private double samplingInterval = 4; // milliseconds
+ private boolean includeLineNumbers = false;
+ private long timeout = -1;
+ private ThreadDumper threadDumper = ThreadDumper.ALL;
+ private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME;
+
+ private int ticksOver = -1;
+ private TickCounter tickCounter = null;
+
+ public SamplerBuilder() {
+ }
+
+ public SamplerBuilder samplingInterval(double samplingInterval) {
+ this.samplingInterval = samplingInterval;
+ return this;
+ }
+
+ public SamplerBuilder completeAfter(long timeout, TimeUnit unit) {
+ if (timeout <= 0) {
+ throw new IllegalArgumentException("timeout > 0");
+ }
+ this.timeout = System.currentTimeMillis() + unit.toMillis(timeout);
+ return this;
+ }
+
+ public SamplerBuilder threadDumper(ThreadDumper threadDumper) {
+ this.threadDumper = threadDumper;
+ return this;
+ }
+
+ public SamplerBuilder threadGrouper(ThreadGrouper threadGrouper) {
+ this.threadGrouper = threadGrouper;
+ return this;
+ }
+
+ public SamplerBuilder ticksOver(int ticksOver, TickCounter tickCounter) {
+ this.ticksOver = ticksOver;
+ this.tickCounter = tickCounter;
+ return this;
+ }
+
+ public SamplerBuilder includeLineNumbers(boolean includeLineNumbers) {
+ this.includeLineNumbers = includeLineNumbers;
+ return this;
+ }
+
+ public Sampler start() {
+ Sampler sampler;
+
+ int intervalMicros = (int) (this.samplingInterval * 1000d);
+ if (this.ticksOver != -1 && this.tickCounter != null) {
+ sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers, this.tickCounter, this.ticksOver);
+ } else {
+ sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers);
+ }
+
+ sampler.start();
+ return sampler;
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
new file mode 100644
index 0000000..14938ac
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java
@@ -0,0 +1,126 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (C) Albert Pham <http://www.sk89q.com>
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+import me.lucko.spark.common.util.ThreadFinder;
+
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import java.util.stream.Collectors;
+
+/**
+ * Uses the {@link ThreadMXBean} to generate {@link ThreadInfo} instances for the threads being
+ * sampled.
+ */
+@FunctionalInterface
+public interface ThreadDumper {
+
+ /**
+ * Generates {@link ThreadInfo} data for the sampled threads.
+ *
+ * @param threadBean the thread bean instance to obtain the data from
+ * @return an array of generated thread info instances
+ */
+ ThreadInfo[] dumpThreads(ThreadMXBean threadBean);
+
+ /**
+ * Implementation of {@link ThreadDumper} that generates data for all threads.
+ */
+ ThreadDumper ALL = threadBean -> threadBean.dumpAllThreads(false, false);
+
+ /**
+ * Implementation of {@link ThreadDumper} that generates data for a specific set of threads.
+ */
+ final class Specific implements ThreadDumper {
+ private final ThreadFinder threadFinder = new ThreadFinder();
+ private final long[] ids;
+
+ public Specific(long[] ids) {
+ this.ids = ids;
+ }
+
+ public Specific(Set<String> names) {
+ Set<String> namesLower = names.stream().map(String::toLowerCase).collect(Collectors.toSet());
+ this.ids = this.threadFinder.getThreads()
+ .filter(t -> namesLower.contains(t.getName().toLowerCase()))
+ .mapToLong(Thread::getId)
+ .toArray();
+ }
+
+ @Override
+ public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
+ return threadBean.getThreadInfo(this.ids, Integer.MAX_VALUE);
+ }
+ }
+
+ /**
+ * Implementation of {@link ThreadDumper} that generates data for a regex matched set of threads.
+ */
+ final class Regex implements ThreadDumper {
+ private final ThreadFinder threadFinder = new ThreadFinder();
+ private final Set<Pattern> namePatterns;
+ private final Map<Long, Boolean> cache = new HashMap<>();
+
+ public Regex(Set<String> namePatterns) {
+ this.namePatterns = namePatterns.stream()
+ .map(regex -> {
+ try {
+ return Pattern.compile(regex);
+ } catch (PatternSyntaxException e) {
+ return null;
+ }
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
+ return this.threadFinder.getThreads()
+ .filter(thread -> {
+ Boolean result = this.cache.get(thread.getId());
+ if (result != null) {
+ return result;
+ }
+
+ for (Pattern pattern : this.namePatterns) {
+ if (pattern.matcher(thread.getName()).matches()) {
+ this.cache.put(thread.getId(), true);
+ return true;
+ }
+ }
+ this.cache.put(thread.getId(), false);
+ return false;
+ })
+ .map(thread -> threadBean.getThreadInfo(thread.getId()))
+ .filter(Objects::nonNull)
+ .toArray(ThreadInfo[]::new);
+ }
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
new file mode 100644
index 0000000..f53800a
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java
@@ -0,0 +1,72 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Function for grouping threads together
+ */
+@FunctionalInterface
+public interface ThreadGrouper {
+
+ /**
+ * Gets the group for the given thread.
+ *
+ * @param threadName the name of the thread
+ * @return the group
+ */
+ String getGroup(String threadName);
+
+ /**
+ * Implementation of {@link ThreadGrouper} that just groups by thread name.
+ */
+ ThreadGrouper BY_NAME = threadName -> threadName;
+
+ /**
+ * Implementation of {@link ThreadGrouper} that attempts to group by the name of the pool
+ * the thread originated from.
+ *
+ * <p>The regex pattern used to match pools expects a digit at the end of the thread name,
+ * separated from the pool name with any of one or more of ' ', '-', or '#'.</p>
+ */
+ ThreadGrouper BY_POOL = new ThreadGrouper() {
+ private final Pattern pattern = Pattern.compile("^(.*?)[-# ]+\\d+$");
+
+ @Override
+ public String getGroup(String threadName) {
+ Matcher matcher = this.pattern.matcher(threadName);
+ if (!matcher.matches()) {
+ return threadName;
+ }
+
+ return matcher.group(1).trim() + " (Combined)";
+ }
+ };
+
+ /**
+ * Implementation of {@link ThreadGrouper} which groups all threads as one, under
+ * the name "All".
+ */
+ ThreadGrouper AS_ONE = threadName -> "All";
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/TickCounter.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/TickCounter.java
new file mode 100644
index 0000000..aa839ba
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/TickCounter.java
@@ -0,0 +1,64 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+/**
+ * A hook with the game's "tick loop".
+ */
+public interface TickCounter extends AutoCloseable {
+
+ /**
+ * Starts the counter
+ */
+ void start();
+
+ /**
+ * Stops the counter
+ */
+ @Override
+ void close();
+
+ /**
+ * Gets the current tick number
+ *
+ * @return the current tick
+ */
+ int getCurrentTick();
+
+ /**
+ * Adds a task to be called each time the tick increments
+ *
+ * @param runnable the task
+ */
+ void addTickTask(TickTask runnable);
+
+ /**
+ * Removes a tick task
+ *
+ * @param runnable the task
+ */
+ void removeTickTask(TickTask runnable);
+
+ interface TickTask {
+ void onTick(TickCounter counter);
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
new file mode 100644
index 0000000..8c65c2d
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.aggregator;
+
+import me.lucko.spark.common.sampler.node.ThreadNode;
+
+import java.util.Map;
+
+/**
+ * Aggregates sampling data.
+ */
+public interface DataAggregator {
+
+ /**
+ * Called before the sampler begins to insert data
+ */
+ default void start() {
+
+ }
+
+ /**
+ * Forms the output data
+ *
+ * @return the output data
+ */
+ Map<String, ThreadNode> getData();
+
+ /**
+ * Inserts sampling data into this aggregator
+ *
+ * @param threadName the name of the thread
+ * @param stack the call stack
+ */
+ void insertData(String threadName, StackTraceElement[] stack);
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
new file mode 100644
index 0000000..8fbd03f
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/SimpleDataAggregator.java
@@ -0,0 +1,82 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.aggregator;
+
+import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.node.AbstractNode;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Basic implementation of {@link DataAggregator}.
+ */
+public class SimpleDataAggregator implements DataAggregator {
+
+ /** A map of root stack nodes for each thread with sampling data */
+ private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
+
+ /** The worker pool used for sampling */
+ private final ExecutorService workerPool;
+
+ /** The instance used to group threads together */
+ private final ThreadGrouper threadGrouper;
+
+ /** The interval to wait between sampling, in microseconds */
+ private final int interval;
+
+ /** If line numbers should be included in the output */
+ private final boolean includeLineNumbers;
+
+ public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers) {
+ this.workerPool = workerPool;
+ this.threadGrouper = threadGrouper;
+ this.interval = interval;
+ this.includeLineNumbers = includeLineNumbers;
+ }
+
+ @Override
+ public void insertData(String threadName, StackTraceElement[] stack) {
+ try {
+ String group = this.threadGrouper.getGroup(threadName);
+ AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new);
+ node.log(stack, this.interval, this.includeLineNumbers);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Map<String, ThreadNode> getData() {
+ // wait for all pending data to be inserted
+ this.workerPool.shutdown();
+ try {
+ this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return this.threadData;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
new file mode 100644
index 0000000..8f8124b
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/TickedDataAggregator.java
@@ -0,0 +1,184 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.aggregator;
+
+import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.TickCounter;
+import me.lucko.spark.common.sampler.node.AbstractNode;
+import me.lucko.spark.common.sampler.node.ThreadNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
+ * which exceed a certain threshold in duration.
+ */
+public class TickedDataAggregator implements DataAggregator {
+
+ /** A map of root stack nodes for each thread with sampling data */
+ private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
+
+ /** The worker pool for inserting stack nodes */
+ private final ExecutorService workerPool;
+
+ /** Used to monitor the current "tick" of the server */
+ private final TickCounter tickCounter;
+
+ /** The instance used to group threads together */
+ private final ThreadGrouper threadGrouper;
+
+ /** The interval to wait between sampling, in microseconds */
+ private final int interval;
+
+ /** If line numbers should be included in the output */
+ private final boolean includeLineNumbers;
+
+ /** Tick durations under this threshold will not be inserted, measured in microseconds */
+ private final long tickLengthThreshold;
+
+ /** The expected number of samples in each tick */
+ private final int expectedSize;
+
+ private final Object mutex = new Object();
+
+ // state
+ private int currentTick = -1;
+ private TickList currentData = new TickList(0);
+
+ public TickedDataAggregator(ExecutorService workerPool, TickCounter tickCounter, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, int tickLengthThreshold) {
+ this.workerPool = workerPool;
+ this.tickCounter = tickCounter;
+ this.threadGrouper = threadGrouper;
+ this.interval = interval;
+ this.includeLineNumbers = includeLineNumbers;
+ this.tickLengthThreshold = TimeUnit.MILLISECONDS.toMicros(tickLengthThreshold);
+ // 50 millis in a tick, plus 10 so we have a bit of room to go over
+ double intervalMilliseconds = interval / 1000d;
+ this.expectedSize = (int) ((50 / intervalMilliseconds) + 10);
+ }
+
+ @Override
+ public void insertData(String threadName, StackTraceElement[] stack) {
+ synchronized (this.mutex) {
+ int tick = this.tickCounter.getCurrentTick();
+ if (this.currentTick != tick) {
+ pushCurrentTick();
+ this.currentTick = tick;
+ this.currentData = new TickList(this.expectedSize);
+ }
+
+ // form the queued data
+ QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+ // insert it
+ this.currentData.addData(queuedData);
+ }
+ }
+
+ // guarded by 'mutex'
+ private void pushCurrentTick() {
+ TickList currentData = this.currentData;
+
+ // approximate how long the tick lasted
+ int tickLengthMicros = currentData.getList().size() * this.interval;
+
+ // don't push data below the threshold
+ if (tickLengthMicros < this.tickLengthThreshold) {
+ return;
+ }
+
+ this.workerPool.submit(currentData);
+ }
+
+ @Override
+ public void start() {
+ this.tickCounter.start();
+ }
+
+ @Override
+ public Map<String, ThreadNode> getData() {
+ // push the current tick
+ synchronized (this.mutex) {
+ pushCurrentTick();
+ }
+
+ // close the tick counter
+ this.tickCounter.close();
+
+ // wait for all pending data to be inserted
+ this.workerPool.shutdown();
+ try {
+ this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return this.threadData;
+ }
+
+ // called by TickList
+ void insertData(List<QueuedThreadInfo> dataList) {
+ for (QueuedThreadInfo data : dataList) {
+ try {
+ String group = this.threadGrouper.getGroup(data.threadName);
+ AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new);
+ node.log(data.stack, this.interval, this.includeLineNumbers);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private final class TickList implements Runnable {
+ private final List<QueuedThreadInfo> list;
+
+ TickList(int expectedSize) {
+ this.list = new ArrayList<>(expectedSize);
+ }
+
+ @Override
+ public void run() {
+ insertData(this.list);
+ }
+
+ public List<QueuedThreadInfo> getList() {
+ return this.list;
+ }
+
+ public void addData(QueuedThreadInfo data) {
+ this.list.add(data);
+ }
+ }
+
+ private static final class QueuedThreadInfo {
+ private final String threadName;
+ private final StackTraceElement[] stack;
+
+ QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+ this.threadName = threadName;
+ this.stack = stack;
+ }
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
new file mode 100644
index 0000000..5cfc0f2
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
@@ -0,0 +1,143 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (C) Albert Pham <http://www.sk89q.com>
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.node;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Encapsulates a timed node in the sampling stack.
+ */
+public abstract class AbstractNode {
+
+ private static final int MAX_STACK_DEPTH = 300;
+
+ /**
+ * A map of this nodes children
+ */
+ private final Map<String, StackTraceNode> children = new ConcurrentHashMap<>();
+
+ /**
+ * The accumulated sample time for this node, measured in microseconds
+ */
+ private final LongAdder totalTime = new LongAdder();
+
+ /**
+ * Returns the total sample time for this node in milliseconds.
+ *
+ * @return the total time
+ */
+ public long getTotalTime() {
+ long millis = TimeUnit.MICROSECONDS.toMillis(this.totalTime.longValue());
+ if (millis == 0) {
+ return 1;
+ }
+ return millis;
+ }
+
+ private AbstractNode resolveChild(String className, String methodName, int lineNumber) {
+ return this.children.computeIfAbsent(
+ StackTraceNode.generateKey(className, methodName, lineNumber),
+ name -> new StackTraceNode(className, methodName, lineNumber)
+ );
+ }
+
+ public void log(StackTraceElement[] elements, long time, boolean includeLineNumbers) {
+ log(elements, 0, time, includeLineNumbers);
+ }
+
+ private void log(StackTraceElement[] elements, int offset, long time, boolean includeLineNumbers) {
+ this.totalTime.add(time);
+
+ if (offset >= MAX_STACK_DEPTH) {
+ return;
+ }
+
+ if (elements.length - offset == 0) {
+ return;
+ }
+
+ // the first element in the array is the top of the call stack, and the last is the root
+ // offset starts at 0.
+
+ // pointer is determined by subtracting the offset from the index of the last element
+ int pointer = (elements.length - 1) - offset;
+ StackTraceElement element = elements[pointer];
+
+ // the parent stack element is located at pointer+1.
+ // when the current offset is 0, we know the current pointer is at the last element in the
+ // array (the root) and therefore there is no parent.
+ StackTraceElement parent = offset == 0 ? null : elements[pointer + 1];
+
+ // get the line number of the parent element - the line which called "us"
+ int lineNumber = parent == null || !includeLineNumbers ? StackTraceNode.NULL_LINE_NUMBER : parent.getLineNumber();
+
+ // resolve a child element within the structure for the element at pointer
+ AbstractNode child = resolveChild(element.getClassName(), element.getMethodName(), lineNumber);
+ // call the log method on the found child, with an incremented offset.
+ child.log(elements, offset + 1, time, includeLineNumbers);
+ }
+
+ private Collection<? extends AbstractNode> getChildren() {
+ if (this.children.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<StackTraceNode> list = new ArrayList<>(this.children.values());
+ list.sort(null);
+ return list;
+ }
+
+ public void serializeTo(JsonWriter writer) throws IOException {
+ writer.beginObject();
+
+ // append metadata about this node
+ appendMetadata(writer);
+
+ // include the total time recorded for this node
+ writer.name("t").value(getTotalTime());
+
+ // append child nodes, if any are present
+ Collection<? extends AbstractNode> childNodes = getChildren();
+ if (!childNodes.isEmpty()) {
+ writer.name("c").beginArray();
+ for (AbstractNode child : childNodes) {
+ child.serializeTo(writer);
+ }
+ writer.endArray();
+ }
+
+ writer.endObject();
+ }
+
+ protected abstract void appendMetadata(JsonWriter writer) throws IOException;
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
new file mode 100644
index 0000000..c4e7ac4
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
@@ -0,0 +1,86 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (C) Albert Pham <http://www.sk89q.com>
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.node;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+
+/**
+ * Represents a stack trace element within the {@link AbstractNode node} structure.
+ */
+public final class StackTraceNode extends AbstractNode implements Comparable<StackTraceNode> {
+
+ /**
+ * Magic number to denote "no present" line number for a node.
+ */
+ public static final int NULL_LINE_NUMBER = -1;
+
+ /**
+ * Forms a key to represent the given node.
+ *
+ * @param className the name of the class
+ * @param methodName the name of the method
+ * @param lineNumber the line number of the parent method call
+ * @return the key
+ */
+ static String generateKey(String className, String methodName, int lineNumber) {
+ return className + "." + methodName + "." + lineNumber;
+ }
+
+ /** The name of the class */
+ private final String className;
+ /** The name of the method */
+ private final String methodName;
+ /** The line number of the invocation which created this node */
+ private final int lineNumber;
+
+ public StackTraceNode(String className, String methodName, int lineNumber) {
+ this.className = className;
+ this.methodName = methodName;
+ this.lineNumber = lineNumber;
+ }
+
+ @Override
+ protected void appendMetadata(JsonWriter writer) throws IOException {
+ writer.name("cl").value(this.className);
+ writer.name("m").value(this.methodName);
+ if (this.lineNumber >= 0) {
+ writer.name("ln").value(this.lineNumber);
+ }
+ }
+
+ private String key() {
+ return generateKey(this.className, this.methodName, this.lineNumber);
+ }
+
+ @Override
+ public int compareTo(StackTraceNode that) {
+ int i = -Long.compare(this.getTotalTime(), that.getTotalTime());
+ if (i != 0) {
+ return i;
+ }
+
+ return this.key().compareTo(that.key());
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
new file mode 100644
index 0000000..4e8714c
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
@@ -0,0 +1,44 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.node;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+
+/**
+ * The root of a sampling stack for a given thread / thread group.
+ */
+public final class ThreadNode extends AbstractNode {
+
+ /**
+ * The name of this thread
+ */
+ private final String threadName;
+
+ public ThreadNode(String threadName) {
+ this.threadName = threadName;
+ }
+
+ protected void appendMetadata(JsonWriter writer) throws IOException {
+ writer.name("name").value(this.threadName);
+ }
+}