author    | Luck <git@lucko.me> | 2019-04-16 21:37:59 +0100
committer | Luck <git@lucko.me> | 2019-04-16 21:37:59 +0100
commit    | ecd4cec8545460a4fc4ca65b911c2503a00cd8e7 (patch)
tree      | 62067383a1044abc3a09724e89c6e7c619e87ec0 /spark-common/src/main/java/me/lucko/spark/sampler
parent    | 8a61b404848ed8e3c27f06eb73239d37d4273240 (diff)
Lots of refactoring, add tps command
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/sampler')
11 files changed, 0 insertions, 1142 deletions
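The files deleted below made up spark's original sampler API before the refactor. As a rough, illustrative sketch only (the `SamplerUsageExample` class name and the specific parameter values are not part of this commit; the calls are taken from the `SamplerBuilder` shown in the diff), a profiling run against the removed API looked roughly like this:

```java
import java.util.concurrent.TimeUnit;

import me.lucko.spark.sampler.Sampler;
import me.lucko.spark.sampler.SamplerBuilder;
import me.lucko.spark.sampler.ThreadDumper;
import me.lucko.spark.sampler.ThreadGrouper;

public final class SamplerUsageExample {

    // Starts a 30-second profiling run at a 4ms sampling interval and blocks
    // until the gzipped JSON payload is available.
    public static byte[] profile() {
        Sampler sampler = new SamplerBuilder()
                .samplingInterval(4)                 // milliseconds between samples
                .completeAfter(30, TimeUnit.SECONDS) // sets the automatic end time
                .threadDumper(ThreadDumper.ALL)      // sample every thread
                .threadGrouper(ThreadGrouper.BY_POOL)
                .includeLineNumbers(true)
                .start();                            // schedules the sampling task and returns the running sampler

        // the future completes once the end time passes (see Sampler#run)
        return sampler.getFuture().join().formCompressedDataPayload();
    }
}
```

Note how the builder converts the millisecond interval to microseconds and only constructs the ticked variant of the sampler when `ticksOver(...)` was configured with a tick counter.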
diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java deleted file mode 100644 index 6777770..0000000 --- a/spark-common/src/main/java/me/lucko/spark/sampler/Sampler.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (C) Albert Pham <http://www.sk89q.com> - * Copyright (c) lucko (Luck) <luck@lucko.me> - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -package me.lucko.spark.sampler; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.gson.stream.JsonWriter; - -import me.lucko.spark.sampler.aggregator.DataAggregator; -import me.lucko.spark.sampler.aggregator.SimpleDataAggregator; -import me.lucko.spark.sampler.aggregator.TickedDataAggregator; -import me.lucko.spark.sampler.node.ThreadNode; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.zip.GZIPOutputStream; - -/** - * Main sampler class. 
- */ -public class Sampler implements Runnable { - private static final AtomicInteger THREAD_ID = new AtomicInteger(0); - - /** The worker pool for inserting stack nodes */ - private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool( - 6, new ThreadFactoryBuilder().setNameFormat("spark-worker-" + THREAD_ID.getAndIncrement() + "-%d").build() - ); - - /** The main sampling task */ - private ScheduledFuture<?> task; - - /** The thread management interface for the current JVM */ - private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - /** The instance used to generate thread information for use in sampling */ - private final ThreadDumper threadDumper; - /** Responsible for aggregating and then outputting collected sampling data */ - private final DataAggregator dataAggregator; - - /** A future to encapsulation the completion of this sampler instance */ - private final CompletableFuture<Sampler> future = new CompletableFuture<>(); - - /** The interval to wait between sampling, in microseconds */ - private final int interval; - /** The time when sampling first began */ - private long startTime = -1; - /** The unix timestamp (in millis) when this sampler should automatically complete.*/ - private final long endTime; // -1 for nothing - - public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers) { - this.threadDumper = threadDumper; - this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, includeLineNumbers); - this.interval = interval; - this.endTime = endTime; - } - - public Sampler(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean includeLineNumbers, TickCounter tickCounter, int tickLengthThreshold) { - this.threadDumper = threadDumper; - this.dataAggregator = new TickedDataAggregator(this.workerPool, tickCounter, threadGrouper, interval, includeLineNumbers, tickLengthThreshold); - this.interval = interval; - this.endTime = endTime; - } - - /** - * Starts the sampler. - */ - public void start() { - this.startTime = System.currentTimeMillis(); - this.dataAggregator.start(); - this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS); - } - - public long getStartTime() { - if (this.startTime == -1) { - throw new IllegalStateException("Not yet started"); - } - return this.startTime; - } - - public long getEndTime() { - return this.endTime; - } - - public CompletableFuture<Sampler> getFuture() { - return this.future; - } - - public void cancel() { - this.task.cancel(false); - } - - @Override - public void run() { - // this is effectively synchronized, the worker pool will not allow this task - // to concurrently execute. 
- try { - if (this.endTime != -1 && this.endTime <= System.currentTimeMillis()) { - this.future.complete(this); - cancel(); - return; - } - - ThreadInfo[] threadDumps = this.threadDumper.dumpThreads(this.threadBean); - this.workerPool.execute(new InsertDataTask(this.dataAggregator, threadDumps)); - } catch (Throwable t) { - this.future.completeExceptionally(t); - cancel(); - } - } - - private static final class InsertDataTask implements Runnable { - private final DataAggregator dataAggregator; - private final ThreadInfo[] threadDumps; - - InsertDataTask(DataAggregator dataAggregator, ThreadInfo[] threadDumps) { - this.dataAggregator = dataAggregator; - this.threadDumps = threadDumps; - } - - @Override - public void run() { - for (ThreadInfo threadInfo : this.threadDumps) { - String threadName = threadInfo.getThreadName(); - StackTraceElement[] stack = threadInfo.getStackTrace(); - - if (threadName == null || stack == null) { - continue; - } - - this.dataAggregator.insertData(threadName, stack); - } - } - } - - private void writeOutput(JsonWriter writer) throws IOException { - writer.beginObject(); - - writer.name("type").value("sampler"); - writer.name("threads").beginArray(); - - List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet()); - data.sort(Map.Entry.comparingByKey()); - - for (Map.Entry<String, ThreadNode> entry : data) { - writer.beginObject(); - writer.name("threadName").value(entry.getKey()); - writer.name("totalTime").value(entry.getValue().getTotalTime()); - writer.name("rootNode"); - entry.getValue().serializeTo(writer); - writer.endObject(); - } - - writer.endArray(); - writer.endObject(); - } - - public byte[] formCompressedDataPayload() { - ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); - try (Writer writer = new OutputStreamWriter(new GZIPOutputStream(byteOut), StandardCharsets.UTF_8)) { - try (JsonWriter jsonWriter = new JsonWriter(writer)) { - writeOutput(jsonWriter); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - return byteOut.toByteArray(); - } - -} diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/sampler/SamplerBuilder.java deleted file mode 100644 index bf9dc04..0000000 --- a/spark-common/src/main/java/me/lucko/spark/sampler/SamplerBuilder.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (c) lucko (Luck) <luck@lucko.me> - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -package me.lucko.spark.sampler; - -import java.util.concurrent.TimeUnit; - -/** - * Builds {@link Sampler} instances. 
- */ -public class SamplerBuilder { - - private double samplingInterval = 4; // milliseconds - private boolean includeLineNumbers = false; - private long timeout = -1; - private ThreadDumper threadDumper = ThreadDumper.ALL; - private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME; - - private int ticksOver = -1; - private TickCounter tickCounter = null; - - public SamplerBuilder() { - } - - public SamplerBuilder samplingInterval(double samplingInterval) { - this.samplingInterval = samplingInterval; - return this; - } - - public SamplerBuilder completeAfter(long timeout, TimeUnit unit) { - if (timeout <= 0) { - throw new IllegalArgumentException("timeout > 0"); - } - this.timeout = System.currentTimeMillis() + unit.toMillis(timeout); - return this; - } - - public SamplerBuilder threadDumper(ThreadDumper threadDumper) { - this.threadDumper = threadDumper; - return this; - } - - public SamplerBuilder threadGrouper(ThreadGrouper threadGrouper) { - this.threadGrouper = threadGrouper; - return this; - } - - public SamplerBuilder ticksOver(int ticksOver, TickCounter tickCounter) { - this.ticksOver = ticksOver; - this.tickCounter = tickCounter; - return this; - } - - public SamplerBuilder includeLineNumbers(boolean includeLineNumbers) { - this.includeLineNumbers = includeLineNumbers; - return this; - } - - public Sampler start() { - Sampler sampler; - - int intervalMicros = (int) (this.samplingInterval * 1000d); - if (this.ticksOver != -1 && this.tickCounter != null) { - sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers, this.tickCounter, this.ticksOver); - } else { - sampler = new Sampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.includeLineNumbers); - } - - sampler.start(); - return sampler; - } - -} diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/sampler/ThreadDumper.java deleted file mode 100644 index 5b68eaf..0000000 --- a/spark-common/src/main/java/me/lucko/spark/sampler/ThreadDumper.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (C) Albert Pham <http://www.sk89q.com> - * Copyright (c) lucko (Luck) <luck@lucko.me> - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -package me.lucko.spark.sampler; - -import me.lucko.spark.util.ThreadFinder; - -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; -import java.util.stream.Collectors; - -/** - * Uses the {@link ThreadMXBean} to generate {@link ThreadInfo} instances for the threads being - * sampled. 
- */ -@FunctionalInterface -public interface ThreadDumper { - - /** - * Generates {@link ThreadInfo} data for the sampled threads. - * - * @param threadBean the thread bean instance to obtain the data from - * @return an array of generated thread info instances - */ - ThreadInfo[] dumpThreads(ThreadMXBean threadBean); - - /** - * Implementation of {@link ThreadDumper} that generates data for all threads. - */ - ThreadDumper ALL = threadBean -> threadBean.dumpAllThreads(false, false); - - /** - * Implementation of {@link ThreadDumper} that generates data for a specific set of threads. - */ - final class Specific implements ThreadDumper { - private final ThreadFinder threadFinder = new ThreadFinder(); - private final long[] ids; - - public Specific(long[] ids) { - this.ids = ids; - } - - public Specific(Set<String> names) { - Set<String> namesLower = names.stream().map(String::toLowerCase).collect(Collectors.toSet()); - this.ids = this.threadFinder.getThreads() - .filter(t -> namesLower.contains(t.getName().toLowerCase())) - .mapToLong(Thread::getId) - .toArray(); - } - - @Override - public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) { - return threadBean.getThreadInfo(this.ids, Integer.MAX_VALUE); - } - } - - /** - * Implementation of {@link ThreadDumper} that generates data for a regex matched set of threads. - */ - final class Regex implements ThreadDumper { - private final ThreadFinder threadFinder = new ThreadFinder(); - private final Set<Pattern> namePatterns; - private final Map<Long, Boolean> cache = new HashMap<>(); - - public Regex(Set<String> namePatterns) { - this.namePatterns = namePatterns.stream() - .map(regex -> { - try { - return Pattern.compile(regex); - } catch (PatternSyntaxException e) { - return null; - } - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - } - - @Override - public ThreadInfo[] dumpThreads(ThreadMXBean threadBean) { - return this.threadFinder.getThreads() - .filter(thread -> { - Boolean result = this.cache.get(thread.getId()); - if (result != null) { - return result; - } - - for (Pattern pattern : this.namePatterns) { - if (pattern.matcher(thread.getName()).matches()) { - this.cache.put(thread.getId(), true); - return true; - } - } - this.cache.put(thread.getId(), false); - return false; - }) - .map(thread -> threadBean.getThreadInfo(thread.getId())) - .filter(Objects::nonNull) - .toArray(ThreadInfo[]::new); - } - } - -} diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/sampler/ThreadGrouper.java deleted file mode 100644 index 3f1be33..0000000 --- a/spark-common/src/main/java/me/lucko/spark/sampler/ThreadGrouper.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (c) lucko (Luck) <luck@lucko.me> - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. 
- */ - -package me.lucko.spark.sampler; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Function for grouping threads together - */ -@FunctionalInterface -public interface ThreadGrouper { - - /** - * Gets the group for the given thread. - * - * @param threadName the name of the thread - * @return the group - */ - String getGroup(String threadName); - - /** - * Implementation of {@link ThreadGrouper} that just groups by thread name. - */ - ThreadGrouper BY_NAME = threadName -> threadName; - - /** - * Implementation of {@link ThreadGrouper} that attempts to group by the name of the pool - * the thread originated from. - * - * <p>The regex pattern used to match pools expects a digit at the end of the thread name, - * separated from the pool name with any of one or more of ' ', '-', or '#'.</p> - */ - ThreadGrouper BY_POOL = new ThreadGrouper() { - private final Pattern pattern = Pattern.compile("^(.*?)[-# ]+\\d+$"); - - @Override - public String getGroup(String threadName) { - Matcher matcher = this.pattern.matcher(threadName); - if (!matcher.matches()) { - return threadName; - } - - return matcher.group(1).trim() + " (Combined)"; - } - }; - - /** - * Implementation of {@link ThreadGrouper} which groups all threads as one, under - * the name "All". - */ - ThreadGrouper AS_ONE = threadName -> "All"; - -} diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/TickCounter.java b/spark-common/src/main/java/me/lucko/spark/sampler/TickCounter.java deleted file mode 100644 index 059e420..0000000 --- a/spark-common/src/main/java/me/lucko/spark/sampler/TickCounter.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (c) lucko (Luck) <luck@lucko.me> - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -package me.lucko.spark.sampler; - -/** - * A hook with the game's "tick loop". - */ -public interface TickCounter extends AutoCloseable { - - /** - * Starts the counter - */ - void start(); - - /** - * Stops the counter - */ - @Override - void close(); - - /** - * Gets the current tick number - * - * @return the current tick - */ - int getCurrentTick(); - - /** - * Adds a task to be called each time the tick increments - * - * @param runnable the task - */ - void addTickTask(Runnable runnable); - - /** - * Removes a tick task - * - * @param runnable the task - */ - void removeTickTask(Runnable runnable); - -} diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/DataAggregator.java deleted file mode 100644 index 0e38eb4..0000000 --- a/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/DataAggregator.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * This file is part of spark. 
- * - * Copyright (c) lucko (Luck) <luck@lucko.me> - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -package me.lucko.spark.sampler.aggregator; - -import me.lucko.spark.sampler.node.ThreadNode; - -import java.util.Map; - -/** - * Aggregates sampling data. - */ -public interface DataAggregator { - - /** - * Called before the sampler begins to insert data - */ - default void start() { - - } - - /** - * Forms the output data - * - * @return the output data - */ - Map<String, ThreadNode> getData(); - - /** - * Inserts sampling data into this aggregator - * - * @param threadName the name of the thread - * @param stack the call stack - */ - void insertData(String threadName, StackTraceElement[] stack); - -} diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/SimpleDataAggregator.java deleted file mode 100644 index a72b47f..0000000 --- a/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/SimpleDataAggregator.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (c) lucko (Luck) <luck@lucko.me> - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -package me.lucko.spark.sampler.aggregator; - -import me.lucko.spark.sampler.ThreadGrouper; -import me.lucko.spark.sampler.node.AbstractNode; -import me.lucko.spark.sampler.node.ThreadNode; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Basic implementation of {@link DataAggregator}. 
- */ -public class SimpleDataAggregator implements DataAggregator { - - /** A map of root stack nodes for each thread with sampling data */ - private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>(); - - /** The worker pool used for sampling */ - private final ExecutorService workerPool; - - /** The instance used to group threads together */ - private final ThreadGrouper threadGrouper; - - /** The interval to wait between sampling, in microseconds */ - private final int interval; - - /** If line numbers should be included in the output */ - private final boolean includeLineNumbers; - - public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers) { - this.workerPool = workerPool; - this.threadGrouper = threadGrouper; - this.interval = interval; - this.includeLineNumbers = includeLineNumbers; - } - - @Override - public void insertData(String threadName, StackTraceElement[] stack) { - try { - String group = this.threadGrouper.getGroup(threadName); - AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new); - node.log(stack, this.interval, this.includeLineNumbers); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Override - public Map<String, ThreadNode> getData() { - // wait for all pending data to be inserted - this.workerPool.shutdown(); - try { - this.workerPool.awaitTermination(15, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - return this.threadData; - } -} diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/TickedDataAggregator.java deleted file mode 100644 index ef568c8..0000000 --- a/spark-common/src/main/java/me/lucko/spark/sampler/aggregator/TickedDataAggregator.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (c) lucko (Luck) <luck@lucko.me> - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -package me.lucko.spark.sampler.aggregator; - -import me.lucko.spark.sampler.ThreadGrouper; -import me.lucko.spark.sampler.TickCounter; -import me.lucko.spark.sampler.node.AbstractNode; -import me.lucko.spark.sampler.node.ThreadNode; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks" - * which exceed a certain threshold in duration. 
- */ -public class TickedDataAggregator implements DataAggregator { - - /** A map of root stack nodes for each thread with sampling data */ - private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>(); - - /** The worker pool for inserting stack nodes */ - private final ExecutorService workerPool; - - /** Used to monitor the current "tick" of the server */ - private final TickCounter tickCounter; - - /** The instance used to group threads together */ - private final ThreadGrouper threadGrouper; - - /** The interval to wait between sampling, in microseconds */ - private final int interval; - - /** If line numbers should be included in the output */ - private final boolean includeLineNumbers; - - /** Tick durations under this threshold will not be inserted, measured in microseconds */ - private final long tickLengthThreshold; - - /** The expected number of samples in each tick */ - private final int expectedSize; - - private final Object mutex = new Object(); - - // state - private int currentTick = -1; - private TickList currentData = new TickList(0); - - public TickedDataAggregator(ExecutorService workerPool, TickCounter tickCounter, ThreadGrouper threadGrouper, int interval, boolean includeLineNumbers, int tickLengthThreshold) { - this.workerPool = workerPool; - this.tickCounter = tickCounter; - this.threadGrouper = threadGrouper; - this.interval = interval; - this.includeLineNumbers = includeLineNumbers; - this.tickLengthThreshold = TimeUnit.MILLISECONDS.toMicros(tickLengthThreshold); - // 50 millis in a tick, plus 10 so we have a bit of room to go over - double intervalMilliseconds = interval / 1000d; - this.expectedSize = (int) ((50 / intervalMilliseconds) + 10); - } - - @Override - public void insertData(String threadName, StackTraceElement[] stack) { - synchronized (this.mutex) { - int tick = this.tickCounter.getCurrentTick(); - if (this.currentTick != tick) { - pushCurrentTick(); - this.currentTick = tick; - this.currentData = new TickList(this.expectedSize); - } - - // form the queued data - QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack); - // insert it - this.currentData.addData(queuedData); - } - } - - // guarded by 'mutex' - private void pushCurrentTick() { - TickList currentData = this.currentData; - - // approximate how long the tick lasted - int tickLengthMicros = currentData.getList().size() * this.interval; - - // don't push data below the threshold - if (tickLengthMicros < this.tickLengthThreshold) { - return; - } - - this.workerPool.submit(currentData); - } - - @Override - public void start() { - this.tickCounter.start(); - } - - @Override - public Map<String, ThreadNode> getData() { - // push the current tick - synchronized (this.mutex) { - pushCurrentTick(); - } - - // close the tick counter - this.tickCounter.close(); - - // wait for all pending data to be inserted - this.workerPool.shutdown(); - try { - this.workerPool.awaitTermination(15, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - return this.threadData; - } - - // called by TickList - void insertData(List<QueuedThreadInfo> dataList) { - for (QueuedThreadInfo data : dataList) { - try { - String group = this.threadGrouper.getGroup(data.threadName); - AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new); - node.log(data.stack, this.interval, this.includeLineNumbers); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - private final class TickList implements Runnable { - private final 
List<QueuedThreadInfo> list; - - TickList(int expectedSize) { - this.list = new ArrayList<>(expectedSize); - } - - @Override - public void run() { - insertData(this.list); - } - - public List<QueuedThreadInfo> getList() { - return this.list; - } - - public void addData(QueuedThreadInfo data) { - this.list.add(data); - } - } - - private static final class QueuedThreadInfo { - private final String threadName; - private final StackTraceElement[] stack; - - QueuedThreadInfo(String threadName, StackTraceElement[] stack) { - this.threadName = threadName; - this.stack = stack; - } - } -} diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/sampler/node/AbstractNode.java deleted file mode 100644 index 859014f..0000000 --- a/spark-common/src/main/java/me/lucko/spark/sampler/node/AbstractNode.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (C) Albert Pham <http://www.sk89q.com> - * Copyright (c) lucko (Luck) <luck@lucko.me> - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -package me.lucko.spark.sampler.node; - -import com.google.gson.stream.JsonWriter; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.LongAdder; - -/** - * Encapsulates a timed node in the sampling stack. - */ -public abstract class AbstractNode { - - private static final int MAX_STACK_DEPTH = 300; - - /** - * A map of this nodes children - */ - private final Map<String, StackTraceNode> children = new ConcurrentHashMap<>(); - - /** - * The accumulated sample time for this node, measured in microseconds - */ - private final LongAdder totalTime = new LongAdder(); - - /** - * Returns the total sample time for this node in milliseconds. 
- * - * @return the total time - */ - public long getTotalTime() { - long millis = TimeUnit.MICROSECONDS.toMillis(this.totalTime.longValue()); - if (millis == 0) { - return 1; - } - return millis; - } - - private AbstractNode resolveChild(String className, String methodName, int lineNumber) { - return this.children.computeIfAbsent( - StackTraceNode.generateKey(className, methodName, lineNumber), - name -> new StackTraceNode(className, methodName, lineNumber) - ); - } - - public void log(StackTraceElement[] elements, long time, boolean includeLineNumbers) { - log(elements, 0, time, includeLineNumbers); - } - - private void log(StackTraceElement[] elements, int offset, long time, boolean includeLineNumbers) { - this.totalTime.add(time); - - if (offset >= MAX_STACK_DEPTH) { - return; - } - - if (elements.length - offset == 0) { - return; - } - - // the first element in the array is the top of the call stack, and the last is the root - // offset starts at 0. - - // pointer is determined by subtracting the offset from the index of the last element - int pointer = (elements.length - 1) - offset; - StackTraceElement element = elements[pointer]; - - // the parent stack element is located at pointer+1. - // when the current offset is 0, we know the current pointer is at the last element in the - // array (the root) and therefore there is no parent. - StackTraceElement parent = offset == 0 ? null : elements[pointer + 1]; - - // get the line number of the parent element - the line which called "us" - int lineNumber = parent == null || !includeLineNumbers ? StackTraceNode.NULL_LINE_NUMBER : parent.getLineNumber(); - - // resolve a child element within the structure for the element at pointer - AbstractNode child = resolveChild(element.getClassName(), element.getMethodName(), lineNumber); - // call the log method on the found child, with an incremented offset. - child.log(elements, offset + 1, time, includeLineNumbers); - } - - private Collection<? extends AbstractNode> getChildren() { - if (this.children.isEmpty()) { - return Collections.emptyList(); - } - - List<StackTraceNode> list = new ArrayList<>(this.children.values()); - list.sort(null); - return list; - } - - public void serializeTo(JsonWriter writer) throws IOException { - writer.beginObject(); - - // append metadata about this node - appendMetadata(writer); - - // include the total time recorded for this node - writer.name("t").value(getTotalTime()); - - // append child nodes, if any are present - Collection<? extends AbstractNode> childNodes = getChildren(); - if (!childNodes.isEmpty()) { - writer.name("c").beginArray(); - for (AbstractNode child : childNodes) { - child.serializeTo(writer); - } - writer.endArray(); - } - - writer.endObject(); - } - - protected abstract void appendMetadata(JsonWriter writer) throws IOException; - -} diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/sampler/node/StackTraceNode.java deleted file mode 100644 index 8cbcd0f..0000000 --- a/spark-common/src/main/java/me/lucko/spark/sampler/node/StackTraceNode.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * This file is part of spark. 
- * - * Copyright (C) Albert Pham <http://www.sk89q.com> - * Copyright (c) lucko (Luck) <luck@lucko.me> - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -package me.lucko.spark.sampler.node; - -import com.google.gson.stream.JsonWriter; - -import java.io.IOException; - -/** - * Represents a stack trace element within the {@link AbstractNode node} structure. - */ -public final class StackTraceNode extends AbstractNode implements Comparable<StackTraceNode> { - - /** - * Magic number to denote "no present" line number for a node. - */ - public static final int NULL_LINE_NUMBER = -1; - - /** - * Forms a key to represent the given node. - * - * @param className the name of the class - * @param methodName the name of the method - * @param lineNumber the line number of the parent method call - * @return the key - */ - static String generateKey(String className, String methodName, int lineNumber) { - return className + "." + methodName + "." + lineNumber; - } - - /** The name of the class */ - private final String className; - /** The name of the method */ - private final String methodName; - /** The line number of the invocation which created this node */ - private final int lineNumber; - - public StackTraceNode(String className, String methodName, int lineNumber) { - this.className = className; - this.methodName = methodName; - this.lineNumber = lineNumber; - } - - @Override - protected void appendMetadata(JsonWriter writer) throws IOException { - writer.name("cl").value(this.className); - writer.name("m").value(this.methodName); - if (this.lineNumber >= 0) { - writer.name("ln").value(this.lineNumber); - } - } - - private String key() { - return generateKey(this.className, this.methodName, this.lineNumber); - } - - @Override - public int compareTo(StackTraceNode that) { - int i = -Long.compare(this.getTotalTime(), that.getTotalTime()); - if (i != 0) { - return i; - } - - return this.key().compareTo(that.key()); - } - -} diff --git a/spark-common/src/main/java/me/lucko/spark/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/sampler/node/ThreadNode.java deleted file mode 100644 index 2acce21..0000000 --- a/spark-common/src/main/java/me/lucko/spark/sampler/node/ThreadNode.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (c) lucko (Luck) <luck@lucko.me> - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. 
- * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -package me.lucko.spark.sampler.node; - -import com.google.gson.stream.JsonWriter; - -import java.io.IOException; - -/** - * The root of a sampling stack for a given thread / thread group. - */ -public final class ThreadNode extends AbstractNode { - - /** - * The name of this thread - */ - private final String threadName; - - public ThreadNode(String threadName) { - this.threadName = threadName; - } - - protected void appendMetadata(JsonWriter writer) throws IOException { - writer.name("name").value(this.threadName); - } -} |
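For reference, the payload produced by `Sampler#formCompressedDataPayload()` in the removed code was a gzipped JSON document. A minimal, hypothetical consumer (the `PayloadReader` class below is not part of spark, just an illustration of the format implied by the code above) could decompress it like this:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

public final class PayloadReader {

    // Decompresses a payload produced by Sampler#formCompressedDataPayload
    // and returns the JSON text it contains.
    public static String readPayload(byte[] compressed) throws IOException {
        try (InputStreamReader reader = new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(compressed)), StandardCharsets.UTF_8)) {
            StringBuilder sb = new StringBuilder();
            char[] buf = new char[8192];
            int read;
            while ((read = reader.read(buf)) != -1) {
                sb.append(buf, 0, read);
            }
            return sb.toString();
        }
    }
}
```

As written by `Sampler#writeOutput` and `AbstractNode#serializeTo`, the top-level object carries `"type": "sampler"` and a `"threads"` array; each thread entry holds `"threadName"`, `"totalTime"` and a `"rootNode"`, and each node is serialized with `"cl"`/`"m"`/`"ln"` metadata, `"t"` for total time and `"c"` for its children.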