diff options
author | Luck <git@lucko.me> | 2022-11-13 19:15:54 +0000 |
---|---|---|
committer | Luck <git@lucko.me> | 2022-11-13 19:15:54 +0000 |
commit | 76f43ab59d3839600bd9e040ff2d09199ebe778a (patch) | |
tree | e6e6605200b2eb6149c78a602fdbd9373e4494c7 /spark-common/src/main/java/me/lucko/spark/common | |
parent | 5af2e6fb4cbd21f836c7ad56100b3c4535a831de (diff) | |
download | spark-76f43ab59d3839600bd9e040ff2d09199ebe778a.tar.gz spark-76f43ab59d3839600bd9e040ff2d09199ebe778a.tar.bz2 spark-76f43ab59d3839600bd9e040ff2d09199ebe778a.zip |
Limit profile length to 1 hour
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common')
12 files changed, 254 insertions, 57 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java index 4c3875c..a015e42 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java +++ b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java @@ -45,6 +45,7 @@ import me.lucko.spark.common.monitor.ping.PingStatistics; import me.lucko.spark.common.monitor.ping.PlayerPingProvider; import me.lucko.spark.common.monitor.tick.TickStatistics; import me.lucko.spark.common.platform.PlatformStatisticsProvider; +import me.lucko.spark.common.sampler.SamplerContainer; import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.common.tick.TickReporter; @@ -98,6 +99,7 @@ public class SparkPlatform { private final List<Command> commands; private final ReentrantLock commandExecuteLock = new ReentrantLock(true); private final ActivityLog activityLog; + private final SamplerContainer samplerContainer; private final TickHook tickHook; private final TickReporter tickReporter; private final TickStatistics tickStatistics; @@ -137,6 +139,8 @@ public class SparkPlatform { this.activityLog = new ActivityLog(plugin.getPluginDirectory().resolve("activity.json")); this.activityLog.load(); + this.samplerContainer = new SamplerContainer(); + this.tickHook = plugin.createTickHook(); this.tickReporter = plugin.createTickReporter(); this.tickStatistics = this.tickHook != null || this.tickReporter != null ? new TickStatistics() : null; @@ -229,6 +233,10 @@ public class SparkPlatform { return this.activityLog; } + public SamplerContainer getSamplerContainer() { + return this.samplerContainer; + } + public TickHook getTickHook() { return this.tickHook; } diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java index 6dbf913..00bf1a9 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java @@ -68,17 +68,6 @@ import static net.kyori.adventure.text.format.NamedTextColor.WHITE; public class SamplerModule implements CommandModule { private static final String SPARK_SAMPLER_MEDIA_TYPE = "application/x-spark-sampler"; - /** The sampler instance currently running, if any */ - private Sampler activeSampler = null; - - @Override - public void close() { - if (this.activeSampler != null) { - this.activeSampler.stop(); - this.activeSampler = null; - } - } - @Override public void registerCommands(Consumer<Command> consumer) { consumer.accept(Command.builder() @@ -121,7 +110,7 @@ public class SamplerModule implements CommandModule { } if (arguments.boolFlag("cancel")) { - profilerCancel(resp); + profilerCancel(platform, resp); return; } @@ -134,7 +123,7 @@ public class SamplerModule implements CommandModule { } private void profilerStart(SparkPlatform platform, CommandSender sender, CommandResponseHandler resp, Arguments arguments) { - if (this.activeSampler != null) { + if (platform.getSamplerContainer().getActiveSampler() != null) { profilerInfo(platform, resp); return; } @@ -210,7 +199,8 @@ public class SamplerModule implements CommandModule { if (ticksOver != -1) { builder.ticksOver(ticksOver, tickHook); } - Sampler sampler = this.activeSampler = builder.start(platform); + Sampler sampler = builder.start(platform); + platform.getSamplerContainer().setActiveSampler(sampler); resp.broadcastPrefixed(text() .append(text("Profiler is now running!", GOLD)) @@ -227,7 +217,7 @@ public class SamplerModule implements CommandModule { resp.broadcastPrefixed(text("The results will be automatically returned after the profiler has been running for " + timeoutSeconds + " seconds.")); } - CompletableFuture<Sampler> future = this.activeSampler.getFuture(); + CompletableFuture<Sampler> future = sampler.getFuture(); // send message if profiling fails future.whenCompleteAsync((s, throwable) -> { @@ -238,11 +228,7 @@ public class SamplerModule implements CommandModule { }); // set activeSampler to null when complete. - future.whenCompleteAsync((s, throwable) -> { - if (sampler == this.activeSampler) { - this.activeSampler = null; - } - }); + sampler.getFuture().whenCompleteAsync((s, throwable) -> platform.getSamplerContainer().unsetActiveSampler(s)); // await the result if (timeoutSeconds != -1) { @@ -258,17 +244,18 @@ public class SamplerModule implements CommandModule { } private void profilerInfo(SparkPlatform platform, CommandResponseHandler resp) { - if (this.activeSampler == null) { + Sampler sampler = platform.getSamplerContainer().getActiveSampler(); + if (sampler == null) { resp.replyPrefixed(text("The profiler isn't running!")); resp.replyPrefixed(text("To start a new one, run:")); resp.replyPrefixed(cmdPrompt("/" + platform.getPlugin().getCommandName() + " profiler")); } else { resp.replyPrefixed(text("Profiler is already running!", GOLD)); - long runningTime = (System.currentTimeMillis() - this.activeSampler.getStartTime()) / 1000L; + long runningTime = (System.currentTimeMillis() - sampler.getStartTime()) / 1000L; resp.replyPrefixed(text("So far, it has profiled for " + runningTime + " seconds.")); - long timeout = this.activeSampler.getAutoEndTime(); + long timeout = sampler.getAutoEndTime(); if (timeout == -1) { resp.replyPrefixed(text("To stop the profiler and upload the results, run:")); resp.replyPrefixed(cmdPrompt("/" + platform.getPlugin().getCommandName() + " profiler --stop")); @@ -282,20 +269,24 @@ public class SamplerModule implements CommandModule { } } - private void profilerCancel(CommandResponseHandler resp) { - if (this.activeSampler == null) { + private void profilerCancel(SparkPlatform platform, CommandResponseHandler resp) { + Sampler sampler = platform.getSamplerContainer().getActiveSampler(); + if (sampler == null) { resp.replyPrefixed(text("There isn't an active profiler running.")); } else { - close(); + platform.getSamplerContainer().stopActiveSampler(); resp.broadcastPrefixed(text("Profiler has been cancelled.", GOLD)); } } private void profilerStop(SparkPlatform platform, CommandSender sender, CommandResponseHandler resp, Arguments arguments) { - if (this.activeSampler == null) { + Sampler sampler = platform.getSamplerContainer().getActiveSampler(); + + if (sampler == null) { resp.replyPrefixed(text("There isn't an active profiler running.")); } else { - this.activeSampler.stop(); + platform.getSamplerContainer().unsetActiveSampler(sampler); + sampler.stop(); boolean saveToFile = arguments.boolFlag("save-to-file"); if (saveToFile) { @@ -307,8 +298,7 @@ public class SamplerModule implements CommandModule { String comment = Iterables.getFirst(arguments.stringFlag("comment"), null); MethodDisambiguator methodDisambiguator = new MethodDisambiguator(); MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator); - handleUpload(platform, resp, this.activeSampler, comment, mergeMode, saveToFile); - this.activeSampler = null; + handleUpload(platform, resp, sampler, comment, mergeMode, saveToFile); } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 52a7387..382950a 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -93,15 +93,28 @@ public class SamplerBuilder { } public Sampler start(SparkPlatform platform) { + boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null; + boolean canUseAsyncProfiler = this.useAsyncProfiler && + !(this.ignoreSleeping || this.ignoreNative) && + !(this.threadDumper instanceof ThreadDumper.Regex) && + AsyncProfilerAccess.getInstance(platform).checkSupported(platform); + + int intervalMicros = (int) (this.samplingInterval * 1000d); Sampler sampler; - if (this.ticksOver != -1 && this.tickHook != null) { - sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); - } else if (this.useAsyncProfiler && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.getInstance(platform).checkSupported(platform)) { - sampler = new AsyncSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout); + if (onlyTicksOverMode) { + sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, + this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, + this.tickHook, this.ticksOver); + + } else if (canUseAsyncProfiler) { + sampler = new AsyncSampler(platform, intervalMicros, this.threadDumper, + this.threadGrouper, this.timeout); + } else { - sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); + sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, + this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); } sampler.start(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java new file mode 100644 index 0000000..55913d8 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java @@ -0,0 +1,76 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Container for the active sampler. + */ +public class SamplerContainer implements AutoCloseable { + + private final AtomicReference<Sampler> activeSampler = new AtomicReference<>(); + + /** + * Gets the active sampler, or null if a sampler is not active. + * + * @return the active sampler + */ + public Sampler getActiveSampler() { + return this.activeSampler.get(); + } + + /** + * Sets the active sampler, throwing an exception if another sampler is already active. + * + * @param sampler the sampler + */ + public void setActiveSampler(Sampler sampler) { + if (!this.activeSampler.compareAndSet(null, sampler)) { + throw new IllegalStateException("Attempted to set active sampler when another was already active!"); + } + } + + /** + * Unsets the active sampler, if the provided sampler is active. + * + * @param sampler the sampler + */ + public void unsetActiveSampler(Sampler sampler) { + this.activeSampler.compareAndSet(sampler, null); + } + + /** + * Stops the active sampler, if there is one. + */ + public void stopActiveSampler() { + Sampler sampler = this.activeSampler.getAndSet(null); + if (sampler != null) { + sampler.stop(); + } + } + + @Override + public void close() { + stopActiveSampler(); + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java index ad9dee4..2c003e5 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.IntPredicate; /** * Abstract implementation of {@link DataAggregator}. @@ -52,6 +53,11 @@ public abstract class AbstractDataAggregator implements DataAggregator { } @Override + public void pruneData(IntPredicate timeWindowPredicate) { + this.threadData.values().removeIf(node -> node.removeTimeWindowsRecursively(timeWindowPredicate)); + } + + @Override public List<ThreadNode> exportData() { List<ThreadNode> data = new ArrayList<>(this.threadData.values()); for (ThreadNode node : data) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java index 5590a96..ed33204 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/DataAggregator.java @@ -24,6 +24,7 @@ import me.lucko.spark.common.sampler.node.ThreadNode; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; import java.util.List; +import java.util.function.IntPredicate; /** * Aggregates sampling data. @@ -38,6 +39,13 @@ public interface DataAggregator { List<ThreadNode> exportData(); /** + * Prunes windows of data from this aggregator if the given {@code timeWindowPredicate} returns true. + * + * @param timeWindowPredicate the predicate + */ + void pruneData(IntPredicate timeWindowPredicate); + + /** * Gets metadata about the data aggregator instance. */ SamplerMetadata.DataAggregator getMetadata(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 2c9bb5f..cbc81c7 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -76,15 +76,20 @@ public class AsyncSampler extends AbstractSampler { this.windowStatisticsCollector.startCountingTicks(tickHook); } - int window = ProfilingWindowUtils.unixMillisToWindow(System.currentTimeMillis()); + int window = ProfilingWindowUtils.windowNow(); AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob(); job.init(this.platform, this.interval, this.threadDumper, window); job.start(); this.currentJob = job; - // rotate the sampler job every minute to put data into a new window - this.scheduler.scheduleAtFixedRate(this::rotateProfilerJob, 1, 1, TimeUnit.MINUTES); + // rotate the sampler job to put data into a new window + this.scheduler.scheduleAtFixedRate( + this::rotateProfilerJob, + ProfilingWindowUtils.WINDOW_SIZE_SECONDS, + ProfilingWindowUtils.WINDOW_SIZE_SECONDS, + TimeUnit.SECONDS + ); recordInitialGcStats(); scheduleTimeout(); @@ -117,6 +122,9 @@ public class AsyncSampler extends AbstractSampler { // aggregate the output of the previous job previousJob.aggregate(this.dataAggregator); + + // prune data older than the history size + this.dataAggregator.pruneData(ProfilingWindowUtils.keepHistoryBefore(window)); } } catch (Throwable e) { e.printStackTrace(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index 8c96fd3..6aad5e3 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -34,8 +34,6 @@ import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; -import org.checkerframework.checker.units.qual.A; - import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; @@ -145,10 +143,15 @@ public class JavaSampler extends AbstractSampler implements Runnable { JavaSampler.this.dataAggregator.insertData(threadInfo, this.window); } - // if we have just stepped over into a new window, collect statistics for the previous window + // if we have just stepped over into a new window... int previousWindow = JavaSampler.this.lastWindow.getAndSet(this.window); if (previousWindow != 0 && previousWindow != this.window) { + + // collect statistics for the previous window JavaSampler.this.windowStatisticsCollector.measureNow(previousWindow); + + // prune data older than the history size + JavaSampler.this.dataAggregator.pruneData(ProfilingWindowUtils.keepHistoryBefore(this.window)); } } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java index e6f6cf5..2e4b055 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/AbstractNode.java @@ -20,17 +20,18 @@ package me.lucko.spark.common.sampler.node; -import me.lucko.spark.common.sampler.async.jfr.Dictionary; import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.LongAdder; -import java.util.stream.IntStream; +import java.util.function.IntPredicate; /** * Encapsulates a timed node in the sampling stack. @@ -43,9 +44,9 @@ public abstract class AbstractNode { private final Map<StackTraceNode.Description, StackTraceNode> children = new ConcurrentHashMap<>(); /** The accumulated sample time for this node, measured in microseconds */ - // long key = the window (effectively System.currentTimeMillis() / 60_000) + // Integer key = the window (effectively System.currentTimeMillis() / 60_000) // LongAdder value = accumulated time in microseconds - private final Dictionary<LongAdder> times = new Dictionary<>(); + private final Map<Integer, LongAdder> times = new HashMap<>(); /** * Gets the time accumulator for a given window @@ -67,10 +68,18 @@ public abstract class AbstractNode { * * @return the time windows */ - public IntStream getTimeWindows() { - IntStream.Builder keys = IntStream.builder(); - this.times.forEach((key, value) -> keys.add((int) key)); - return keys.build(); + public Set<Integer> getTimeWindows() { + return this.times.keySet(); + } + + /** + * Removes time windows from this node if they pass the given {@code predicate} test. + * + * @param predicate the predicate + * @return true if any time windows were removed + */ + public boolean removeTimeWindows(IntPredicate predicate) { + return this.times.keySet().removeIf(predicate::test); } /** @@ -100,7 +109,7 @@ public abstract class AbstractNode { * @param other the other node */ protected void merge(AbstractNode other) { - other.times.forEach((key, value) -> getTimeAccumulator((int) key).add(value.longValue())); + other.times.forEach((key, value) -> getTimeAccumulator(key).add(value.longValue())); for (Map.Entry<StackTraceNode.Description, StackTraceNode> child : other.children.entrySet()) { resolveChild(child.getKey()).merge(child.getValue()); } @@ -127,7 +136,6 @@ public abstract class AbstractNode { list.add(child); } - //list.sort(null); return list; } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java index 9faece6..5035046 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java @@ -25,9 +25,13 @@ import me.lucko.spark.common.util.IndexedListBuilder; import me.lucko.spark.proto.SparkSamplerProtos; import java.util.ArrayDeque; +import java.util.Collection; import java.util.Deque; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Queue; +import java.util.function.IntPredicate; /** * The root of a sampling stack for a given thread / thread group. @@ -89,6 +93,46 @@ public final class ThreadNode extends AbstractNode { } } + /** + * Removes time windows that match the given {@code predicate}. + * + * @param predicate the predicate to use to test the time windows + * @return true if this node is now empty + */ + public boolean removeTimeWindowsRecursively(IntPredicate predicate) { + Queue<AbstractNode> queue = new ArrayDeque<>(); + queue.add(this); + + while (!queue.isEmpty()) { + AbstractNode node = queue.remove(); + Collection<StackTraceNode> children = node.getChildren(); + + boolean needToProcessChildren = false; + + for (Iterator<StackTraceNode> it = children.iterator(); it.hasNext(); ) { + StackTraceNode child = it.next(); + + boolean windowsWereRemoved = child.removeTimeWindows(predicate); + boolean childIsNowEmpty = child.getTimeWindows().isEmpty(); + + if (childIsNowEmpty) { + it.remove(); + continue; + } + + if (windowsWereRemoved) { + needToProcessChildren = true; + } + } + + if (needToProcessChildren) { + queue.addAll(children); + } + } + + return getTimeWindows().isEmpty(); + } + public SparkSamplerProtos.ThreadNode toProto(MergeMode mergeMode, ProtoTimeEncoder timeEncoder) { SparkSamplerProtos.ThreadNode.Builder proto = SparkSamplerProtos.ThreadNode.newBuilder() .setName(getThreadLabel()); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java index 109adb3..be6f08a 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProfilingWindowUtils.java @@ -20,17 +20,51 @@ package me.lucko.spark.common.sampler.window; +import me.lucko.spark.common.sampler.aggregator.DataAggregator; + +import java.util.function.IntPredicate; + public enum ProfilingWindowUtils { ; /** + * The size/duration of a profiling window in seconds. + * (1 window = 1 minute) + */ + public static final int WINDOW_SIZE_SECONDS = 60; + + /** + * The number of windows to record in continuous profiling before data is dropped. + * (60 windows * 1 minute = 1 hour of profiling data) + */ + public static final int HISTORY_SIZE = Integer.getInteger("spark.continuousProfilingHistorySize", 60); + + /** * Gets the profiling window for the given time in unix-millis. * * @param time the time in milliseconds * @return the window */ public static int unixMillisToWindow(long time) { - // one window per minute - return (int) (time / 60_000); + return (int) (time / (WINDOW_SIZE_SECONDS * 1000L)); + } + + /** + * Gets the window at the current time. + * + * @return the window + */ + public static int windowNow() { + return unixMillisToWindow(System.currentTimeMillis()); + } + + /** + * Gets a prune predicate that can be passed to {@link DataAggregator#pruneData(IntPredicate)}. + * + * @return the prune predicate + */ + public static IntPredicate keepHistoryBefore(int currentWindow) { + // windows that were earlier than (currentWindow minus history size) should be pruned + return window -> window < (currentWindow - HISTORY_SIZE); } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java index edb2309..03da075 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java @@ -21,7 +21,6 @@ package me.lucko.spark.common.sampler.window; import me.lucko.spark.common.sampler.async.jfr.Dictionary; -import me.lucko.spark.common.sampler.node.AbstractNode; import me.lucko.spark.common.sampler.node.ThreadNode; import java.util.HashMap; @@ -42,7 +41,7 @@ public class ProtoTimeEncoder { public ProtoTimeEncoder(List<ThreadNode> sourceData) { // get an array of all keys that show up in the source data this.keys = sourceData.stream() - .map(AbstractNode::getTimeWindows) + .map(n -> n.getTimeWindows().stream().mapToInt(i -> i)) .reduce(IntStream.empty(), IntStream::concat) .distinct() .sorted() @@ -70,14 +69,14 @@ public class ProtoTimeEncoder { * @param times a dictionary of times (unix-time millis -> duration in microseconds) * @return the times encoded as a double array */ - public double[] encode(Dictionary<LongAdder> times) { + public double[] encode(Map<Integer, LongAdder> times) { // construct an array of values - length needs to exactly match the // number of keys, even if some values are zero. double[] array = new double[this.keys.length]; times.forEach((key, value) -> { // get the index for the given key - Integer idx = this.keysToIndex.get((int) key); + Integer idx = this.keysToIndex.get(key); if (idx == null) { throw new RuntimeException("No index for key " + key + " in " + this.keysToIndex.keySet()); } |