diff options
author | Luck <git@lucko.me> | 2022-11-13 19:15:54 +0000 |
---|---|---|
committer | Luck <git@lucko.me> | 2022-11-13 19:15:54 +0000 |
commit | 76f43ab59d3839600bd9e040ff2d09199ebe778a (patch) | |
tree | e6e6605200b2eb6149c78a602fdbd9373e4494c7 /spark-common/src/main/java/me/lucko/spark/common/sampler | |
parent | 5af2e6fb4cbd21f836c7ad56100b3c4535a831de (diff) | |
download | spark-76f43ab59d3839600bd9e040ff2d09199ebe778a.tar.gz spark-76f43ab59d3839600bd9e040ff2d09199ebe778a.tar.bz2 spark-76f43ab59d3839600bd9e040ff2d09199ebe778a.zip |
Limit profile length to 1 hour
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
10 files changed, 226 insertions, 27 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 52a7387..382950a 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -93,15 +93,28 @@ public class SamplerBuilder { } public Sampler start(SparkPlatform platform) { + boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null; + boolean canUseAsyncProfiler = this.useAsyncProfiler && + !(this.ignoreSleeping || this.ignoreNative) && + !(this.threadDumper instanceof ThreadDumper.Regex) && + AsyncProfilerAccess.getInstance(platform).checkSupported(platform); + + int intervalMicros = (int) (this.samplingInterval * 1000d); Sampler sampler; - if (this.ticksOver != -1 && this.tickHook != null) { - sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); - } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.getInstance(platform).checkSupported(platform)) { - sampler = new AsyncSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout); + if (onlyTicksOverMode) { + sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, + this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, + this.tickHook, this.ticksOver); + + } else if (canUseAsyncProfiler) { + sampler = new AsyncSampler(platform, intervalMicros, this.threadDumper, + this.threadGrouper, this.timeout); + } else { - sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); + sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, + this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); } sampler.start(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java new file mode 100644 index 0000000..55913d8 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java @@ -0,0 +1,76 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Container for the active sampler. + */ +public class SamplerContainer implements AutoCloseable { + + private final AtomicReference<Sampler> activeSampler = new AtomicReference<>(); + + /** + * Gets the active sampler, or null if a sampler is not active. + * + * @return the active sampler + */ + public Sampler getActiveSampler() { + return this.activeSampler.get(); + } + + /** + * Sets the active sampler, throwing an exception if another sampler is already active. + * + * @param sampler the sampler + */ + public void setActiveSampler(Sampler sampler) { + if (!this.activeSampler.compareAndSet(null, sampler)) { + throw new IllegalStateException("Attempted to set active sampler when another was already active!"); + } + } + + /** + * Unsets the active sampler, if the provided sampler is active. + * + * @param sampler the sampler + */ + public void unsetActiveSampler(Sampler sampler) { + this.activeSampler.compareAndSet(sampler, null); + } + + /** + * Stops the active sampler, if there is one. + */ + public void stopActiveSampler() { + Sampler sampler = this.activeSampler.getAndSet(null); + if (sampler != null) { + sampler.stop(); + } + } + + @Override + public void close() { + stopActiveSampler(); + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java index ad9dee4..2c003e5 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.IntPredicate; /** * Abstract implementation of {@link DataAggregator}. @@ -52,6 +53,11 @@ public abstract class AbstractDataAggregator implements DataAggregator { } @Override + public void pruneData(IntPredicate timeWindowPredicate) { + this.threadData.values().removeIf(node -> node.removeTimeWindowsRecursively(timeWindowPredicate)); + } + + @Override public List<ThreadNode> exportData() { List<ThreadNode> data = new ArrayList<>(this.threadData.values()); for (ThreadNode node : data) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java index 5590a96..ed33204 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java @@ -24,6 +24,7 @@ import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; import java.util.List; +import java.util.function.IntPredicate; /** * Aggregates sampling data. @@ -38,6 +39,13 @@ public interface DataAggregator { List<ThreadNode> exportData(); /** + * Prunes windows of data from this aggregator if the given {@code timeWindowPredicate} returns true. + * + * @param timeWindowPredicate the predicate + */ + void pruneData(IntPredicate timeWindowPredicate); + + /** * Gets metadata about the data aggregator instance. */ SamplerMetadata.DataAggregator getMetadata(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 2c9bb5f..cbc81c7 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -76,15 +76,20 @@ public class AsyncSampler extends AbstractSampler { this.windowStatisticsCollector.startCountingTicks(tickHook); } - int window = ProfilingWindowUtils.unixMillisToWindow(System.currentTimeMillis()); + int window = ProfilingWindowUtils.windowNow(); AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob(); job.init(this.platform, this.interval, this.threadDumper, window); job.start(); this.currentJob = job; - // rotate the sampler job every minute to put data into a new window - this.scheduler.scheduleAtFixedRate(this::rotateProfilerJob, 1, 1, TimeUnit.MINUTES); + // rotate the sampler job to put data into a new window + this.scheduler.scheduleAtFixedRate( + this::rotateProfilerJob, + ProfilingWindowUtils.WINDOW_SIZE_SECONDS, + ProfilingWindowUtils.WINDOW_SIZE_SECONDS, + TimeUnit.SECONDS + ); recordInitialGcStats(); scheduleTimeout(); @@ -117,6 +122,9 @@ public class AsyncSampler extends AbstractSampler { // aggregate the output of the previous job previousJob.aggregate(this.dataAggregator); + + // prune data older than the history size + this.dataAggregator.pruneData(ProfilingWindowUtils.keepHistoryBefore(window)); } } catch (Throwable e) { e.printStackTrace(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index 8c96fd3..6aad5e3 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -34,8 +34,6 @@ import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; -import org.checkerframework.checker.units.qual.A; - import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; @@ -145,10 +143,15 @@ public class JavaSampler extends AbstractSampler implements Runnable { JavaSampler.this.dataAggregator.insertData(threadInfo, this.window); } - // if we have just stepped over into a new window, collect statistics for the previous window + // if we have just stepped over into a new window... int previousWindow = JavaSampler.this.lastWindow.getAndSet(this.window); if (previousWindow != 0 && previousWindow != this.window) { + + // collect statistics for the previous window JavaSampler.this.windowStatisticsCollector.measureNow(previousWindow); + + // prune data older than the history size + JavaSampler.this.dataAggregator.pruneData(ProfilingWindowUtils.keepHistoryBefore(this.window)); } } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java index e6f6cf5..2e4b055 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java @@ -20,17 +20,18 @@ package me.lucko.spark.common.sampler.node; -import me.lucko.spark.common.sampler.async.jfr.Dictionary; import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.LongAdder; -import java.util.stream.IntStream; +import java.util.function.IntPredicate; /** * Encapsulates a timed node in the sampling stack. @@ -43,9 +44,9 @@ public abstract class AbstractNode { private final Map<StackTraceNode.Description, StackTraceNode> children = new ConcurrentHashMap<>(); /** The accumulated sample time for this node, measured in microseconds */ - // long key = the window (effectively System.currentTimeMillis() / 60_000) + // Integer key = the window (effectively System.currentTimeMillis() / 60_000) // LongAdder value = accumulated time in microseconds - private final Dictionary<LongAdder> times = new Dictionary<>(); + private final Map<Integer, LongAdder> times = new HashMap<>(); /** * Gets the time accumulator for a given window @@ -67,10 +68,18 @@ public abstract class AbstractNode { * * @return the time windows */ - public IntStream getTimeWindows() { - IntStream.Builder keys = IntStream.builder(); - this.times.forEach((key, value) -> keys.add((int) key)); - return keys.build(); + public Set<Integer> getTimeWindows() { + return this.times.keySet(); + } + + /** + * Removes time windows from this node if they pass the given {@code predicate} test. + * + * @param predicate the predicate + * @return true if any time windows were removed + */ + public boolean removeTimeWindows(IntPredicate predicate) { + return this.times.keySet().removeIf(predicate::test); } /** @@ -100,7 +109,7 @@ public abstract class AbstractNode { * @param other the other node */ protected void merge(AbstractNode other) { - other.times.forEach((key, value) -> getTimeAccumulator((int) key).add(value.longValue())); + other.times.forEach((key, value) -> getTimeAccumulator(key).add(value.longValue())); for (Map.Entry<StackTraceNode.Description, StackTraceNode> child : other.children.entrySet()) { resolveChild(child.getKey()).merge(child.getValue()); } @@ -127,7 +136,6 @@ public abstract class AbstractNode { list.add(child); } - //list.sort(null); return list; } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java index 9faece6..5035046 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java @@ -25,9 +25,13 @@ import me.lucko.spark.common.util.IndexedListBuilder; import me.lucko.spark.proto.SparkSamplerProtos; import java.util.ArrayDeque; +import java.util.Collection; import java.util.Deque; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Queue; +import java.util.function.IntPredicate; /** * The root of a sampling stack for a given thread / thread group. @@ -89,6 +93,46 @@ public final class ThreadNode extends AbstractNode { } } + /** + * Removes time windows that match the given {@code predicate}. + * + * @param predicate the predicate to use to test the time windows + * @return true if this node is now empty + */ + public boolean removeTimeWindowsRecursively(IntPredicate predicate) { + Queue<AbstractNode> queue = new ArrayDeque<>(); + queue.add(this); + + while (!queue.isEmpty()) { + AbstractNode node = queue.remove(); + Collection<StackTraceNode> children = node.getChildren(); + + boolean needToProcessChildren = false; + + for (Iterator<StackTraceNode> it = children.iterator(); it.hasNext(); ) { + StackTraceNode child = it.next(); + + boolean windowsWereRemoved = child.removeTimeWindows(predicate); + boolean childIsNowEmpty = child.getTimeWindows().isEmpty(); + + if (childIsNowEmpty) { + it.remove(); + continue; + } + + if (windowsWereRemoved) { + needToProcessChildren = true; + } + } + + if (needToProcessChildren) { + queue.addAll(children); + } + } + + return getTimeWindows().isEmpty(); + } + public SparkSamplerProtos.ThreadNode toProto(MergeMode mergeMode, ProtoTimeEncoder timeEncoder) { SparkSamplerProtos.ThreadNode.Builder proto = SparkSamplerProtos.ThreadNode.newBuilder() .setName(getThreadLabel()); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java index 109adb3..be6f08a 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java @@ -20,17 +20,51 @@ package me.lucko.spark.common.sampler.window; +import me.lucko.spark.common.sampler.aggregator.DataAggregator; + +import java.util.function.IntPredicate; + public enum ProfilingWindowUtils { ; /** + * The size/duration of a profiling window in seconds. + * (1 window = 1 minute) + */ + public static final int WINDOW_SIZE_SECONDS = 60; + + /** + * The number of windows to record in continuous profiling before data is dropped. + * (60 windows * 1 minute = 1 hour of profiling data) + */ + public static final int HISTORY_SIZE = Integer.getInteger("spark.continuousProfilingHistorySize", 60); + + /** * Gets the profiling window for the given time in unix-millis. * * @param time the time in milliseconds * @return the window */ public static int unixMillisToWindow(long time) { - // one window per minute - return (int) (time / 60_000); + return (int) (time / (WINDOW_SIZE_SECONDS * 1000L)); + } + + /** + * Gets the window at the current time. + * + * @return the window + */ + public static int windowNow() { + return unixMillisToWindow(System.currentTimeMillis()); + } + + /** + * Gets a prune predicate that can be passed to {@link DataAggregator#pruneData(IntPredicate)}. + * + * @return the prune predicate + */ + public static IntPredicate keepHistoryBefore(int currentWindow) { + // windows that were earlier than (currentWindow minus history size) should be pruned + return window -> window < (currentWindow - HISTORY_SIZE); } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java index edb2309..03da075 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java @@ -21,7 +21,6 @@ package me.lucko.spark.common.sampler.window; import me.lucko.spark.common.sampler.async.jfr.Dictionary; -import me.lucko.spark.common.sampler.node.AbstractNode; import me.lucko.spark.common.sampler.node.ThreadNode; import java.util.HashMap; @@ -42,7 +41,7 @@ public class ProtoTimeEncoder { public ProtoTimeEncoder(List<ThreadNode> sourceData) { // get an array of all keys that show up in the source data this.keys = sourceData.stream() - .map(AbstractNode::getTimeWindows) + .map(n -> n.getTimeWindows().stream().mapToInt(i -> i)) .reduce(IntStream.empty(), IntStream::concat) .distinct() .sorted() @@ -70,14 +69,14 @@ public class ProtoTimeEncoder { * @param times a dictionary of times (unix-time millis -> duration in microseconds) * @return the times encoded as a double array */ - public double[] encode(Dictionary<LongAdder> times) { + public double[] encode(Map<Integer, LongAdder> times) { // construct an array of values - length needs to exactly match the // number of keys, even if some values are zero. double[] array = new double[this.keys.length]; times.forEach((key, value) -> { // get the index for the given key - Integer idx = this.keysToIndex.get((int) key); + Integer idx = this.keysToIndex.get(key); if (idx == null) { throw new RuntimeException("No index for key " + key + " in " + this.keysToIndex.keySet()); } |