diff options
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
9 files changed, 99 insertions, 100 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 935329d..f0ed533 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; /** * Builds {@link Sampler} instances. */ +@SuppressWarnings("UnusedReturnValue") public class SamplerBuilder { private double samplingInterval = 4; // milliseconds @@ -91,17 +92,15 @@ public class SamplerBuilder { } public Sampler start() { - Sampler sampler; - int intervalMicros = (int) (this.samplingInterval * 1000d); - if (this.ticksOver == -1 || this.tickHook == null) { - if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) { - sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout); - } else { - sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); - } - } else { + + Sampler sampler; + if (this.ticksOver != -1 && this.tickHook != null) { sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); + } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.INSTANCE.isSupported()) { + sampler = new AsyncSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout); + } else { + sampler = new JavaSampler(intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); } sampler.start(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java index cb3bd6f..594d56e 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java @@ -22,6 +22,7 @@ package me.lucko.spark.common.sampler.async; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.aggregator.AbstractDataAggregator; +import me.lucko.spark.common.sampler.node.StackTraceNode; import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.proto.SparkProtos.SamplerMetadata; @@ -29,6 +30,11 @@ import me.lucko.spark.proto.SparkProtos.SamplerMetadata; * Data aggregator for {@link AsyncSampler}. */ public class AsyncDataAggregator extends AbstractDataAggregator { + + /** A describer for async-profiler stack trace elements. */ + private static final StackTraceNode.Describer<AsyncStackTraceElement> STACK_TRACE_DESCRIBER = (element, parent) -> + new StackTraceNode.Description(element.getClassName(), element.getMethodName(), element.getMethodDescription()); + protected AsyncDataAggregator(ThreadGrouper threadGrouper) { super(threadGrouper); } @@ -44,7 +50,7 @@ public class AsyncDataAggregator extends AbstractDataAggregator { public void insertData(ProfileSegment element) { try { ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName())); - node.log(element.getStackTrace(), element.getTime()); + node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime()); } catch (Exception e) { e.printStackTrace(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java index cf9572a..2ed1aa4 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java @@ -20,6 +20,8 @@ package me.lucko.spark.common.sampler.async; +import me.lucko.spark.common.util.TemporaryFiles; + import one.profiler.AsyncProfiler; import java.io.InputStream; @@ -58,8 +60,7 @@ public final class AsyncProfilerAccess { throw new IllegalStateException("Could not find libasyncProfiler.so in spark jar file"); } - Path extractPath = Files.createTempFile("spark-", "-libasyncProfiler.so.tmp"); - extractPath.toFile().deleteOnExit(); + Path extractPath = TemporaryFiles.create("spark-", "-libasyncProfiler.so.tmp"); try (InputStream in = profilerResource.openStream()) { Files.copy(in, extractPath, StandardCopyOption.REPLACE_EXISTING); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index ed09e46..4f3b3e4 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -31,6 +31,7 @@ import me.lucko.spark.common.sampler.async.jfr.JfrReader; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.common.util.ClassSourceLookup; +import me.lucko.spark.common.util.TemporaryFiles; import me.lucko.spark.proto.SparkProtos; import one.profiler.AsyncProfiler; @@ -94,8 +95,7 @@ public class AsyncSampler extends AbstractSampler { this.startTime = System.currentTimeMillis(); try { - this.outputFile = Files.createTempFile("spark-profile-", ".jfr.tmp"); - this.outputFile.toFile().deleteOnExit(); + this.outputFile = TemporaryFiles.create("spark-profile-", ".jfr.tmp"); } catch (IOException e) { throw new RuntimeException("Unable to create temporary output file", e); } @@ -272,7 +272,7 @@ public class AsyncSampler extends AbstractSampler { if (className == null || className.length == 0) { // native call result = new AsyncStackTraceElement( - "native", + AsyncStackTraceElement.NATIVE_CALL, new String(methodName, StandardCharsets.UTF_8), null ); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java index cf66ded..1eb3432 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncStackTraceElement.java @@ -25,6 +25,9 @@ package me.lucko.spark.common.sampler.async; */ public class AsyncStackTraceElement { + /** The class name used for native method calls */ + public static final String NATIVE_CALL = "native"; + /** The name of the class */ private final String className; /** The name of the method */ diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java index 55f78ba..38d1b00 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/jfr/Dictionary.java @@ -36,17 +36,21 @@ public class Dictionary<T> { throw new IllegalArgumentException("Zero key not allowed"); } - if (++size * 2 > keys.length) { - resize(keys.length * 2); - } - int mask = keys.length - 1; int i = hashCode(key) & mask; - while (keys[i] != 0 && keys[i] != key) { + while (keys[i] != 0) { + if (keys[i] == key) { + values[i] = value; + return; + } i = (i + 1) & mask; } keys[i] = key; values[i] = value; + + if (++size * 2 > keys.length) { + resize(keys.length * 2); + } } @SuppressWarnings("unchecked") @@ -69,9 +73,8 @@ public class Dictionary<T> { } public int preallocate(int count) { - int newSize = size + count; - if (newSize * 2 > keys.length) { - resize(Integer.highestOneBit(newSize * 4 - 1)); + if (count * 2 > keys.length) { + resize(Integer.highestOneBit(count * 4 - 1)); } return count; } @@ -98,6 +101,7 @@ public class Dictionary<T> { } private static int hashCode(long key) { + key *= 0xc6a4a7935bd1e995L; return (int) (key ^ (key >>> 32)); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java index a81a5c1..54d9e1c 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java @@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler.java; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.aggregator.AbstractDataAggregator; import me.lucko.spark.common.sampler.aggregator.DataAggregator; +import me.lucko.spark.common.sampler.node.StackTraceNode; import me.lucko.spark.common.sampler.node.ThreadNode; import java.lang.management.ThreadInfo; @@ -35,6 +36,12 @@ import java.util.concurrent.TimeUnit; */ public abstract class JavaDataAggregator extends AbstractDataAggregator { + /** A describer for java.lang.StackTraceElement */ + private static final StackTraceNode.Describer<StackTraceElement> STACK_TRACE_DESCRIBER = (element, parent) -> { + int parentLineNumber = parent == null ? StackTraceNode.NULL_LINE_NUMBER : parent.getLineNumber(); + return new StackTraceNode.Description(element.getClassName(), element.getMethodName(), element.getLineNumber(), parentLineNumber); + }; + /** The worker pool for inserting stack nodes */ protected final ExecutorService workerPool; @@ -72,7 +79,7 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator { try { ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName())); - node.log(threadInfo.getStackTrace(), this.interval); + node.log(STACK_TRACE_DESCRIBER, threadInfo.getStackTrace(), this.interval); } catch (Exception e) { e.printStackTrace(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java index 73f7bd7..18f67ba 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java @@ -21,8 +21,6 @@ package me.lucko.spark.common.sampler.node; -import me.lucko.spark.common.sampler.async.AsyncStackTraceElement; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -38,18 +36,14 @@ public abstract class AbstractNode { private static final int MAX_STACK_DEPTH = 300; - /** - * A map of this nodes children - */ + /** A map of the nodes children */ private final Map<StackTraceNode.Description, StackTraceNode> children = new ConcurrentHashMap<>(); - /** - * The accumulated sample time for this node, measured in microseconds - */ + /** The accumulated sample time for this node, measured in microseconds */ private final LongAdder totalTime = new LongAdder(); /** - * Returns the total sample time for this node in milliseconds. + * Gets the total sample time logged for this node in milliseconds. * * @return the total time */ @@ -62,87 +56,51 @@ public abstract class AbstractNode { } /** - * Merge {@code other} into {@code this}. + * Logs the given stack trace against this node and its children. * - * @param other the other node + * @param describer the function that describes the elements of the stack + * @param stack the stack + * @param time the total time to log + * @param <T> the stack trace element type */ - public void merge(AbstractNode other) { - this.totalTime.add(other.totalTime.longValue()); - for (Map.Entry<StackTraceNode.Description, StackTraceNode> child : other.children.entrySet()) { - resolveChild(child.getKey()).merge(child.getValue()); - } - } - - private AbstractNode resolveChild(StackTraceNode.Description description) { - StackTraceNode result = this.children.get(description); // fast path - if (result != null) { - return result; - } - return this.children.computeIfAbsent(description, name -> new StackTraceNode(description)); - } - - public void log(StackTraceElement[] elements, long time) { - log(elements, 0, time); - } - - private void log(StackTraceElement[] elements, int offset, long time) { - this.totalTime.add(time); - - if (offset >= MAX_STACK_DEPTH) { - return; - } - - if (elements.length - offset == 0) { + public <T> void log(StackTraceNode.Describer<T> describer, T[] stack, long time) { + if (stack.length == 0) { return; } - // the first element in the array is the top of the call stack, and the last is the root - // offset starts at 0. - - // pointer is determined by subtracting the offset from the index of the last element - int pointer = (elements.length - 1) - offset; - StackTraceElement element = elements[pointer]; + this.totalTime.add(time); - // the parent stack element is located at pointer+1. - // when the current offset is 0, we know the current pointer is at the last element in the - // array (the root) and therefore there is no parent. - StackTraceElement parent = offset == 0 ? null : elements[pointer + 1]; + AbstractNode node = this; + T previousElement = null; - // get the line number of the parent element - the line which called "us" - int parentLineNumber = parent == null ? StackTraceNode.NULL_LINE_NUMBER : parent.getLineNumber(); + for (int offset = 0; offset < Math.min(MAX_STACK_DEPTH, stack.length); offset++) { + T element = stack[(stack.length - 1) - offset]; - // resolve a child element within the structure for the element at pointer - AbstractNode child = resolveChild(new StackTraceNode.Description(element.getClassName(), element.getMethodName(), element.getLineNumber(), parentLineNumber)); - // call the log method on the found child, with an incremented offset. - child.log(elements, offset + 1, time); - } + node = node.resolveChild(describer.describe(element, previousElement)); + node.totalTime.add(time); - public void log(AsyncStackTraceElement[] elements, long time) { - log(elements, 0, time); + previousElement = element; + } } - private void log(AsyncStackTraceElement[] elements, int offset, long time) { - this.totalTime.add(time); - - if (offset >= MAX_STACK_DEPTH) { - return; + private StackTraceNode resolveChild(StackTraceNode.Description description) { + StackTraceNode result = this.children.get(description); // fast path + if (result != null) { + return result; } + return this.children.computeIfAbsent(description, StackTraceNode::new); + } - if (elements.length - offset == 0) { - return; + /** + * Merge {@code other} into {@code this}. + * + * @param other the other node + */ + protected void merge(AbstractNode other) { + this.totalTime.add(other.totalTime.longValue()); + for (Map.Entry<StackTraceNode.Description, StackTraceNode> child : other.children.entrySet()) { + resolveChild(child.getKey()).merge(child.getValue()); } - - // the first element in the array is the top of the call stack, and the last is the root - // offset starts at 0. - - // pointer is determined by subtracting the offset from the index of the last element - int pointer = (elements.length - 1) - offset; - AsyncStackTraceElement element = elements[pointer]; - - // resolve a child element within the structure for the element at pointer - AbstractNode child = resolveChild(new StackTraceNode.Description(element.getClassName(), element.getMethodName(), element.getMethodDescription())); - // call the log method on the found child, with an incremented offset. - child.log(elements, offset + 1, time); } protected List<StackTraceNode> exportChildren(MergeMode mergeMode) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java index efc7f81..f935fb2 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/StackTraceNode.java @@ -24,6 +24,8 @@ package me.lucko.spark.common.sampler.node; import me.lucko.spark.common.util.MethodDisambiguator; import me.lucko.spark.proto.SparkProtos; +import org.checkerframework.checker.nullness.qual.Nullable; + import java.util.Objects; /** @@ -107,6 +109,25 @@ public final class StackTraceNode extends AbstractNode implements Comparable<Sta } /** + * Function to construct a {@link StackTraceNode.Description} from a stack trace element + * of type {@code T}. + * + * @param <T> the stack trace element type, e.g. {@link java.lang.StackTraceElement} + */ + @FunctionalInterface + public interface Describer<T> { + + /** + * Create a description for the given element. + * + * @param element the element + * @param parent the parent element + * @return the description + */ + Description describe(T element, @Nullable T parent); + } + + /** * Encapsulates the attributes of a {@link StackTraceNode}. */ public static final class Description implements Comparable<Description> { |