From a9d25c5c5c54b2d6c9ec64562bd29456fd909768 Mon Sep 17 00:00:00 2001
From: Luck
Date: Sat, 9 Jun 2018 23:20:01 +0100
Subject: include line numbers

---
 .../me/lucko/spark/profiler/DataAggregator.java    |  32 -----
 .../main/java/me/lucko/spark/profiler/Sampler.java |   9 +-
 .../lucko/spark/profiler/SimpleDataAggregator.java |  55 ------
 .../java/me/lucko/spark/profiler/StackNode.java    | 141 ------------------
 .../me/lucko/spark/profiler/StackTraceNode.java    |  71 ---------
 .../lucko/spark/profiler/TickedDataAggregator.java | 154 --------------------
 .../spark/profiler/aggregator/DataAggregator.java  |  34 +++++
 .../profiler/aggregator/SimpleDataAggregator.java  |  58 ++++++++
 .../profiler/aggregator/TickedDataAggregator.java  | 159 +++++++++++++++++++++
 .../me/lucko/spark/profiler/node/AbstractNode.java | 113 +++++++++++++++
 .../lucko/spark/profiler/node/StackTraceNode.java  |  78 ++++++++++
 .../me/lucko/spark/profiler/node/ThreadNode.java   |  24 ++++
 12 files changed, 473 insertions(+), 455 deletions(-)
 delete mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java
 delete mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java
 delete mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java
 delete mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java
 delete mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
 create mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/aggregator/DataAggregator.java
 create mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/aggregator/SimpleDataAggregator.java
 create mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/aggregator/TickedDataAggregator.java
 create mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/node/AbstractNode.java
 create mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/node/StackTraceNode.java
 create mode 100644 spark-common/src/main/java/me/lucko/spark/profiler/node/ThreadNode.java

diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java
deleted file mode 100644
index 1afa52c..0000000
--- a/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package me.lucko.spark.profiler;
-
-import java.util.Map;
-
-/**
- * Aggregates sampling data.
- */
-public interface DataAggregator {
-
-    /**
-     * Called before the sampler begins to insert data
-     */
-    default void start() {
-
-    }
-
-    /**
-     * Forms the output data
-     *
-     * @return the output data
-     */
-    Map<String, StackNode> getData();
-
-    /**
-     * Inserts sampling data into this aggregator
-     *
-     * @param threadName the name of the thread
-     * @param stack the call stack
-     */
-    void insertData(String threadName, StackTraceElement[] stack);
-
-}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
index 14e82c8..10e7a22 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
@@ -21,6 +21,11 @@ package me.lucko.spark.profiler;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gson.stream.JsonWriter;
 
+import me.lucko.spark.profiler.aggregator.DataAggregator;
+import me.lucko.spark.profiler.aggregator.SimpleDataAggregator;
+import me.lucko.spark.profiler.aggregator.TickedDataAggregator;
+import me.lucko.spark.profiler.node.ThreadNode;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -144,10 +149,10 @@ public class Sampler implements Runnable {
 
         writer.name("threads").beginArray();
 
-        List<Map.Entry<String, StackNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
+        List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
         data.sort(Map.Entry.comparingByKey());
 
-        for (Map.Entry<String, StackNode> entry : data) {
+        for (Map.Entry<String, ThreadNode> entry : data) {
             writer.beginObject();
             writer.name("threadName").value(entry.getKey());
             writer.name("totalTime").value(entry.getValue().getTotalTime());
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java
deleted file mode 100644
index f4138af..0000000
--- a/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package me.lucko.spark.profiler;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Implementation of {@link DataAggregator} that makes use of a "worker" thread pool for inserting
- * data.
- */
-public class SimpleDataAggregator implements DataAggregator {
-
-    /** A map of root stack nodes for each thread with sampling data */
-    private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
-
-    /** The worker pool used for sampling */
-    private final ExecutorService workerPool;
-
-    /** The instance used to group threads together */
-    private final ThreadGrouper threadGrouper;
-
-    /** The interval to wait between sampling, in milliseconds */
-    private final int interval;
-
-    public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) {
-        this.workerPool = workerPool;
-        this.threadGrouper = threadGrouper;
-        this.interval = interval;
-    }
-
-    @Override
-    public void insertData(String threadName, StackTraceElement[] stack) {
-        try {
-            String group = this.threadGrouper.getGroup(threadName);
-            StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
-            node.log(stack, this.interval);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    public Map<String, StackNode> getData() {
-        // wait for all pending data to be inserted
-        this.workerPool.shutdown();
-        try {
-            this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        return this.threadData;
-    }
-}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java
deleted file mode 100644
index 575400a..0000000
--- a/spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * WarmRoast
- * Copyright (C) 2013 Albert Pham <http://www.sk89q.com>
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package me.lucko.spark.profiler;
-
-import com.google.gson.stream.JsonWriter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.LongAdder;
-
-/**
- * Represents a node in the overall sampling stack.
- *
- * <p>The base implementation of this class is only used for the root of node structures. The
- * {@link StackTraceNode} class is used for representing method calls in the structure.</p>
- */
-public class StackNode implements Comparable<StackNode> {
-
-    private static final int MAX_STACK_DEPTH = 300;
-
-    /**
-     * The name of this node
-     */
-    private final String name;
-
-    /**
-     * A map of this nodes children
-     */
-    private final Map<String, StackNode> children = new ConcurrentHashMap<>();
-
-    /**
-     * The accumulated sample time for this node
-     */
-    private final LongAdder totalTime = new LongAdder();
-
-    public StackNode(String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return this.name;
-    }
-
-    public Collection<StackNode> getChildren() {
-        if (this.children.isEmpty()) {
-            return Collections.emptyList();
-        }
-
-        List<StackNode> list = new ArrayList<>(this.children.values());
-        list.sort(null);
-        return list;
-    }
-
-    private StackNode resolveChild(String name) {
-        return this.children.computeIfAbsent(name, StackNode::new);
-    }
-
-    private StackNode resolveChild(String className, String methodName) {
-        return this.children.computeIfAbsent(StackTraceNode.formName(className, methodName), name -> new StackTraceNode(className, methodName));
-    }
-
-    public long getTotalTime() {
-        return this.totalTime.longValue();
-    }
-
-    public void accumulateTime(long time) {
-        this.totalTime.add(time);
-    }
-
-    private void log(StackTraceElement[] elements, int skip, long time) {
-        accumulateTime(time);
-
-        if (skip >= MAX_STACK_DEPTH) {
-            return;
-        }
-
-        if (elements.length - skip == 0) {
-            return;
-        }
-
-        StackTraceElement bottom = elements[elements.length - (skip + 1)];
-        resolveChild(bottom.getClassName(), bottom.getMethodName()).log(elements, skip + 1, time);
-    }
-
-    public void log(StackTraceElement[] elements, long time) {
-        log(elements, 0, time);
-    }
-
-    @Override
-    public int compareTo(StackNode o) {
-        return getName().compareTo(o.getName());
-    }
-
-    public void serializeTo(JsonWriter writer) throws IOException {
-        writer.beginObject();
-
-        // append metadata about this node
-        appendMetadata(writer);
-
-        // include the total time recorded for this node
-        writer.name("totalTime").value(getTotalTime());
-
-        // append child nodes, if any are present
-        Collection<StackNode> childNodes = getChildren();
-        if (!childNodes.isEmpty()) {
-            writer.name("children").beginArray();
-            for (StackNode child : childNodes) {
-                child.serializeTo(writer);
-            }
-            writer.endArray();
-        }
-
-        writer.endObject();
-    }
-
-    protected void appendMetadata(JsonWriter writer) throws IOException {
-        writer.name("name").value(getName());
-    }
-
-}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java
deleted file mode 100644
index d46a547..0000000
--- a/spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * WarmRoast
- * Copyright (C) 2013 Albert Pham <http://www.sk89q.com>
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-package me.lucko.spark.profiler;
-
-import com.google.gson.stream.JsonWriter;
-
-import java.io.IOException;
-
-/**
- * Represents a {@link StackNode node} for a method call.
- */
-public class StackTraceNode extends StackNode {
-
-    /**
-     * Forms the {@link StackNode#getName()} for a {@link StackTraceNode}.
-     *
-     * @param className the name of the class
-     * @param methodName the name of the method
-     * @return the name
-     */
-    static String formName(String className, String methodName) {
-        return className + "." + methodName + "()";
-    }
-
-    /** The name of the class */
-    private final String className;
-    /** The name of the method */
-    private final String methodName;
-
-    public StackTraceNode(String className, String methodName) {
-        super(formName(className, methodName));
-        this.className = className;
-        this.methodName = methodName;
-    }
-
-    public String getClassName() {
-        return this.className;
-    }
-
-    public String getMethodName() {
-        return this.methodName;
-    }
-
-    @Override
-    protected void appendMetadata(JsonWriter writer) throws IOException {
-        writer.name("className").value(this.className);
-        writer.name("methodName").value(this.methodName);
-    }
-
-    @Override
-    public int compareTo(StackNode that) {
-        return Long.compare(that.getTotalTime(), this.getTotalTime());
-    }
-
-}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
deleted file mode 100644
index 1d23d37..0000000
--- a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package me.lucko.spark.profiler;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
- * which exceed a certain threshold in duration.
- */
-public class TickedDataAggregator implements DataAggregator {
-
-    /** A map of root stack nodes for each thread with sampling data */
-    private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
-
-    /** The worker pool for inserting stack nodes */
-    private final ExecutorService workerPool;
-
-    /** Used to monitor the current "tick" of the server */
-    private final TickCounter tickCounter;
-
-    /** The instance used to group threads together */
-    private final ThreadGrouper threadGrouper;
-
-    /** The interval to wait between sampling, in milliseconds */
-    private final int interval;
-
-    /** Tick durations under this threshold will not be inserted */
-    private final int tickLengthThreshold;
-
-    /** The expected number of samples in each tick */
-    private final int expectedSize;
-
-    private final Object mutex = new Object();
-
-    // state
-    private long currentTick = -1;
-    private TickList currentData = new TickList(0);
-
-    public TickedDataAggregator(ExecutorService workerPool, TickCounter tickCounter, ThreadGrouper threadGrouper, int interval, int tickLengthThreshold) {
-        this.workerPool = workerPool;
-        this.tickCounter = tickCounter;
-        this.threadGrouper = threadGrouper;
-        this.interval = interval;
-        this.tickLengthThreshold = tickLengthThreshold;
-        // 50 millis in a tick, plus 10 so we have a bit of room to go over
-        this.expectedSize = (50 / interval) + 10;
-    }
-
-    @Override
-    public void insertData(String threadName, StackTraceElement[] stack) {
-        synchronized (this.mutex) {
-            long tick = this.tickCounter.getCurrentTick();
-            if (this.currentTick != tick) {
-                pushCurrentTick();
-                this.currentTick = tick;
-                this.currentData = new TickList(this.expectedSize);
-            }
-
-            // form the queued data
-            QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
-            // insert it
-            this.currentData.addData(queuedData);
-        }
-    }
-
-    // guarded by 'mutex'
-    private void pushCurrentTick() {
-        TickList currentData = this.currentData;
-
-        // approximate how long the tick lasted
-        int tickLengthMillis = currentData.getList().size() * this.interval;
-
-        // don't push data below the threshold
-        if (tickLengthMillis < this.tickLengthThreshold) {
-            return;
-        }
-
-        this.workerPool.submit(currentData);
-    }
-
-    @Override
-    public void start() {
-        this.tickCounter.start();
-    }
-
-    @Override
-    public Map<String, StackNode> getData() {
-        // push the current tick
-        synchronized (this.mutex) {
-            pushCurrentTick();
-        }
-
-        // close the tick counter
-        this.tickCounter.close();
-
-        // wait for all pending data to be inserted
-        this.workerPool.shutdown();
-        try {
-            this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        return this.threadData;
-    }
-
-    // called by TickList
-    void insertData(List<QueuedThreadInfo> dataList) {
-        for (QueuedThreadInfo data : dataList) {
-            try {
-                String group = this.threadGrouper.getGroup(data.threadName);
-                StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
-                node.log(data.stack, this.interval);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    private final class TickList implements Runnable {
-        private final List<QueuedThreadInfo> list;
-
-        TickList(int expectedSize) {
-            this.list = new ArrayList<>(expectedSize);
-        }
-
-        @Override
-        public void run() {
-            insertData(this.list);
-        }
-
-        public List<QueuedThreadInfo> getList() {
-            return this.list;
-        }
-
-        public void addData(QueuedThreadInfo data) {
-            this.list.add(data);
-        }
-    }
-
-    private static final class QueuedThreadInfo {
-        private final String threadName;
-        private final StackTraceElement[] stack;
-
-        QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
-            this.threadName = threadName;
-            this.stack = stack;
-        }
-    }
-}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/DataAggregator.java
new file mode 100644
index 0000000..ab4ede0
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/DataAggregator.java
@@ -0,0 +1,34 @@
+package me.lucko.spark.profiler.aggregator;
+
+import me.lucko.spark.profiler.node.ThreadNode;
+
+import java.util.Map;
+
+/**
+ * Aggregates sampling data.
+ */
+public interface DataAggregator {
+
+    /**
+     * Called before the sampler begins to insert data
+     */
+    default void start() {
+
+    }
+
+    /**
+     * Forms the output data
+     *
+     * @return the output data
+     */
+    Map<String, ThreadNode> getData();
+
+    /**
+     * Inserts sampling data into this aggregator
+     *
+     * @param threadName the name of the thread
+     * @param stack the call stack
+     */
+    void insertData(String threadName, StackTraceElement[] stack);
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/SimpleDataAggregator.java
new file mode 100644
index 0000000..f46924e
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/SimpleDataAggregator.java
@@ -0,0 +1,58 @@
+package me.lucko.spark.profiler.aggregator;
+
+import me.lucko.spark.profiler.ThreadGrouper;
+import me.lucko.spark.profiler.node.AbstractNode;
+import me.lucko.spark.profiler.node.ThreadNode;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Basic implementation of {@link DataAggregator}.
+ */
+public class SimpleDataAggregator implements DataAggregator {
+
+    /** A map of root stack nodes for each thread with sampling data */
+    private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
+
+    /** The worker pool used for sampling */
+    private final ExecutorService workerPool;
+
+    /** The instance used to group threads together */
+    private final ThreadGrouper threadGrouper;
+
+    /** The interval to wait between sampling, in milliseconds */
+    private final int interval;
+
+    public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval) {
+        this.workerPool = workerPool;
+        this.threadGrouper = threadGrouper;
+        this.interval = interval;
+    }
+
+    @Override
+    public void insertData(String threadName, StackTraceElement[] stack) {
+        try {
+            String group = this.threadGrouper.getGroup(threadName);
+            AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new);
+            node.log(stack, this.interval);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public Map<String, ThreadNode> getData() {
+        // wait for all pending data to be inserted
+        this.workerPool.shutdown();
+        try {
+            this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        return this.threadData;
+    }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/TickedDataAggregator.java
new file mode 100644
index 0000000..a66cf91
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/TickedDataAggregator.java
@@ -0,0 +1,159 @@
+package me.lucko.spark.profiler.aggregator;
+
+import me.lucko.spark.profiler.ThreadGrouper;
+import me.lucko.spark.profiler.TickCounter;
+import me.lucko.spark.profiler.node.AbstractNode;
+import me.lucko.spark.profiler.node.ThreadNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks"
+ * which exceed a certain threshold in duration.
+ */
+public class TickedDataAggregator implements DataAggregator {
+
+    /** A map of root stack nodes for each thread with sampling data */
+    private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
+
+    /** The worker pool for inserting stack nodes */
+    private final ExecutorService workerPool;
+
+    /** Used to monitor the current "tick" of the server */
+    private final TickCounter tickCounter;
+
+    /** The instance used to group threads together */
+    private final ThreadGrouper threadGrouper;
+
+    /** The interval to wait between sampling, in milliseconds */
+    private final int interval;
+
+    /** Tick durations under this threshold will not be inserted */
+    private final int tickLengthThreshold;
+
+    /** The expected number of samples in each tick */
+    private final int expectedSize;
+
+    private final Object mutex = new Object();
+
+    // state
+    private long currentTick = -1;
+    private TickList currentData = new TickList(0);
+
+    public TickedDataAggregator(ExecutorService workerPool, TickCounter tickCounter, ThreadGrouper threadGrouper, int interval, int tickLengthThreshold) {
+        this.workerPool = workerPool;
+        this.tickCounter = tickCounter;
+        this.threadGrouper = threadGrouper;
+        this.interval = interval;
+        this.tickLengthThreshold = tickLengthThreshold;
+        // 50 millis in a tick, plus 10 so we have a bit of room to go over
+        this.expectedSize = (50 / interval) + 10;
+    }
+
+    @Override
+    public void insertData(String threadName, StackTraceElement[] stack) {
+        synchronized (this.mutex) {
+            long tick = this.tickCounter.getCurrentTick();
+            if (this.currentTick != tick) {
+                pushCurrentTick();
+                this.currentTick = tick;
+                this.currentData = new TickList(this.expectedSize);
+            }
+
+            // form the queued data
+            QueuedThreadInfo queuedData = new QueuedThreadInfo(threadName, stack);
+            // insert it
+            this.currentData.addData(queuedData);
+        }
+    }
+
+    // guarded by 'mutex'
+    private void pushCurrentTick() {
+        TickList currentData = this.currentData;
+
+        // approximate how long the tick lasted
+        int tickLengthMillis = currentData.getList().size() * this.interval;
+
+        // don't push data below the threshold
+        if (tickLengthMillis < this.tickLengthThreshold) {
+            return;
+        }
+
+        this.workerPool.submit(currentData);
+    }
+
+    @Override
+    public void start() {
+        this.tickCounter.start();
+    }
+
+    @Override
+    public Map<String, ThreadNode> getData() {
+        // push the current tick
+        synchronized (this.mutex) {
+            pushCurrentTick();
+        }
+
+        // close the tick counter
+        this.tickCounter.close();
+
+        // wait for all pending data to be inserted
+        this.workerPool.shutdown();
+        try {
+            this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        return this.threadData;
+    }
+
+    // called by TickList
+    void insertData(List<QueuedThreadInfo> dataList) {
+        for (QueuedThreadInfo data : dataList) {
+            try {
+                String group = this.threadGrouper.getGroup(data.threadName);
+                AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new);
+                node.log(data.stack, this.interval);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private final class TickList implements Runnable {
+        private final List<QueuedThreadInfo> list;
+
+        TickList(int expectedSize) {
+            this.list = new ArrayList<>(expectedSize);
+        }
+
+        @Override
+        public void run() {
+            insertData(this.list);
+        }
+
+        public List<QueuedThreadInfo> getList() {
+            return this.list;
+        }
+
+        public void addData(QueuedThreadInfo data) {
+            this.list.add(data);
+        }
+    }
+
+    private static final class QueuedThreadInfo {
+        private final String threadName;
+        private final StackTraceElement[] stack;
+
+        QueuedThreadInfo(String threadName, StackTraceElement[] stack) {
+            this.threadName = threadName;
+            this.stack = stack;
+        }
+    }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/node/AbstractNode.java
new file mode 100644
index 0000000..04425c6
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/node/AbstractNode.java
@@ -0,0 +1,113 @@
+/*
+ * WarmRoast
+ * Copyright (C) 2013 Albert Pham <http://www.sk89q.com>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package me.lucko.spark.profiler.node;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Encapsulates a timed node in the sampling stack.
+ */
+public abstract class AbstractNode {
+
+    private static final int MAX_STACK_DEPTH = 300;
+
+    /**
+     * A map of this nodes children
+     */
+    private final Map<String, AbstractNode> children = new ConcurrentHashMap<>();
+
+    /**
+     * The accumulated sample time for this node
+     */
+    private final LongAdder totalTime = new LongAdder();
+
+    public long getTotalTime() {
+        return this.totalTime.longValue();
+    }
+
+    private AbstractNode resolveChild(String className, String methodName, int lineNumber) {
+        return this.children.computeIfAbsent(
+                StackTraceNode.generateKey(className, methodName, lineNumber),
+                name -> new StackTraceNode(className, methodName, lineNumber)
+        );
+    }
+
+    public void log(StackTraceElement[] elements, long time) {
+        log(elements, 0, time);
+    }
+
+    private void log(StackTraceElement[] elements, int skip, long time) {
+        this.totalTime.add(time);
+
+        if (skip >= MAX_STACK_DEPTH) {
+            return;
+        }
+
+        if (elements.length - skip == 0) {
+            return;
+        }
+
+        StackTraceElement bottom = elements[elements.length - (skip + 1)];
+        resolveChild(bottom.getClassName(), bottom.getMethodName(), Math.max(0, bottom.getLineNumber())).log(elements, skip + 1, time);
+    }
+
+    private Collection<AbstractNode> getChildren() {
+        if (this.children.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<AbstractNode> list = new ArrayList<>(this.children.values());
+        list.sort(null);
+        return list;
+    }
+
+    public void serializeTo(JsonWriter writer) throws IOException {
+        writer.beginObject();
+
+        // append metadata about this node
+        appendMetadata(writer);
+
+        // include the total time recorded for this node
+        writer.name("totalTime").value(getTotalTime());
+
+        // append child nodes, if any are present
+        Collection<AbstractNode> childNodes = getChildren();
+        if (!childNodes.isEmpty()) {
+            writer.name("children").beginArray();
+            for (AbstractNode child : childNodes) {
+                child.serializeTo(writer);
+            }
+            writer.endArray();
+        }
+
+        writer.endObject();
+    }
+
+    protected abstract void appendMetadata(JsonWriter writer) throws IOException;
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/node/StackTraceNode.java
new file mode 100644
index 0000000..706d2e0
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/node/StackTraceNode.java
@@ -0,0 +1,78 @@
+/*
+ * WarmRoast
+ * Copyright (C) 2013 Albert Pham <http://www.sk89q.com>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package me.lucko.spark.profiler.node;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+
+/**
+ * Represents a stack trace element within the {@link AbstractNode node} structure.
+ */
+public final class StackTraceNode extends AbstractNode implements Comparable<StackTraceNode> {
+
+    /**
+     * Forms a key to represent the given node.
+     *
+     * @param className the name of the class
+     * @param methodName the name of the method
+     * @param lineNumber the line number
+     * @return the key
+     */
+    static String generateKey(String className, String methodName, int lineNumber) {
+        return className + "." + methodName + "#" + lineNumber;
+    }
+
+    /** The name of the class */
+    private final String className;
+    /** The name of the method */
+    private final String methodName;
+    /** The line number of the call */
+    private final int lineNumber;
+
+    public StackTraceNode(String className, String methodName, int lineNumber) {
+        this.className = className;
+        this.methodName = methodName;
+        this.lineNumber = lineNumber;
+    }
+
+    @Override
+    protected void appendMetadata(JsonWriter writer) throws IOException {
+        writer.name("className").value(this.className);
+        writer.name("methodName").value(this.methodName);
+        if (this.lineNumber != 0) {
+            writer.name("lineNumber").value(this.lineNumber);
+        }
+    }
+
+    private String key() {
+        return generateKey(this.className, this.methodName, this.lineNumber);
+    }
+
+    @Override
+    public int compareTo(StackTraceNode that) {
+        int i = -Long.compare(this.getTotalTime(), that.getTotalTime());
+        if (i != 0) {
+            return i;
+        }
+
+        return this.key().compareTo(that.key());
+    }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/node/ThreadNode.java
new file mode 100644
index 0000000..10ea67f
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/node/ThreadNode.java
@@ -0,0 +1,24 @@
+package me.lucko.spark.profiler.node;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+
+/**
+ * The root of a sampling stack for a given thread / thread group.
+ */
+public final class ThreadNode extends AbstractNode {
+
+    /**
+     * The name of this thread
+     */
+    private final String threadName;
+
+    public ThreadNode(String threadName) {
+        this.threadName = threadName;
+    }
+
+    protected void appendMetadata(JsonWriter writer) throws IOException {
+        writer.name("name").value(this.threadName);
+    }
+}
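Usage sketch (not part of the commit): the snippet below shows how the relocated aggregator and node classes fit together after this change. It assumes ThreadGrouper exposes a single method, String getGroup(String threadName), as the call sites above suggest, so a lambda stands in for a real grouper; everything else uses only the APIs visible in this patch.

    import com.google.gson.stream.JsonWriter;

    import me.lucko.spark.profiler.aggregator.DataAggregator;
    import me.lucko.spark.profiler.aggregator.SimpleDataAggregator;
    import me.lucko.spark.profiler.node.ThreadNode;

    import java.io.OutputStreamWriter;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AggregatorSketch {
        public static void main(String[] args) throws Exception {
            ExecutorService workerPool = Executors.newSingleThreadExecutor();

            // group each thread under its own name; 10ms between samples
            DataAggregator aggregator = new SimpleDataAggregator(workerPool, name -> name, 10);
            aggregator.start();

            // take one manual "sample" of the current thread; a real sampler
            // would call insertData on a timer for every monitored thread
            Thread current = Thread.currentThread();
            aggregator.insertData(current.getName(), current.getStackTrace());

            // getData() shuts the worker pool down, waits for pending inserts,
            // and yields one ThreadNode per group; each node is a tree of
            // StackTraceNodes keyed by "className.methodName#lineNumber"
            Map<String, ThreadNode> data = aggregator.getData();

            JsonWriter writer = new JsonWriter(new OutputStreamWriter(System.out, "UTF-8"));
            writer.beginArray();
            for (ThreadNode node : data.values()) {
                node.serializeTo(writer); // name, totalTime, children, now with lineNumber
            }
            writer.endArray();
            writer.flush();
        }
    }

Note on the ordering: the new StackTraceNode.compareTo sorts children hottest first and tie-breaks on the generated key, so the serialized child order is deterministic between runs.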