aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko/spark
diff options
context:
space:
mode:
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java9
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/aggregator/DataAggregator.java (renamed from spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java)6
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/aggregator/SimpleDataAggregator.java (renamed from spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java)15
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/aggregator/TickedDataAggregator.java (renamed from spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java)13
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/node/AbstractNode.java (renamed from spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java)80
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/node/StackTraceNode.java (renamed from spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java)45
-rw-r--r--spark-common/src/main/java/me/lucko/spark/profiler/node/ThreadNode.java24
7 files changed, 105 insertions, 87 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
index 14e82c8..10e7a22 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/Sampler.java
@@ -21,6 +21,11 @@ package me.lucko.spark.profiler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.stream.JsonWriter;
+import me.lucko.spark.profiler.aggregator.DataAggregator;
+import me.lucko.spark.profiler.aggregator.SimpleDataAggregator;
+import me.lucko.spark.profiler.aggregator.TickedDataAggregator;
+import me.lucko.spark.profiler.node.ThreadNode;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
@@ -144,10 +149,10 @@ public class Sampler implements Runnable {
writer.name("threads").beginArray();
- List<Map.Entry<String, StackNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
+ List<Map.Entry<String, ThreadNode>> data = new ArrayList<>(this.dataAggregator.getData().entrySet());
data.sort(Map.Entry.comparingByKey());
- for (Map.Entry<String, StackNode> entry : data) {
+ for (Map.Entry<String, ThreadNode> entry : data) {
writer.beginObject();
writer.name("threadName").value(entry.getKey());
writer.name("totalTime").value(entry.getValue().getTotalTime());
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/DataAggregator.java
index 1afa52c..ab4ede0 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/DataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/DataAggregator.java
@@ -1,4 +1,6 @@
-package me.lucko.spark.profiler;
+package me.lucko.spark.profiler.aggregator;
+
+import me.lucko.spark.profiler.node.ThreadNode;
import java.util.Map;
@@ -19,7 +21,7 @@ public interface DataAggregator {
*
* @return the output data
*/
- Map<String, StackNode> getData();
+ Map<String, ThreadNode> getData();
/**
* Inserts sampling data into this aggregator
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/SimpleDataAggregator.java
index f4138af..f46924e 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/SimpleDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/SimpleDataAggregator.java
@@ -1,4 +1,8 @@
-package me.lucko.spark.profiler;
+package me.lucko.spark.profiler.aggregator;
+
+import me.lucko.spark.profiler.ThreadGrouper;
+import me.lucko.spark.profiler.node.AbstractNode;
+import me.lucko.spark.profiler.node.ThreadNode;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -6,13 +10,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
- * Implementation of {@link DataAggregator} that makes use of a "worker" thread pool for inserting
- * data.
+ * Basic implementation of {@link DataAggregator}.
*/
public class SimpleDataAggregator implements DataAggregator {
/** A map of root stack nodes for each thread with sampling data */
- private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
+ private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
/** The worker pool used for sampling */
private final ExecutorService workerPool;
@@ -33,7 +36,7 @@ public class SimpleDataAggregator implements DataAggregator {
public void insertData(String threadName, StackTraceElement[] stack) {
try {
String group = this.threadGrouper.getGroup(threadName);
- StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
+ AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new);
node.log(stack, this.interval);
} catch (Exception e) {
e.printStackTrace();
@@ -41,7 +44,7 @@ public class SimpleDataAggregator implements DataAggregator {
}
@Override
- public Map<String, StackNode> getData() {
+ public Map<String, ThreadNode> getData() {
// wait for all pending data to be inserted
this.workerPool.shutdown();
try {
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/TickedDataAggregator.java
index 1d23d37..a66cf91 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/TickedDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/aggregator/TickedDataAggregator.java
@@ -1,4 +1,9 @@
-package me.lucko.spark.profiler;
+package me.lucko.spark.profiler.aggregator;
+
+import me.lucko.spark.profiler.ThreadGrouper;
+import me.lucko.spark.profiler.TickCounter;
+import me.lucko.spark.profiler.node.AbstractNode;
+import me.lucko.spark.profiler.node.ThreadNode;
import java.util.ArrayList;
import java.util.List;
@@ -14,7 +19,7 @@ import java.util.concurrent.TimeUnit;
public class TickedDataAggregator implements DataAggregator {
/** A map of root stack nodes for each thread with sampling data */
- private final Map<String, StackNode> threadData = new ConcurrentHashMap<>();
+ private final Map<String, ThreadNode> threadData = new ConcurrentHashMap<>();
/** The worker pool for inserting stack nodes */
private final ExecutorService workerPool;
@@ -88,7 +93,7 @@ public class TickedDataAggregator implements DataAggregator {
}
@Override
- public Map<String, StackNode> getData() {
+ public Map<String, ThreadNode> getData() {
// push the current tick
synchronized (this.mutex) {
pushCurrentTick();
@@ -113,7 +118,7 @@ public class TickedDataAggregator implements DataAggregator {
for (QueuedThreadInfo data : dataList) {
try {
String group = this.threadGrouper.getGroup(data.threadName);
- StackNode node = this.threadData.computeIfAbsent(group, StackNode::new);
+ AbstractNode node = this.threadData.computeIfAbsent(group, ThreadNode::new);
node.log(data.stack, this.interval);
} catch (Exception e) {
e.printStackTrace();
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/node/AbstractNode.java
index 575400a..04425c6 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/StackNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/node/AbstractNode.java
@@ -16,7 +16,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-package me.lucko.spark.profiler;
+package me.lucko.spark.profiler.node;
import com.google.gson.stream.JsonWriter;
@@ -30,66 +30,39 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
/**
- * Represents a node in the overall sampling stack.
- *
- * <p>The base implementation of this class is only used for the root of node structures. The
- * {@link StackTraceNode} class is used for representing method calls in the structure.</p>
+ * Encapsulates a timed node in the sampling stack.
*/
-public class StackNode implements Comparable<StackNode> {
+public abstract class AbstractNode {
private static final int MAX_STACK_DEPTH = 300;
/**
- * The name of this node
- */
- private final String name;
-
- /**
* A map of this nodes children
*/
- private final Map<String, StackNode> children = new ConcurrentHashMap<>();
+ private final Map<String, StackTraceNode> children = new ConcurrentHashMap<>();
/**
* The accumulated sample time for this node
*/
private final LongAdder totalTime = new LongAdder();
-
- public StackNode(String name) {
- this.name = name;
- }
-
- public String getName() {
- return this.name;
- }
-
- public Collection<StackNode> getChildren() {
- if (this.children.isEmpty()) {
- return Collections.emptyList();
- }
-
- List<StackNode> list = new ArrayList<>(this.children.values());
- list.sort(null);
- return list;
- }
-
- private StackNode resolveChild(String name) {
- return this.children.computeIfAbsent(name, StackNode::new);
- }
-
- private StackNode resolveChild(String className, String methodName) {
- return this.children.computeIfAbsent(StackTraceNode.formName(className, methodName), name -> new StackTraceNode(className, methodName));
- }
public long getTotalTime() {
return this.totalTime.longValue();
}
- public void accumulateTime(long time) {
- this.totalTime.add(time);
+ private AbstractNode resolveChild(String className, String methodName, int lineNumber) {
+ return this.children.computeIfAbsent(
+ StackTraceNode.generateKey(className, methodName, lineNumber),
+ name -> new StackTraceNode(className, methodName, lineNumber)
+ );
+ }
+
+ public void log(StackTraceElement[] elements, long time) {
+ log(elements, 0, time);
}
private void log(StackTraceElement[] elements, int skip, long time) {
- accumulateTime(time);
+ this.totalTime.add(time);
if (skip >= MAX_STACK_DEPTH) {
return;
@@ -100,16 +73,17 @@ public class StackNode implements Comparable<StackNode> {
}
StackTraceElement bottom = elements[elements.length - (skip + 1)];
- resolveChild(bottom.getClassName(), bottom.getMethodName()).log(elements, skip + 1, time);
- }
-
- public void log(StackTraceElement[] elements, long time) {
- log(elements, 0, time);
+ resolveChild(bottom.getClassName(), bottom.getMethodName(), Math.max(0, bottom.getLineNumber())).log(elements, skip + 1, time);
}
- @Override
- public int compareTo(StackNode o) {
- return getName().compareTo(o.getName());
+ private Collection<? extends AbstractNode> getChildren() {
+ if (this.children.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<StackTraceNode> list = new ArrayList<>(this.children.values());
+ list.sort(null);
+ return list;
}
public void serializeTo(JsonWriter writer) throws IOException {
@@ -122,10 +96,10 @@ public class StackNode implements Comparable<StackNode> {
writer.name("totalTime").value(getTotalTime());
// append child nodes, if any are present
- Collection<StackNode> childNodes = getChildren();
+ Collection<? extends AbstractNode> childNodes = getChildren();
if (!childNodes.isEmpty()) {
writer.name("children").beginArray();
- for (StackNode child : childNodes) {
+ for (AbstractNode child : childNodes) {
child.serializeTo(writer);
}
writer.endArray();
@@ -134,8 +108,6 @@ public class StackNode implements Comparable<StackNode> {
writer.endObject();
}
- protected void appendMetadata(JsonWriter writer) throws IOException {
- writer.name("name").value(getName());
- }
+ protected abstract void appendMetadata(JsonWriter writer) throws IOException;
}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/node/StackTraceNode.java
index d46a547..706d2e0 100644
--- a/spark-common/src/main/java/me/lucko/spark/profiler/StackTraceNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/node/StackTraceNode.java
@@ -16,56 +16,63 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-package me.lucko.spark.profiler;
+package me.lucko.spark.profiler.node;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
/**
- * Represents a {@link StackNode node} for a method call.
+ * Represents a stack trace element within the {@link AbstractNode node} structure.
*/
-public class StackTraceNode extends StackNode {
+public final class StackTraceNode extends AbstractNode implements Comparable<StackTraceNode> {
/**
- * Forms the {@link StackNode#getName()} for a {@link StackTraceNode}.
+ * Forms a key to represent the given node.
*
* @param className the name of the class
* @param methodName the name of the method
- * @return the name
+ * @param lineNumber the line number
+ * @return the key
*/
- static String formName(String className, String methodName) {
- return className + "." + methodName + "()";
+ static String generateKey(String className, String methodName, int lineNumber) {
+ return className + "." + methodName + "#" + lineNumber;
}
/** The name of the class */
private final String className;
/** The name of the method */
private final String methodName;
+ /** The line number of the call */
+ private final int lineNumber;
- public StackTraceNode(String className, String methodName) {
- super(formName(className, methodName));
+ public StackTraceNode(String className, String methodName, int lineNumber) {
this.className = className;
this.methodName = methodName;
- }
-
- public String getClassName() {
- return this.className;
- }
-
- public String getMethodName() {
- return this.methodName;
+ this.lineNumber = lineNumber;
}
@Override
protected void appendMetadata(JsonWriter writer) throws IOException {
writer.name("className").value(this.className);
writer.name("methodName").value(this.methodName);
+ if (this.lineNumber != 0) {
+ writer.name("lineNumber").value(this.lineNumber);
+ }
+ }
+
+ private String key() {
+ return generateKey(this.className, this.methodName, this.lineNumber);
}
@Override
- public int compareTo(StackNode that) {
- return Long.compare(that.getTotalTime(), this.getTotalTime());
+ public int compareTo(StackTraceNode that) {
+ int i = -Long.compare(this.getTotalTime(), that.getTotalTime());
+ if (i != 0) {
+ return i;
+ }
+
+ return this.key().compareTo(that.key());
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/profiler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/profiler/node/ThreadNode.java
new file mode 100644
index 0000000..10ea67f
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/profiler/node/ThreadNode.java
@@ -0,0 +1,24 @@
+package me.lucko.spark.profiler.node;
+
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+
+/**
+ * The root of a sampling stack for a given thread / thread group.
+ */
+public final class ThreadNode extends AbstractNode {
+
+ /**
+ * The name of this thread
+ */
+ private final String threadName;
+
+ public ThreadNode(String threadName) {
+ this.threadName = threadName;
+ }
+
+ protected void appendMetadata(JsonWriter writer) throws IOException {
+ writer.name("name").value(this.threadName);
+ }
+}