aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko/spark/common/sampler
diff options
context:
space:
mode:
authorLuck <git@lucko.me>2022-11-13 21:24:57 +0000
committerLuck <git@lucko.me>2022-11-13 21:24:57 +0000
commitf2d77d875f32f107987c93da1f90529fc6812444 (patch)
tree1ceef4324fc9ece8f9c4389e2f4da07cb844c3d2 /spark-common/src/main/java/me/lucko/spark/common/sampler
parent76f43ab59d3839600bd9e040ff2d09199ebe778a (diff)
downloadspark-f2d77d875f32f107987c93da1f90529fc6812444.tar.gz
spark-f2d77d875f32f107987c93da1f90529fc6812444.tar.bz2
spark-f2d77d875f32f107987c93da1f90529fc6812444.zip
Background profiler
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java19
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java7
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java28
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java9
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java61
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java14
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java20
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java1
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java5
9 files changed, 130 insertions, 34 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
index c650738..feefd66 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
@@ -32,8 +32,6 @@ import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.source.SourceMetadata;
import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;
import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
-import me.lucko.spark.common.tick.TickHook;
-import me.lucko.spark.proto.SparkProtos;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
@@ -64,6 +62,9 @@ public abstract class AbstractSampler implements Sampler {
/** The unix timestamp (in millis) when this sampler should automatically complete. */
protected final long autoEndTime; // -1 for nothing
+ /** If the sampler is running in the background */
+ protected boolean background;
+
/** Collects statistics for each window in the sample */
protected final WindowStatisticsCollector windowStatisticsCollector;
@@ -73,11 +74,12 @@ public abstract class AbstractSampler implements Sampler {
/** The garbage collector statistics when profiling started */
protected Map<String, GarbageCollectorStatistics> initialGcStats;
- protected AbstractSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, long autoEndTime) {
+ protected AbstractSampler(SparkPlatform platform, SamplerSettings settings) {
this.platform = platform;
- this.interval = interval;
- this.threadDumper = threadDumper;
- this.autoEndTime = autoEndTime;
+ this.interval = settings.interval();
+ this.threadDumper = settings.threadDumper();
+ this.autoEndTime = settings.autoEndTime();
+ this.background = settings.runningInBackground();
this.windowStatisticsCollector = new WindowStatisticsCollector(platform);
}
@@ -95,6 +97,11 @@ public abstract class AbstractSampler implements Sampler {
}
@Override
+ public boolean isRunningInBackground() {
+ return this.background;
+ }
+
+ @Override
public CompletableFuture<Sampler> getFuture() {
return this.future;
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index e06cba6..5d2026d 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -58,6 +58,13 @@ public interface Sampler {
long getAutoEndTime();
/**
+ * If this sampler is running in the background. (wasn't started by a specific user)
+ *
+ * @return true if the sampler is running in the background
+ */
+ boolean isRunningInBackground();
+
+ /**
* Gets a future to encapsulate the completion of the sampler
*
* @return a future
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index 382950a..ec635ef 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -38,7 +38,8 @@ public class SamplerBuilder {
private boolean ignoreSleeping = false;
private boolean ignoreNative = false;
private boolean useAsyncProfiler = true;
- private long timeout = -1;
+ private long autoEndTime = -1;
+ private boolean background = false;
private ThreadDumper threadDumper = ThreadDumper.ALL;
private ThreadGrouper threadGrouper = ThreadGrouper.BY_NAME;
@@ -57,7 +58,12 @@ public class SamplerBuilder {
if (timeout <= 0) {
throw new IllegalArgumentException("timeout > 0");
}
- this.timeout = System.currentTimeMillis() + unit.toMillis(timeout);
+ this.autoEndTime = System.currentTimeMillis() + unit.toMillis(timeout);
+ return this;
+ }
+
+ public SamplerBuilder background(boolean background) {
+ this.background = background;
return this;
}
@@ -95,26 +101,22 @@ public class SamplerBuilder {
public Sampler start(SparkPlatform platform) {
boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null;
boolean canUseAsyncProfiler = this.useAsyncProfiler &&
+ !onlyTicksOverMode &&
!(this.ignoreSleeping || this.ignoreNative) &&
!(this.threadDumper instanceof ThreadDumper.Regex) &&
AsyncProfilerAccess.getInstance(platform).checkSupported(platform);
int intervalMicros = (int) (this.samplingInterval * 1000d);
+ SamplerSettings settings = new SamplerSettings(intervalMicros, this.threadDumper, this.threadGrouper, this.autoEndTime, this.background);
Sampler sampler;
- if (onlyTicksOverMode) {
- sampler = new JavaSampler(platform, intervalMicros, this.threadDumper,
- this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative,
- this.tickHook, this.ticksOver);
-
- } else if (canUseAsyncProfiler) {
- sampler = new AsyncSampler(platform, intervalMicros, this.threadDumper,
- this.threadGrouper, this.timeout);
-
+ if (canUseAsyncProfiler) {
+ sampler = new AsyncSampler(platform, settings);
+ } else if (onlyTicksOverMode) {
+ sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
} else {
- sampler = new JavaSampler(platform, intervalMicros, this.threadDumper,
- this.threadGrouper, this.timeout, this.ignoreSleeping, this.ignoreNative);
+ sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative);
}
sampler.start();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java
index 55913d8..f56dee5 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerContainer.java
@@ -28,6 +28,11 @@ import java.util.concurrent.atomic.AtomicReference;
public class SamplerContainer implements AutoCloseable {
private final AtomicReference<Sampler> activeSampler = new AtomicReference<>();
+ private final boolean backgroundProfilerEnabled;
+
+ public SamplerContainer(boolean backgroundProfilerEnabled) {
+ this.backgroundProfilerEnabled = backgroundProfilerEnabled;
+ }
/**
* Gets the active sampler, or null if a sampler is not active.
@@ -68,6 +73,10 @@ public class SamplerContainer implements AutoCloseable {
}
}
+ public boolean isBackgroundProfilerEnabled() {
+ return this.backgroundProfilerEnabled;
+ }
+
@Override
public void close() {
stopActiveSampler();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java
new file mode 100644
index 0000000..6e55a43
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java
@@ -0,0 +1,61 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+/**
+ * Base settings for all samplers
+ */
+public class SamplerSettings {
+
+ private final int interval;
+ private final ThreadDumper threadDumper;
+ private final ThreadGrouper threadGrouper;
+ private final long autoEndTime;
+ private final boolean runningInBackground;
+
+ public SamplerSettings(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long autoEndTime, boolean runningInBackground) {
+ this.interval = interval;
+ this.threadDumper = threadDumper;
+ this.threadGrouper = threadGrouper;
+ this.autoEndTime = autoEndTime;
+ this.runningInBackground = runningInBackground;
+ }
+
+ public int interval() {
+ return this.interval;
+ }
+
+ public ThreadDumper threadDumper() {
+ return this.threadDumper;
+ }
+
+ public ThreadGrouper threadGrouper() {
+ return this.threadGrouper;
+ }
+
+ public long autoEndTime() {
+ return this.autoEndTime;
+ }
+
+ public boolean runningInBackground() {
+ return this.runningInBackground;
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index cbc81c7..d6cfd4f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -25,8 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
-import me.lucko.spark.common.sampler.ThreadDumper;
-import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.SamplerSettings;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
@@ -36,6 +35,7 @@ import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.IntPredicate;
/**
* A sampler implementation using async-profiler.
@@ -55,10 +55,10 @@ public class AsyncSampler extends AbstractSampler {
/** The executor used for scheduling and management */
private ScheduledExecutorService scheduler;
- public AsyncSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime) {
- super(platform, interval, threadDumper, endTime);
+ public AsyncSampler(SparkPlatform platform, SamplerSettings settings) {
+ super(platform, settings);
this.profilerAccess = AsyncProfilerAccess.getInstance(platform);
- this.dataAggregator = new AsyncDataAggregator(threadGrouper);
+ this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper());
this.scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("spark-asyncsampler-worker-thread").build()
);
@@ -124,7 +124,9 @@ public class AsyncSampler extends AbstractSampler {
previousJob.aggregate(this.dataAggregator);
// prune data older than the history size
- this.dataAggregator.pruneData(ProfilingWindowUtils.keepHistoryBefore(window));
+ IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(window);
+ this.dataAggregator.pruneData(predicate);
+ this.windowStatisticsCollector.pruneStatistics(predicate);
}
} catch (Throwable e) {
e.printStackTrace();
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
index 6aad5e3..95c3508 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -25,8 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
-import me.lucko.spark.common.sampler.ThreadDumper;
-import me.lucko.spark.common.sampler.ThreadGrouper;
+import me.lucko.spark.common.sampler.SamplerSettings;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
@@ -42,6 +41,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntPredicate;
/**
* A sampler implementation using Java (WarmRoast).
@@ -66,14 +66,14 @@ public class JavaSampler extends AbstractSampler implements Runnable {
/** The last window that was profiled */
private final AtomicInteger lastWindow = new AtomicInteger();
- public JavaSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative) {
- super(platform, interval, threadDumper, endTime);
- this.dataAggregator = new SimpleDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative);
+ public JavaSampler(SparkPlatform platform, SamplerSettings settings, boolean ignoreSleeping, boolean ignoreNative) {
+ super(platform, settings);
+ this.dataAggregator = new SimpleDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), ignoreSleeping, ignoreNative);
}
- public JavaSampler(SparkPlatform platform, int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long endTime, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {
- super(platform, interval, threadDumper, endTime);
- this.dataAggregator = new TickedDataAggregator(this.workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold);
+ public JavaSampler(SparkPlatform platform, SamplerSettings settings, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) {
+ super(platform, settings);
+ this.dataAggregator = new TickedDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold);
}
@Override
@@ -151,7 +151,9 @@ public class JavaSampler extends AbstractSampler implements Runnable {
JavaSampler.this.windowStatisticsCollector.measureNow(previousWindow);
// prune data older than the history size
- JavaSampler.this.dataAggregator.pruneData(ProfilingWindowUtils.keepHistoryBefore(this.window));
+ IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(this.window);
+ JavaSampler.this.dataAggregator.pruneData(predicate);
+ JavaSampler.this.windowStatisticsCollector.pruneStatistics(predicate);
}
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
index 5035046..37ff359 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/node/ThreadNode.java
@@ -130,6 +130,7 @@ public final class ThreadNode extends AbstractNode {
}
}
+ removeTimeWindows(predicate);
return getTimeWindows().isEmpty();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
index 47f739d..7da62fa 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
@@ -30,6 +30,7 @@ import me.lucko.spark.proto.SparkProtos;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntPredicate;
/**
* Collects statistics for each profiling window.
@@ -116,6 +117,10 @@ public class WindowStatisticsCollector {
}
}
+ public void pruneStatistics(IntPredicate predicate) {
+ this.stats.keySet().removeIf(predicate::test);
+ }
+
public Map<Integer, SparkProtos.WindowStatistics> export() {
return this.stats;
}