diff options
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark')
7 files changed, 105 insertions, 87 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java index 14e82c8..10e7a22 100644 --- a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java @@ -21,6 +21,11 @@ package me.lucko.spark.profiler; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.stream.JsonWriter; +import me.lucko.spark.profiler.aggregator.DataAggregator; +import me.lucko.spark.profiler.aggregator.SimpleDataAggregator; +import me.lucko.spark.profiler.aggregator.TickedDataAggregator; +import me.lucko.spark.profiler.node.ThreadNode; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; @@ -144,10 +149,10 @@ public class Sampler implements Runnable { writer.name("threads").beginArray(); - List<Map.Entry<String, StackNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet()); + List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet()); data.sort(Map.Entry.comparingByKey()); - for (Map.Entry<String, StackNode> entry : data) { + for (Map.Entry<String, ThreadNode> entry : data) { writer.beginObject(); writer.name("threadName").value(entry.getKey()); writer.name("totalTime").value(entry.getValue().getTotalTime()); diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/DataAggregator.java index 1afa52c..ab4ede0 100644 --- a/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/DataAggregator.java @@ -1,4 +1,6 @@ -package me.lucko.spark.profiler; +package me.lucko.spark.profiler.aggregator; + +import me.lucko.spark.profiler.node.ThreadNode; import java.util.Map; @@ -19,7 +21,7 @@ public interface DataAggregator { * * @return the output data */ - Map<String, StackNode> getData(); + Map<String, ThreadNode> getData(); /** * Inserts sampling data into this aggregator diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/SimpleDataAggregator.java index f4138af..f46924e 100644 --- a/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/SimpleDataAggregator.java @@ -1,4 +1,8 @@ -package me.lucko.spark.profiler; +package me.lucko.spark.profiler.aggregator; + +import me.lucko.spark.profiler.ThreadGrouper; +import me.lucko.spark.profiler.node.AbstractNode; +import me.lucko.spark.profiler.node.ThreadNode; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -6,13 +10,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; /** - * Implementation of {@link DataAggregator} that makes use of a "worker" thread pool for inserting - * data. + * Basic implementation of {@link DataAggregator}. */ public class SimpleDataAggregator implements DataAggregator { /** A map of root stack nodes for each thread with sampling data */ - private final Map<String, StackNode> threadData = new ConcurrentHashMap<>(); + private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>(); /** The worker pool used for sampling */ private final ExecutorService workerPool; @@ -33,7 +36,7 @@ public class SimpleDataAggregator implements DataAggregator { public void insertData(String threadName, StackTraceElement[] stack) { try { String group = this.threadGrouper.getGroup(threadName); - StackNode node = this.threadData.computeIfAbsent(group, StackNode::new); + AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new); node.log(stack, this.interval); } catch (Exception e) { e.printStackTrace(); @@ -41,7 +44,7 @@ public class SimpleDataAggregator implements DataAggregator { } @Override - public Map<String, StackNode> getData() { + public Map<String, ThreadNode> getData() { // wait for all pending data to be inserted this.workerPool.shutdown(); try { diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/TickedDataAggregator.java index 1d23d37..a66cf91 100644 --- a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/TickedDataAggregator.java @@ -1,4 +1,9 @@ -package me.lucko.spark.profiler; +package me.lucko.spark.profiler.aggregator; + +import me.lucko.spark.profiler.ThreadGrouper; +import me.lucko.spark.profiler.TickCounter; +import me.lucko.spark.profiler.node.AbstractNode; +import me.lucko.spark.profiler.node.ThreadNode; import java.util.ArrayList; import java.util.List; @@ -14,7 +19,7 @@ import java.util.concurrent.TimeUnit; public class TickedDataAggregator implements DataAggregator { /** A map of root stack nodes for each thread with sampling data */ - private final Map<String, StackNode> threadData = new ConcurrentHashMap<>(); + private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>(); /** The worker pool for inserting stack nodes */ private final ExecutorService workerPool; @@ -88,7 +93,7 @@ public class TickedDataAggregator implements DataAggregator { } @Override - public Map<String, StackNode> getData() { + public Map<String, ThreadNode> getData() { // push the current tick synchronized (this.mutex) { pushCurrentTick(); @@ -113,7 +118,7 @@ public class TickedDataAggregator implements DataAggregator { for (QueuedThreadInfo data : dataList) { try { String group = this.threadGrouper.getGroup(data.threadName); - StackNode node = this.threadData.computeIfAbsent(group, StackNode::new); + AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new); node.log(data.stack, this.interval); } catch (Exception e) { e.printStackTrace(); diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/node/AbstractNode.java index 575400a..04425c6 100644 --- a/spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java +++ b/spark-common/src/main/java/me/lucko/spark/profiler/node/AbstractNode.java @@ -16,7 +16,7 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -package me.lucko.spark.profiler; +package me.lucko.spark.profiler.node; import com.google.gson.stream.JsonWriter; @@ -30,66 +30,39 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.LongAdder; /** - * Represents a node in the overall sampling stack. - * - * <p>The base implementation of this class is only used for the root of node structures. The - * {@link StackTraceNode} class is used for representing method calls in the structure.</p> + * Encapsulates a timed node in the sampling stack. */ -public class StackNode implements Comparable<StackNode> { +public abstract class AbstractNode { private static final int MAX_STACK_DEPTH = 300; /** - * The name of this node - */ - private final String name; - - /** * A map of this nodes children */ - private final Map<String, StackNode> children = new ConcurrentHashMap<>(); + private final Map<String, StackTraceNode> children = new ConcurrentHashMap<>(); /** * The accumulated sample time for this node */ private final LongAdder totalTime = new LongAdder(); - - public StackNode(String name) { - this.name = name; - } - - public String getName() { - return this.name; - } - - public Collection<StackNode> getChildren() { - if (this.children.isEmpty()) { - return Collections.emptyList(); - } - - List<StackNode> list = new ArrayList<>(this.children.values()); - list.sort(null); - return list; - } - - private StackNode resolveChild(String name) { - return this.children.computeIfAbsent(name, StackNode::new); - } - - private StackNode resolveChild(String className, String methodName) { - return this.children.computeIfAbsent(StackTraceNode.formName(className, methodName), name -> new StackTraceNode(className, methodName)); - } public long getTotalTime() { return this.totalTime.longValue(); } - public void accumulateTime(long time) { - this.totalTime.add(time); + private AbstractNode resolveChild(String className, String methodName, int lineNumber) { + return this.children.computeIfAbsent( + StackTraceNode.generateKey(className, methodName, lineNumber), + name -> new StackTraceNode(className, methodName, lineNumber) + ); + } + + public void log(StackTraceElement[] elements, long time) { + log(elements, 0, time); } private void log(StackTraceElement[] elements, int skip, long time) { - accumulateTime(time); + this.totalTime.add(time); if (skip >= MAX_STACK_DEPTH) { return; @@ -100,16 +73,17 @@ public class StackNode implements Comparable<StackNode> { } StackTraceElement bottom = elements[elements.length - (skip + 1)]; - resolveChild(bottom.getClassName(), bottom.getMethodName()).log(elements, skip + 1, time); - } - - public void log(StackTraceElement[] elements, long time) { - log(elements, 0, time); + resolveChild(bottom.getClassName(), bottom.getMethodName(), Math.max(0, bottom.getLineNumber())).log(elements, skip + 1, time); } - @Override - public int compareTo(StackNode o) { - return getName().compareTo(o.getName()); + private Collection<? extends AbstractNode> getChildren() { + if (this.children.isEmpty()) { + return Collections.emptyList(); + } + + List<StackTraceNode> list = new ArrayList<>(this.children.values()); + list.sort(null); + return list; } public void serializeTo(JsonWriter writer) throws IOException { @@ -122,10 +96,10 @@ public class StackNode implements Comparable<StackNode> { writer.name("totalTime").value(getTotalTime()); // append child nodes, if any are present - Collection<StackNode> childNodes = getChildren(); + Collection<? extends AbstractNode> childNodes = getChildren(); if (!childNodes.isEmpty()) { writer.name("children").beginArray(); - for (StackNode child : childNodes) { + for (AbstractNode child : childNodes) { child.serializeTo(writer); } writer.endArray(); @@ -134,8 +108,6 @@ public class StackNode implements Comparable<StackNode> { writer.endObject(); } - protected void appendMetadata(JsonWriter writer) throws IOException { - writer.name("name").value(getName()); - } + protected abstract void appendMetadata(JsonWriter writer) throws IOException; } diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/node/StackTraceNode.java index d46a547..706d2e0 100644 --- a/spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java +++ b/spark-common/src/main/java/me/lucko/spark/profiler/node/StackTraceNode.java @@ -16,56 +16,63 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -package me.lucko.spark.profiler; +package me.lucko.spark.profiler.node; import com.google.gson.stream.JsonWriter; import java.io.IOException; /** - * Represents a {@link StackNode node} for a method call. + * Represents a stack trace element within the {@link AbstractNode node} structure. */ -public class StackTraceNode extends StackNode { +public final class StackTraceNode extends AbstractNode implements Comparable<StackTraceNode> { /** - * Forms the {@link StackNode#getName()} for a {@link StackTraceNode}. + * Forms a key to represent the given node. * * @param className the name of the class * @param methodName the name of the method - * @return the name + * @param lineNumber the line number + * @return the key */ - static String formName(String className, String methodName) { - return className + "." + methodName + "()"; + static String generateKey(String className, String methodName, int lineNumber) { + return className + "." + methodName + "#" + lineNumber; } /** The name of the class */ private final String className; /** The name of the method */ private final String methodName; + /** The line number of the call */ + private final int lineNumber; - public StackTraceNode(String className, String methodName) { - super(formName(className, methodName)); + public StackTraceNode(String className, String methodName, int lineNumber) { this.className = className; this.methodName = methodName; - } - - public String getClassName() { - return this.className; - } - - public String getMethodName() { - return this.methodName; + this.lineNumber = lineNumber; } @Override protected void appendMetadata(JsonWriter writer) throws IOException { writer.name("className").value(this.className); writer.name("methodName").value(this.methodName); + if (this.lineNumber != 0) { + writer.name("lineNumber").value(this.lineNumber); + } + } + + private String key() { + return generateKey(this.className, this.methodName, this.lineNumber); } @Override - public int compareTo(StackNode that) { - return Long.compare(that.getTotalTime(), this.getTotalTime()); + public int compareTo(StackTraceNode that) { + int i = -Long.compare(this.getTotalTime(), that.getTotalTime()); + if (i != 0) { + return i; + } + + return this.key().compareTo(that.key()); } } diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/node/ThreadNode.java new file mode 100644 index 0000000..10ea67f --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/profiler/node/ThreadNode.java @@ -0,0 +1,24 @@ +package me.lucko.spark.profiler.node; + +import com.google.gson.stream.JsonWriter; + +import java.io.IOException; + +/** + * The root of a sampling stack for a given thread / thread group. + */ +public final class ThreadNode extends AbstractNode { + + /** + * The name of this thread + */ + private final String threadName; + + public ThreadNode(String threadName) { + this.threadName = threadName; + } + + protected void appendMetadata(JsonWriter writer) throws IOException { + writer.name("name").value(this.threadName); + } +} |