aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko/spark/common/sampler
diff options
context:
space:
mode:
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java17
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java8
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java5
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java6
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java3
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java20
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java9
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java110
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java21
9 files changed, 99 insertions, 100 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 935329d..f0ed533 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
/**
* Builds {@link Sampler} instances.
*/
+@SuppressWarnings("UnusedReturnValue")
public class SamplerBuilder {
private double samplingInterval = 4; // milliseconds
@@ -91,17 +92,15 @@ public class SamplerBuilder {
}
public Sampler start() {
- Sampler sampler;
-
int intervalMicros = (int) (this.samplingInterval * 1000d);
- if (this.ticksOver == -1 || this.tickHook == null) {
- if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) {
- sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout);
- } else {
- sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
- }
- } else {
+
+ Sampler sampler;
+ if (this.ticksOver != -1 && this.tickHook != null) {
sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
+ } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) {
+ sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout);
+ } else {
+ sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
}
sampler.start();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
index cb3bd6f..594d56e 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
@@ -22,6 +22,7 @@ package me.lucko.spark.common.sampler.async;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.aggregator.AbstractDataAggregator;
+import me.lucko.spark.common.sampler.node.StackTraceNode;
import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
@@ -29,6 +30,11 @@ import me.lucko.spark.proto.SparkProtos.SamplerMetadata;
* Data aggregator for {@link AsyncSampler}.
*/
public class AsyncDataAggregator extends AbstractDataAggregator {
+
+ /** A describer for async-profiler stack trace elements. */
+ private static final StackTraceNode.Describer<AsyncStackTraceElement> STACK_TRACE_DESCRIBER = (element, parent) ->
+ new StackTraceNode.Description(element.getClassName(), element.getMethodName(), element.getMethodDescription());
+
protected AsyncDataAggregator(ThreadGrouper threadGrouper) {
super(threadGrouper);
}
@@ -44,7 +50,7 @@ public class AsyncDataAggregator extends AbstractDataAggregator {
public void insertData(ProfileSegment element) {
try {
ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName()));
- node.log(element.getStackTrace(), element.getTime());
+ node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime());
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
index cf9572a..2ed1aa4 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -20,6 +20,8 @@
package me.lucko.spark.common.sampler.async;
+import me.lucko.spark.common.util.TemporaryFiles;
+
import one.profiler.AsyncProfiler;
import java.io.InputStream;
@@ -58,8 +60,7 @@ public final class AsyncProfilerAccess {
throw new IllegalStateException("Could not find libasyncProfiler.so in spark jar file");
}
- Path extractPath = Files.createTempFile("spark-", "-libasyncProfiler.so.tmp");
- extractPath.toFile().deleteOnExit();
+ Path extractPath = TemporaryFiles.create("spark-", "-libasyncProfiler.so.tmp");
try (InputStream in = profilerResource.openStream()) {
Files.copy(in, extractPath, StandardCopyOption.REPLACE_EXISTING);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index ed09e46..4f3b3e4 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -31,6 +31,7 @@ import me.lucko.spark.common.sampler.async.jfr.JfrReader;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.node.ThreadNode;
import me.lucko.spark.common.util.ClassSourceLookup;
+import me.lucko.spark.common.util.TemporaryFiles;
import me.lucko.spark.proto.SparkProtos;
import one.profiler.AsyncProfiler;
@@ -94,8 +95,7 @@ public class AsyncSampler extends AbstractSampler {
this.startTime = System.currentTimeMillis();
try {
- this.outputFile = Files.createTempFile("spark-profile-", ".jfr.tmp");
- this.outputFile.toFile().deleteOnExit();
+ this.outputFile = TemporaryFiles.create("spark-profile-", ".jfr.tmp");
} catch (IOException e) {
throw new RuntimeException("Unable to create temporary output file", e);
}
@@ -272,7 +272,7 @@ public class AsyncSampler extends AbstractSampler {
if (className == null || className.length == 0) {
// native call
result = new AsyncStackTraceElement(
- "native",
+ AsyncStackTraceElement.NATIVE_CALL,
new String(methodName, StandardCharsets.UTF_8),
null
);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java
index cf66ded..1eb3432 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java
@@ -25,6 +25,9 @@ package me.lucko.spark.common.sampler.async;
*/
public class AsyncStackTraceElement {
+ /** The class name used for native method calls */
+ public static final String NATIVE_CALL = "native";
+
/** The name of the class */
private final String className;
/** The name of the method */
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
index 55f78ba..38d1b00 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java
@@ -36,17 +36,21 @@ public class Dictionary<T> {
throw new IllegalArgumentException("Zero key not allowed");
}
- if (++size * 2 > keys.length) {
- resize(keys.length * 2);
- }
-
int mask = keys.length - 1;
int i = hashCode(key) & mask;
- while (keys[i] != 0 && keys[i] != key) {
+ while (keys[i] != 0) {
+ if (keys[i] == key) {
+ values[i] = value;
+ return;
+ }
i = (i + 1) & mask;
}
keys[i] = key;
values[i] = value;
+
+ if (++size * 2 > keys.length) {
+ resize(keys.length * 2);
+ }
}
@SuppressWarnings("unchecked")
@@ -69,9 +73,8 @@ public class Dictionary<T> {
}
public int preallocate(int count) {
- int newSize = size + count;
- if (newSize * 2 > keys.length) {
- resize(Integer.highestOneBit(newSize * 4 - 1));
+ if (count * 2 > keys.length) {
+ resize(Integer.highestOneBit(count * 4 - 1));
}
return count;
}
@@ -98,6 +101,7 @@ public class Dictionary<T> {
}
private static int hashCode(long key) {
+ key *= 0xc6a4a7935bd1e995L;
return (int) (key ^ (key >>> 32));
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java
index a81a5c1..54d9e1c 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java
@@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler.java;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.aggregator.AbstractDataAggregator;
import me.lucko.spark.common.sampler.aggregator.DataAggregator;
+import me.lucko.spark.common.sampler.node.StackTraceNode;
import me.lucko.spark.common.sampler.node.ThreadNode;
import java.lang.management.ThreadInfo;
@@ -35,6 +36,12 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class JavaDataAggregator extends AbstractDataAggregator {
+ /** A describer for java.lang.StackTraceElement */
+ private static final StackTraceNode.Describer<StackTraceElement> STACK_TRACE_DESCRIBER = (element, parent) -> {
+ int parentLineNumber = parent == null ? StackTraceNode.NULL_LINE_NUMBER : parent.getLineNumber();
+ return new StackTraceNode.Description(element.getClassName(), element.getMethodName(), element.getLineNumber(), parentLineNumber);
+ };
+
/** The worker pool for inserting stack nodes */
protected final ExecutorService workerPool;
@@ -72,7 +79,7 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator {
try {
ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName()));
- node.log(threadInfo.getStackTrace(), this.interval);
+ node.log(STACK_TRACE_DESCRIBER, threadInfo.getStackTrace(), this.interval);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
index 73f7bd7..18f67ba 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java
@@ -21,8 +21,6 @@
package me.lucko.spark.common.sampler.node;
-import me.lucko.spark.common.sampler.async.AsyncStackTraceElement;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -38,18 +36,14 @@ public abstract class AbstractNode {
private static final int MAX_STACK_DEPTH = 300;
- /**
- * A map of this nodes children
- */
+ /** A map of the nodes children */
private final Map<StackTraceNode.Description, StackTraceNode> children = new ConcurrentHashMap<>();
- /**
- * The accumulated sample time for this node, measured in microseconds
- */
+ /** The accumulated sample time for this node, measured in microseconds */
private final LongAdder totalTime = new LongAdder();
/**
- * Returns the total sample time for this node in milliseconds.
+ * Gets the total sample time logged for this node in milliseconds.
*
* @return the total time
*/
@@ -62,87 +56,51 @@ public abstract class AbstractNode {
}
/**
- * Merge {@code other} into {@code this}.
+ * Logs the given stack trace against this node and its children.
*
- * @param other the other node
+ * @param describer the function that describes the elements of the stack
+ * @param stack the stack
+ * @param time the total time to log
+ * @param <T> the stack trace element type
*/
- public void merge(AbstractNode other) {
- this.totalTime.add(other.totalTime.longValue());
- for (Map.Entry<StackTraceNode.Description, StackTraceNode> child : other.children.entrySet()) {
- resolveChild(child.getKey()).merge(child.getValue());
- }
- }
-
- private AbstractNode resolveChild(StackTraceNode.Description description) {
- StackTraceNode result = this.children.get(description); // fast path
- if (result != null) {
- return result;
- }
- return this.children.computeIfAbsent(description, name -> new StackTraceNode(description));
- }
-
- public void log(StackTraceElement[] elements, long time) {
- log(elements, 0, time);
- }
-
- private void log(StackTraceElement[] elements, int offset, long time) {
- this.totalTime.add(time);
-
- if (offset >= MAX_STACK_DEPTH) {
- return;
- }
-
- if (elements.length - offset == 0) {
+ public <T> void log(StackTraceNode.Describer<T> describer, T[] stack, long time) {
+ if (stack.length == 0) {
return;
}
- // the first element in the array is the top of the call stack, and the last is the root
- // offset starts at 0.
-
- // pointer is determined by subtracting the offset from the index of the last element
- int pointer = (elements.length - 1) - offset;
- StackTraceElement element = elements[pointer];
+ this.totalTime.add(time);
- // the parent stack element is located at pointer+1.
- // when the current offset is 0, we know the current pointer is at the last element in the
- // array (the root) and therefore there is no parent.
- StackTraceElement parent = offset == 0 ? null : elements[pointer + 1];
+ AbstractNode node = this;
+ T previousElement = null;
- // get the line number of the parent element - the line which called "us"
- int parentLineNumber = parent == null ? StackTraceNode.NULL_LINE_NUMBER : parent.getLineNumber();
+ for (int offset = 0; offset < Math.min(MAX_STACK_DEPTH, stack.length); offset++) {
+ T element = stack[(stack.length - 1) - offset];
- // resolve a child element within the structure for the element at pointer
- AbstractNode child = resolveChild(new StackTraceNode.Description(element.getClassName(), element.getMethodName(), element.getLineNumber(), parentLineNumber));
- // call the log method on the found child, with an incremented offset.
- child.log(elements, offset + 1, time);
- }
+ node = node.resolveChild(describer.describe(element, previousElement));
+ node.totalTime.add(time);
- public void log(AsyncStackTraceElement[] elements, long time) {
- log(elements, 0, time);
+ previousElement = element;
+ }
}
- private void log(AsyncStackTraceElement[] elements, int offset, long time) {
- this.totalTime.add(time);
-
- if (offset >= MAX_STACK_DEPTH) {
- return;
+ private StackTraceNode resolveChild(StackTraceNode.Description description) {
+ StackTraceNode result = this.children.get(description); // fast path
+ if (result != null) {
+ return result;
}
+ return this.children.computeIfAbsent(description, StackTraceNode::new);
+ }
- if (elements.length - offset == 0) {
- return;
+ /**
+ * Merge {@code other} into {@code this}.
+ *
+ * @param other the other node
+ */
+ protected void merge(AbstractNode other) {
+ this.totalTime.add(other.totalTime.longValue());
+ for (Map.Entry<StackTraceNode.Description, StackTraceNode> child : other.children.entrySet()) {
+ resolveChild(child.getKey()).merge(child.getValue());
}
-
- // the first element in the array is the top of the call stack, and the last is the root
- // offset starts at 0.
-
- // pointer is determined by subtracting the offset from the index of the last element
- int pointer = (elements.length - 1) - offset;
- AsyncStackTraceElement element = elements[pointer];
-
- // resolve a child element within the structure for the element at pointer
- AbstractNode child = resolveChild(new StackTraceNode.Description(element.getClassName(), element.getMethodName(), element.getMethodDescription()));
- // call the log method on the found child, with an incremented offset.
- child.log(elements, offset + 1, time);
}
protected List<StackTraceNode> exportChildren(MergeMode mergeMode) {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
index efc7f81..f935fb2 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java
@@ -24,6 +24,8 @@ package me.lucko.spark.common.sampler.node;
import me.lucko.spark.common.util.MethodDisambiguator;
import me.lucko.spark.proto.SparkProtos;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
import java.util.Objects;
/**
@@ -107,6 +109,25 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta
}
/**
+ * Function to construct a {@link StackTraceNode.Description} from a stack trace element
+ * of type {@code T}.
+ *
+ * @param <T> the stack trace element type, e.g. {@link java.lang.StackTraceElement}
+ */
+ @FunctionalInterface
+ public interface Describer<T> {
+
+ /**
+ * Create a description for the given element.
+ *
+ * @param element the element
+ * @param parent the parent element
+ * @return the description
+ */
+ Description describe(T element, @Nullable T parent);
+ }
+
+ /**
* Encapsulates the attributes of a {@link StackTraceNode}.
*/
public static final class Description implements Comparable<Description> {