diff options
author | Luck <git@lucko.me> | 2022-11-13 21:24:57 +0000 |
---|---|---|
committer | Luck <git@lucko.me> | 2022-11-13 21:24:57 +0000 |
commit | f2d77d875f32f107987c93da1f90529fc6812444 (patch) | |
tree | 1ceef4324fc9ece8f9c4389e2f4da07cb844c3d2 /spark-common/src/main/java/me/lucko/spark/common/sampler | |
parent | 76f43ab59d3839600bd9e040ff2d09199ebe778a (diff) | |
download | spark-f2d77d875f32f107987c93da1f90529fc6812444.tar.gz spark-f2d77d875f32f107987c93da1f90529fc6812444.tar.bz2 spark-f2d77d875f32f107987c93da1f90529fc6812444.zip |
Background profiler
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
9 files changed, 130 insertions, 34 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java index c650738..feefd66 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java @@ -32,8 +32,6 @@ import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.source.SourceMetadata; import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; -import me.lucko.spark.common.tick.TickHook; -import me.lucko.spark.proto.SparkProtos; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; @@ -64,6 +62,9 @@ public abstract class AbstractSampler implements Sampler { /** The unix timestamp (in millis) when this sampler should automatically complete. */ protected final long autoEndTime; // -1 for nothing + /** If the sampler is running in the background */ + protected boolean background; + /** Collects statistics for each window in the sample */ protected final WindowStatisticsCollector windowStatisticsCollector; @@ -73,11 +74,12 @@ public abstract class AbstractSampler implements Sampler { /** The garbage collector statistics when profiling started */ protected Map<String, GarbageCollectorStatistics> initialGcStats; - protected AbstractSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, long autoEndTime) { + protected AbstractSampler(SparkPlatform platform, SamplerSettings settings) { this.platform = platform; - this.interval = interval; - this.threadDumper = threadDumper; - this.autoEndTime = autoEndTime; + this.interval = settings.interval(); + this.threadDumper = settings.threadDumper(); + this.autoEndTime = settings.autoEndTime(); + this.background = settings.runningInBackground(); this.windowStatisticsCollector = new WindowStatisticsCollector(platform); } @@ -95,6 +97,11 @@ public abstract class AbstractSampler implements Sampler { } @Override + public boolean isRunningInBackground() { + return this.background; + } + + @Override public CompletableFuture<Sampler> getFuture() { return this.future; } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index e06cba6..5d2026d 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -58,6 +58,13 @@ public interface Sampler { long getAutoEndTime(); /** + * If this sampler is running in the background. (wasn't started by a specific user) + * + * @return true if the sampler is running in the background + */ + boolean isRunningInBackground(); + + /** * Gets a future to encapsulate the completion of the sampler * * @return a future diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 382950a..ec635ef 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -38,7 +38,8 @@ public class SamplerBuilder { private boolean ignoreSleeping = false; private boolean ignoreNative = false; private boolean useAsyncProfiler = true; - private long timeout = -1; + private long autoEndTime = -1; + private boolean background = false; private ThreadDumper threadDumper = ThreadDumper.ALL; private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME; @@ -57,7 +58,12 @@ public class SamplerBuilder { if (timeout <= 0) { throw new IllegalArgumentException("timeout > 0"); } - this.timeout = System.currentTimeMillis() + unit.toMillis(timeout); + this.autoEndTime = System.currentTimeMillis() + unit.toMillis(timeout); + return this; + } + + public SamplerBuilder background(boolean background) { + this.background = background; return this; } @@ -95,26 +101,22 @@ public class SamplerBuilder { public Sampler start(SparkPlatform platform) { boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null; boolean canUseAsyncProfiler = this.useAsyncProfiler && + !onlyTicksOverMode && !(this.ignoreSleeping || this.ignoreNative) && !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.getInstance(platform).checkSupported(platform); int intervalMicros = (int) (this.samplingInterval * 1000d); + SamplerSettings settings = new SamplerSettings(intervalMicros, this.threadDumper, this.threadGrouper, this.autoEndTime, this.background); Sampler sampler; - if (onlyTicksOverMode) { - sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, - this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative, - this.tickHook, this.ticksOver); - - } else if (canUseAsyncProfiler) { - sampler = new AsyncSampler(platform, intervalMicros, this.threadDumper, - this.threadGrouper, this.timeout); - + if (canUseAsyncProfiler) { + sampler = new AsyncSampler(platform, settings); + } else if (onlyTicksOverMode) { + sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); } else { - sampler = new JavaSampler(platform, intervalMicros, this.threadDumper, - this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative); + sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative); } sampler.start(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java index 55913d8..f56dee5 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java @@ -28,6 +28,11 @@ import java.util.concurrent.atomic.AtomicReference; public class SamplerContainer implements AutoCloseable { private final AtomicReference<Sampler> activeSampler = new AtomicReference<>(); + private final boolean backgroundProfilerEnabled; + + public SamplerContainer(boolean backgroundProfilerEnabled) { + this.backgroundProfilerEnabled = backgroundProfilerEnabled; + } /** * Gets the active sampler, or null if a sampler is not active. @@ -68,6 +73,10 @@ public class SamplerContainer implements AutoCloseable { } } + public boolean isBackgroundProfilerEnabled() { + return this.backgroundProfilerEnabled; + } + @Override public void close() { stopActiveSampler(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java new file mode 100644 index 0000000..6e55a43 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java @@ -0,0 +1,61 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.sampler; + +/** + * Base settings for all samplers + */ +public class SamplerSettings { + + private final int interval; + private final ThreadDumper threadDumper; + private final ThreadGrouper threadGrouper; + private final long autoEndTime; + private final boolean runningInBackground; + + public SamplerSettings(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long autoEndTime, boolean runningInBackground) { + this.interval = interval; + this.threadDumper = threadDumper; + this.threadGrouper = threadGrouper; + this.autoEndTime = autoEndTime; + this.runningInBackground = runningInBackground; + } + + public int interval() { + return this.interval; + } + + public ThreadDumper threadDumper() { + return this.threadDumper; + } + + public ThreadGrouper threadGrouper() { + return this.threadGrouper; + } + + public long autoEndTime() { + return this.autoEndTime; + } + + public boolean runningInBackground() { + return this.runningInBackground; + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index cbc81c7..d6cfd4f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -25,8 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.AbstractSampler; -import me.lucko.spark.common.sampler.ThreadDumper; -import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.SamplerSettings; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; @@ -36,6 +35,7 @@ import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.IntPredicate; /** * A sampler implementation using async-profiler. @@ -55,10 +55,10 @@ public class AsyncSampler extends AbstractSampler { /** The executor used for scheduling and management */ private ScheduledExecutorService scheduler; - public AsyncSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) { - super(platform, interval, threadDumper, endTime); + public AsyncSampler(SparkPlatform platform, SamplerSettings settings) { + super(platform, settings); this.profilerAccess = AsyncProfilerAccess.getInstance(platform); - this.dataAggregator = new AsyncDataAggregator(threadGrouper); + this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper()); this.scheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build() ); @@ -124,7 +124,9 @@ public class AsyncSampler extends AbstractSampler { previousJob.aggregate(this.dataAggregator); // prune data older than the history size - this.dataAggregator.pruneData(ProfilingWindowUtils.keepHistoryBefore(window)); + IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(window); + this.dataAggregator.pruneData(predicate); + this.windowStatisticsCollector.pruneStatistics(predicate); } } catch (Throwable e) { e.printStackTrace(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index 6aad5e3..95c3508 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -25,8 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.AbstractSampler; -import me.lucko.spark.common.sampler.ThreadDumper; -import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.SamplerSettings; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; @@ -42,6 +41,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntPredicate; /** * A sampler implementation using Java (WarmRoast). @@ -66,14 +66,14 @@ public class JavaSampler extends AbstractSampler implements Runnable { /** The last window that was profiled */ private final AtomicInteger lastWindow = new AtomicInteger(); - public JavaSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) { - super(platform, interval, threadDumper, endTime); - this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); + public JavaSampler(SparkPlatform platform, SamplerSettings settings, boolean ignoreSleeping, boolean ignoreNative) { + super(platform, settings); + this.dataAggregator = new SimpleDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), ignoreSleeping, ignoreNative); } - public JavaSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) { - super(platform, interval, threadDumper, endTime); - this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold); + public JavaSampler(SparkPlatform platform, SamplerSettings settings, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) { + super(platform, settings); + this.dataAggregator = new TickedDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold); } @Override @@ -151,7 +151,9 @@ public class JavaSampler extends AbstractSampler implements Runnable { JavaSampler.this.windowStatisticsCollector.measureNow(previousWindow); // prune data older than the history size - JavaSampler.this.dataAggregator.pruneData(ProfilingWindowUtils.keepHistoryBefore(this.window)); + IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(this.window); + JavaSampler.this.dataAggregator.pruneData(predicate); + JavaSampler.this.windowStatisticsCollector.pruneStatistics(predicate); } } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java index 5035046..37ff359 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java @@ -130,6 +130,7 @@ public final class ThreadNode extends AbstractNode { } } + removeTimeWindows(predicate); return getTimeWindows().isEmpty(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java index 47f739d..7da62fa 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java @@ -30,6 +30,7 @@ import me.lucko.spark.proto.SparkProtos; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntPredicate; /** * Collects statistics for each profiling window. @@ -116,6 +117,10 @@ public class WindowStatisticsCollector { } } + public void pruneStatistics(IntPredicate predicate) { + this.stats.keySet().removeIf(predicate::test); + } + public Map<Integer, SparkProtos.WindowStatistics> export() { return this.stats; } |