author     Luck <git@lucko.me>                               2018-10-08 16:04:51 +0100
committer  Luck <git@lucko.me>                               2018-10-08 16:04:51 +0100
commit     648167064ad2064fc5ab77fb57b347253ac9d468 (patch)
tree       5b44e40b144cdb013cc66e67a08b0208392fb6f5 /spark-common/src/main/java/me/lucko/spark/sampler
parent     a342e45839970129ce5cdf1f5bad8da5c607106b (diff)
reorganise some packages
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/sampler')

Sampler.java                          | 184
SamplerBuilder.java                   |  82
ThreadDumper.java                     |  80
ThreadGrouper.java                    |  72
TickCounter.java                      |  60
aggregator/DataAggregator.java        |  54
aggregator/SimpleDataAggregator.java  |  78
aggregator/TickedDataAggregator.java  | 179
node/AbstractNode.java                | 116
node/StackTraceNode.java              |  74
node/ThreadNode.java                  |  44

11 files changed, 1023 insertions(+), 0 deletions(-)
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java
new file mode 100644
index 0000000..1931ca6
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java
@@ -0,0 +1,184 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (C) Albert Pham <http://www.sk89q.com>
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.sampler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.stream.JsonWriter;
+
+import me.lucko.spark.sampler.aggregator.DataAggregator;
+import me.lucko.spark.sampler.aggregator.SimpleDataAggregator;
+import me.lucko.spark.sampler.aggregator.TickedDataAggregator;
+import me.lucko.spark.sampler.node.ThreadNode;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Main sampler class.
+ */
+public class Sampler implements Runnable {
+    private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
+
+    /** The worker pool for inserting stack nodes */
+    private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
+            9, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build()
+    );
+
+    /** The main sampling task */
+    private ScheduledFuture<?> task;
+
+    /** The thread management interface for the current JVM */
+    private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+    /** The instance used to generate thread information for use in sampling */
+    private final ThreadDumper threadDumper;
+    /** Responsible for aggregating and then outputting collected sampling data */
+    private final DataAggregator dataAggregator;
+
+    /** A future to encapsulation the completion of this sampler instance */
+    private final CompletableFuture<Sampler> future = new CompletableFuture<>();
+
+    /** The interval to wait between sampling, in milliseconds */
+    private final int interval;
+    /** The time when sampling first began */
+    private long startTime = -1;
+    /** The unix timestamp (in millis) when this sampler should automatically complete. */
+    private final long endTime; // -1 for nothing
+
+    public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
+        this.threadDumper = threadDumper;
+        this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval);
+        this.interval = interval;
+        this.endTime = endTime;
+    }
+
+    public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, TickCounter tickCounter, int tickLengthThreshold) {
+        this.threadDumper = threadDumper;
+        this.dataAggregator = new TickedDataAggregator(this.workerPool, tickCounter, threadGrouper, interval, tickLengthThreshold);
+        this.interval = interval;
+        this.endTime = endTime;
+    }
+
+    /**
+     * Starts the sampler.
+     */
+    public void start() {
+        this.startTime = System.currentTimeMillis();
+        this.dataAggregator.start();
+        this.task = workerPool.scheduleAtFixedRate(this, 0, interval, TimeUnit.MILLISECONDS);
+    }
+
+    public long getStartTime() {
+        if (this.startTime == -1) {
+            throw new IllegalStateException("Not yet started");
+        }
+        return this.startTime;
+    }
+
+    public long getEndTime() {
+        return this.endTime;
+    }
+
+    public CompletableFuture<Sampler> getFuture() {
+        return this.future;
+    }
+
+    public void cancel() {
+        task.cancel(false);
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) {
+                this.future.complete(this);
+                cancel();
+                return;
+            }
+
+            ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean);
+            for (ThreadInfo threadInfo : threadDumps) {
+                String threadName = threadInfo.getThreadName();
+                StackTraceElement[] stack = threadInfo.getStackTrace();
+
+                if (threadName == null || stack == null) {
+                    continue;
+                }
+
+                this.dataAggregator.insertData(threadName, stack);
+            }
+        } catch (Throwable t) {
+            this.future.completeExceptionally(t);
+            cancel();
+        }
+    }
+
+    private void writeOutput(JsonWriter writer) throws IOException {
+        writer.beginObject();
+
+        writer.name("type").value("sampler");
+        writer.name("threads").beginArray();
+
+        List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
+        data.sort(Map.Entry.comparingByKey());
+
+        for (Map.Entry<String, ThreadNode> entry : data) {
+            writer.beginObject();
+            writer.name("threadName").value(entry.getKey());
+            writer.name("totalTime").value(entry.getValue().getTotalTime());
+            writer.name("rootNode");
+            entry.getValue().serializeTo(writer);
+            writer.endObject();
+        }
+
+        writer.endArray();
+        writer.endObject();
+    }
+
+    public byte[] formCompressedDataPayload() {
+        ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+        try (Writer writer = new OutputStreamWriter(new GZIPOutputStream(byteOut), StandardCharsets.UTF_8)) {
+            try (JsonWriter jsonWriter = new JsonWriter(writer)) {
+                writeOutput(jsonWriter);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return byteOut.toByteArray();
+    }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/sampler/SamplerBuilder.java
new file mode 100644
index 0000000..2936c65
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/SamplerBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.sampler;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Builds {@link Sampler} instances.
+ */
+public class SamplerBuilder {
+
+    private int samplingInterval = 4;
+    private long timeout = -1;
+    private ThreadDumper threadDumper = ThreadDumper.ALL;
+    private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME;
+
+    private int ticksOver = -1;
+    private TickCounter tickCounter = null;
+
+    public SamplerBuilder() {
+    }
+
+    public SamplerBuilder samplingInterval(int samplingInterval) {
+        this.samplingInterval = samplingInterval;
+        return this;
+    }
+
+    public SamplerBuilder completeAfter(long timeout, TimeUnit unit) {
+        if (timeout <= 0) {
+            throw new IllegalArgumentException("timeout > 0");
+        }
+        this.timeout = System.currentTimeMillis() + unit.toMillis(timeout);
+        return this;
+    }
+
+    public SamplerBuilder threadDumper(ThreadDumper threadDumper) {
+        this.threadDumper = threadDumper;
+        return this;
+    }
+
+    public SamplerBuilder threadGrouper(ThreadGrouper threadGrouper) {
+        this.threadGrouper = threadGrouper;
+        return this;
+    }
+
+    public SamplerBuilder ticksOver(int ticksOver, TickCounter tickCounter) {
+        this.ticksOver = ticksOver;
+        this.tickCounter = tickCounter;
+        return this;
+    }
+
+    public Sampler start() {
+        Sampler sampler;
+        if (this.ticksOver != -1 && this.tickCounter != null) {
+            sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout, this.tickCounter, this.ticksOver);
+        } else {
+            sampler = new Sampler(this.samplingInterval, this.threadDumper, this.threadGrouper, this.timeout);
+        }
+
+        sampler.start();
+        return sampler;
+    }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/sampler/ThreadDumper.java
new file mode 100644
index 0000000..af963c6
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/ThreadDumper.java
@@ -0,0 +1,80 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (C) Albert Pham <http://www.sk89q.com>
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.sampler;
+
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Uses the {@link ThreadMXBean} to generate {@link ThreadInfo} instances for the threads being
+ * sampled.
+ */
+@FunctionalInterface
+public interface ThreadDumper {
+
+    /**
+     * Generates {@link ThreadInfo} data for the sampled threads.
+     *
+     * @param threadBean the thread bean instance to obtain the data from
+     * @return an array of generated thread info instances
+     */
+    ThreadInfo[] dumpThreads(ThreadMXBean threadBean);
+
+    /**
+     * Implementation of {@link ThreadDumper} that generates data for all threads.
+     */
+    ThreadDumper ALL = new All();
+
+    final class All implements ThreadDumper {
+        @Override
+        public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
+            return threadBean.dumpAllThreads(false, false);
+        }
+    }
+
+    /**
+     * Implementation of {@link ThreadDumper} that generates data for a specific set of threads.
+     */
+    final class Specific implements ThreadDumper {
+        private final long[] ids;
+
+        public Specific(long[] ids) {
+            this.ids = ids;
+        }
+
+        public Specific(Set<String> names) {
+            Set<String> threadNamesLower = names.stream().map(String::toLowerCase).collect(Collectors.toSet());
+            this.ids = Thread.getAllStackTraces().keySet().stream()
+                    .filter(t -> threadNamesLower.contains(t.getName().toLowerCase()))
+                    .mapToLong(Thread::getId)
+                    .toArray();
+        }
+
+        @Override
+        public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) {
+            return threadBean.getThreadInfo(this.ids, Integer.MAX_VALUE);
+        }
+    }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/sampler/ThreadGrouper.java
new file mode 100644
index 0000000..0707df6
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/ThreadGrouper.java
@@ -0,0 +1,72 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.sampler;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Function for grouping threads together
+ */
+@FunctionalInterface
+public interface ThreadGrouper {
+
+    /**
+     * Gets the group for the given thread.
+     *
+     * @param threadName the name of the thread
+     * @return the group
+     */
+    String getGroup(String threadName);
+
+    /**
+     * Implementation of {@link ThreadGrouper} that just groups by thread name.
+     */
+    ThreadGrouper BY_NAME = new ByName();
+
+    final class ByName implements ThreadGrouper {
+        @Override
+        public String getGroup(String threadName) {
+            return threadName;
+        }
+    }
+
+    /**
+     * Implementation of {@link ThreadGrouper} that attempts to group by the name of the pool
+     * the thread originated from.
+     */
+    ThreadGrouper BY_POOL = new ByPool();
+
+    final class ByPool implements ThreadGrouper {
+        private static final Pattern THREAD_POOL_PATTERN = Pattern.compile("^(.*)[-#] ?\\d+$");
+
+        @Override
+        public String getGroup(String threadName) {
+            Matcher matcher = THREAD_POOL_PATTERN.matcher(threadName);
+            if (!matcher.matches()) {
+                return threadName;
+            }
+
+            return matcher.group(1).trim() + " (Combined)";
+        }
+    }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/TickCounter.java b/spark-common/src/main/java/me/lucko/spark/sampler/TickCounter.java
new file mode 100644
index 0000000..d6b754b
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/TickCounter.java
@@ -0,0 +1,60 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.sampler;
+
+/**
+ * A hook with the game's "tick loop".
+ */
+public interface TickCounter extends AutoCloseable {
+
+    /**
+     * Starts the counter
+     */
+    void start();
+
+    /**
+     * Stops the counter
+     */
+    @Override
+    void close();
+
+    /**
+     * Gets the current tick number
+     *
+     * @return the current tick
+     */
+    long getCurrentTick();
+
+    /**
+     * Adds a task to be called each time the tick increments
+     *
+     * @param runnable the task
+     */
+    void addTickTask(Runnable runnable);
+
+    /**
+     * Removes a tick task
+     *
+     * @param runnable the task
+     */
+    void removeTickTask(Runnable runnable);
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/DataAggregator.java
new file mode 100644
index 0000000..0e38eb4
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/DataAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.sampler.aggregator;
+
+import me.lucko.spark.sampler.node.ThreadNode;
+
+import java.util.Map;
+
+/**
+ * Aggregates sampling data.
+ */
+public interface DataAggregator {
+
+    /**
+     * Called before the sampler begins to insert data
+     */
+    default void start() {
+
+    }
+
+    /**
+     * Forms the output data
+     *
+     * @return the output data
+     */
+    Map<String, ThreadNode> getData();
+
+    /**
+     * Inserts sampling data into this aggregator
+     *
+     * @param threadName the name of the thread
+     * @param stack the call stack
+     */
+    void insertData(String threadName, StackTraceElement[] stack);
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/SimpleDataAggregator.java
new file mode 100644
index 0000000..6f01dd5
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/SimpleDataAggregator.java
@@ -0,0 +1,78 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.sampler.aggregator;
+
+import me.lucko.spark.sampler.ThreadGrouper;
+import me.lucko.spark.sampler.node.AbstractNode;
+import me.lucko.spark.sampler.node.ThreadNode;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Basic implementation of {@link DataAggregator}.
+ */
+public class SimpleDataAggregator implements DataAggregator {
+
+    /** A map of root stack nodes for each thread with sampling data */
+    private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
+
+    /** The worker pool used for sampling */
+    private final ExecutorService workerPool;
+
+    /** The instance used to group threads together */
+    private final ThreadGrouper threadGrouper;
+
+    /** The interval to wait between sampling, in milliseconds */
+    private final int interval;
+
+    public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) {
+        this.workerPool = workerPool;
+        this.threadGrouper = threadGrouper;
+        this.interval = interval;
+    }
+
+    @Override
+    public void insertData(String threadName, StackTraceElement[] stack) {
+        try {
+            String group = this.threadGrouper.getGroup(threadName);
+            AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new);
+            node.log(stack, this.interval);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public Map<String, ThreadNode> getData() {
+        // wait for all pending data to be inserted
+        this.workerPool.shutdown();
+        try {
+            this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        return this.threadData;
+    }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/TickedDataAggregator.java
new file mode 100644
index 0000000..d78a2a4
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/TickedDataAggregator.java
@@ -0,0 +1,179 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.sampler.aggregator;
+
+import me.lucko.spark.sampler.ThreadGrouper;
+import me.lucko.spark.sampler.TickCounter;
+import me.lucko.spark.sampler.node.AbstractNode;
+import me.lucko.spark.sampler.node.ThreadNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
+ * which exceed a certain threshold in duration.
+ */
+public class TickedDataAggregator implements DataAggregator {
+
+    /** A map of root stack nodes for each thread with sampling data */
+    private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
+
+    /** The worker pool for inserting stack nodes */
+    private final ExecutorService workerPool;
+
+    /** Used to monitor the current "tick" of the server */
+    private final TickCounter tickCounter;
+
+    /** The instance used to group threads together */
+    private final ThreadGrouper threadGrouper;
+
+    /** The interval to wait between sampling, in milliseconds */
+    private final int interval;
+
+    /** Tick durations under this threshold will not be inserted */
+    private final int tickLengthThreshold;
+
+    /** The expected number of samples in each tick */
+    private final int expectedSize;
+
+    private final Object mutex = new Object();
+
+    // state
+    private long currentTick = -1;
+    private TickList currentData = new TickList(0);
+
+    public TickedDataAggregator(ExecutorService workerPool, TickCounter tickCounter, ThreadGrouper threadGrouper, int interval, int tickLengthThreshold) {
+        this.workerPool = workerPool;
+        this.tickCounter = tickCounter;
+        this.threadGrouper = threadGrouper;
+        this.interval = interval;
+        this.tickLengthThreshold = tickLengthThreshold;
+        // 50 millis in a tick, plus 10 so we have a bit of room to go over
+        this.expectedSize = (50 / interval) + 10;
+    }
+
+    @Override
+    public void insertData(String threadName, StackTraceElement[] stack) {
+        synchronized (this.mutex) {
+            long tick = this.tickCounter.getCurrentTick();
+            if (this.currentTick != tick) {
+                pushCurrentTick();
+                this.currentTick = tick;
+                this.currentData = new TickList(this.expectedSize);
+            }
+
+            // form the queued data
+            QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+            // insert it
+            this.currentData.addData(queuedData);
+        }
+    }
+
+    // guarded by 'mutex'
+    private void pushCurrentTick() {
+        TickList currentData = this.currentData;
+
+        // approximate how long the tick lasted
+        int tickLengthMillis = currentData.getList().size() * this.interval;
+
+        // don't push data below the threshold
+        if (tickLengthMillis < this.tickLengthThreshold) {
+            return;
+        }
+
+        this.workerPool.submit(currentData);
+    }
+
+    @Override
+    public void start() {
+        this.tickCounter.start();
+    }
+
+    @Override
+    public Map<String, ThreadNode> getData() {
+        // push the current tick
+        synchronized (this.mutex) {
+            pushCurrentTick();
+        }
+
+        // close the tick counter
+        this.tickCounter.close();
+
+        // wait for all pending data to be inserted
+        this.workerPool.shutdown();
+        try {
+            this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        return this.threadData;
+    }
+
+    // called by TickList
+    void insertData(List<QueuedThreadInfo> dataList) {
+        for (QueuedThreadInfo data : dataList) {
+            try {
+                String group = this.threadGrouper.getGroup(data.threadName);
+                AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new);
+                node.log(data.stack, this.interval);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private final class TickList implements Runnable {
+        private final List<QueuedThreadInfo> list;
+
+        TickList(int expectedSize) {
+            this.list = new ArrayList<>(expectedSize);
+        }
+
+        @Override
+        public void run() {
+            insertData(this.list);
+        }
+
+        public List<QueuedThreadInfo> getList() {
+            return this.list;
+        }
+
+        public void addData(QueuedThreadInfo data) {
+            this.list.add(data);
+        }
+    }
+
+    private static final class QueuedThreadInfo {
+        private final String threadName;
+        private final StackTraceElement[] stack;
+
+        QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+            this.threadName = threadName;
+            this.stack = stack;
+        }
+    }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/sampler/node/AbstractNode.java
new file mode 100644
index 0000000..e660140
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/node/AbstractNode.java
@@ -0,0 +1,116 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (C) Albert Pham <http://www.sk89q.com>
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.sampler.node;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Encapsulates a timed node in the sampling stack.
+ */
+public abstract class AbstractNode {
+
+    private static final int MAX_STACK_DEPTH = 300;
+
+    /**
+     * A map of this nodes children
+     */
+    private final Map<String, StackTraceNode> children = new ConcurrentHashMap<>();
+
+    /**
+     * The accumulated sample time for this node
+     */
+    private final LongAdder totalTime = new LongAdder();
+
+    public long getTotalTime() {
+        return this.totalTime.longValue();
+    }
+
+    private AbstractNode resolveChild(String className, String methodName) {
+        return this.children.computeIfAbsent(
+                StackTraceNode.generateKey(className, methodName),
+                name -> new StackTraceNode(className, methodName)
+        );
+    }
+
+    public void log(StackTraceElement[] elements, long time) {
+        log(elements, 0, time);
+    }
+
+    private void log(StackTraceElement[] elements, int skip, long time) {
+        this.totalTime.add(time);
+
+        if (skip >= MAX_STACK_DEPTH) {
+            return;
+        }
+
+        if (elements.length - skip == 0) {
+            return;
+        }
+
+        StackTraceElement bottom = elements[elements.length - (skip + 1)];
+        resolveChild(bottom.getClassName(), bottom.getMethodName()).log(elements, skip + 1, time);
+    }
+
+    private Collection<? extends AbstractNode> getChildren() {
+        if (this.children.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<StackTraceNode> list = new ArrayList<>(this.children.values());
+        list.sort(null);
+        return list;
+    }
+
+    public void serializeTo(JsonWriter writer) throws IOException {
+        writer.beginObject();
+
+        // append metadata about this node
+        appendMetadata(writer);
+
+        // include the total time recorded for this node
+        writer.name("t").value(getTotalTime());
+
+        // append child nodes, if any are present
+        Collection<? extends AbstractNode> childNodes = getChildren();
+        if (!childNodes.isEmpty()) {
+            writer.name("c").beginArray();
+            for (AbstractNode child : childNodes) {
+                child.serializeTo(writer);
+            }
+            writer.endArray();
+        }
+
+        writer.endObject();
+    }
+
+    protected abstract void appendMetadata(JsonWriter writer) throws IOException;
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/sampler/node/StackTraceNode.java
new file mode 100644
index 0000000..d161b42
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/node/StackTraceNode.java
@@ -0,0 +1,74 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (C) Albert Pham <http://www.sk89q.com>
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.sampler.node;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+
+/**
+ * Represents a stack trace element within the {@link AbstractNode node} structure.
+ */
+public final class StackTraceNode extends AbstractNode implements Comparable<StackTraceNode> {
+
+    /**
+     * Forms a key to represent the given node.
+     *
+     * @param className the name of the class
+     * @param methodName the name of the method
+     * @return the key
+     */
+    static String generateKey(String className, String methodName) {
+        return className + "." + methodName;
+    }
+
+    /** The name of the class */
+    private final String className;
+    /** The name of the method */
+    private final String methodName;
+
+    public StackTraceNode(String className, String methodName) {
+        this.className = className;
+        this.methodName = methodName;
+    }
+
+    @Override
+    protected void appendMetadata(JsonWriter writer) throws IOException {
+        writer.name("cl").value(this.className);
+        writer.name("m").value(this.methodName);
+    }
+
+    private String key() {
+        return generateKey(this.className, this.methodName);
+    }
+
+    @Override
+    public int compareTo(StackTraceNode that) {
+        int i = -Long.compare(this.getTotalTime(), that.getTotalTime());
+        if (i != 0) {
+            return i;
+        }
+
+        return this.key().compareTo(that.key());
+    }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/sampler/node/ThreadNode.java
new file mode 100644
index 0000000..2acce21
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/sampler/node/ThreadNode.java
@@ -0,0 +1,44 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.sampler.node;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+
+/**
+ * The root of a sampling stack for a given thread / thread group.
+ */
+public final class ThreadNode extends AbstractNode {
+
+    /**
+     * The name of this thread
+     */
+    private final String threadName;
+
+    public ThreadNode(String threadName) {
+        this.threadName = threadName;
+    }
+
+    protected void appendMetadata(JsonWriter writer) throws IOException {
+        writer.name("name").value(this.threadName);
+    }
+}
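
Usage note (not part of this commit): the sketch below is a hypothetical caller written against the signatures shown in the diff above, to illustrate how the relocated classes compose. A SamplerBuilder configures and starts a Sampler, and formCompressedDataPayload() yields the gzip-compressed JSON report. The output file name and the surrounding wiring (for example, obtaining a platform TickCounter for the ticksOver variant) are assumptions, not code from this change.

import me.lucko.spark.sampler.Sampler;
import me.lucko.spark.sampler.SamplerBuilder;
import me.lucko.spark.sampler.ThreadDumper;
import me.lucko.spark.sampler.ThreadGrouper;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;

public class SamplerUsageSketch {

    public static void main(String[] args) throws IOException {
        // Build and start a sampler: take a stack snapshot every 10ms, merge threads
        // that belong to the same pool, and stop automatically after 30 seconds.
        Sampler sampler = new SamplerBuilder()
                .samplingInterval(10)
                .threadDumper(ThreadDumper.ALL)
                .threadGrouper(ThreadGrouper.BY_POOL)
                .completeAfter(30, TimeUnit.SECONDS)
                .start(); // start() constructs the Sampler and begins scheduling it

        // The future completes when the sampler reaches its end time,
        // or completes exceptionally if a sampling iteration throws.
        sampler.getFuture().join();

        // Aggregated output: gzip-compressed JSON, ready to be written out or uploaded.
        byte[] payload = sampler.formCompressedDataPayload();
        Files.write(Paths.get("profile.json.gz"), payload); // hypothetical destination
    }
}

When decompressed, the payload is the JSON produced by Sampler.writeOutput(): an object with "type": "sampler" and a "threads" array whose nodes carry the abbreviated keys "cl", "m", "t" and "c" written by the node classes above. A tick-gated profile would instead pass .ticksOver(threshold, tickCounter), which makes SamplerBuilder.start() select the TickedDataAggregator.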