aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko
diff options
context:
space:
mode:
authorlucko <git@lucko.me>2023-01-08 15:21:32 +0000
committerGitHub <noreply@github.com>2023-01-08 15:21:32 +0000
commitd83e49128ad59308f4b3ff19cf4b22b53236be8d (patch)
treebbd118c94f50f00b1cfc5da61e91b8c0ff0eedc7 /spark-common/src/main/java/me/lucko
parentdcdaacd7deb40be939bf91379f7391c02481cc48 (diff)
downloadspark-d83e49128ad59308f4b3ff19cf4b22b53236be8d.tar.gz
spark-d83e49128ad59308f4b3ff19cf4b22b53236be8d.tar.bz2
spark-d83e49128ad59308f4b3ff19cf4b22b53236be8d.zip
Allocation profiler (#290)
Diffstat (limited to 'spark-common/src/main/java/me/lucko')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java29
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java3
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java7
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java38
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java74
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java2
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java25
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java55
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java45
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java14
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java154
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java9
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java14
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java32
14 files changed, 428 insertions, 73 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
index cd00f0d..041cacf 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
@@ -33,6 +33,7 @@ import me.lucko.spark.common.command.tabcomplete.CompletionSupplier;
import me.lucko.spark.common.command.tabcomplete.TabCompleter;
import me.lucko.spark.common.sampler.Sampler;
import me.lucko.spark.common.sampler.SamplerBuilder;
+import me.lucko.spark.common.sampler.SamplerMode;
import me.lucko.spark.common.sampler.ThreadDumper;
import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.sampler.async.AsyncSampler;
@@ -80,6 +81,7 @@ public class SamplerModule implements CommandModule {
.argumentUsage("start", "thread", "thread name")
.argumentUsage("start", "only-ticks-over", "tick length millis")
.argumentUsage("start", "interval", "interval millis")
+ .argumentUsage("start", "alloc", null)
.argumentUsage("stop", "", null)
.argumentUsage("cancel", "", null)
.executor(this::profiler)
@@ -94,7 +96,7 @@ public class SamplerModule implements CommandModule {
}
if (subCommand.equals("start")) {
opts = new ArrayList<>(Arrays.asList("--timeout", "--regex", "--combine-all",
- "--not-combined", "--interval", "--only-ticks-over", "--force-java-sampler"));
+ "--not-combined", "--interval", "--only-ticks-over", "--force-java-sampler", "--alloc", "--alloc-live-only"));
opts.removeAll(arguments);
opts.add("--thread"); // allowed multiple times
}
@@ -166,9 +168,12 @@ public class SamplerModule implements CommandModule {
"Consider setting a timeout value over 30 seconds."));
}
- double intervalMillis = arguments.doubleFlag("interval");
- if (intervalMillis <= 0) {
- intervalMillis = 4;
+ SamplerMode mode = arguments.boolFlag("alloc") ? SamplerMode.ALLOCATION : SamplerMode.EXECUTION;
+ boolean allocLiveOnly = arguments.boolFlag("alloc-live-only");
+
+ double interval = arguments.doubleFlag("interval");
+ if (interval <= 0) {
+ interval = mode.defaultInterval();
}
boolean ignoreSleeping = arguments.boolFlag("ignore-sleeping");
@@ -213,23 +218,33 @@ public class SamplerModule implements CommandModule {
resp.broadcastPrefixed(text("Starting a new profiler, please wait..."));
SamplerBuilder builder = new SamplerBuilder();
+ builder.mode(mode);
builder.threadDumper(threadDumper);
builder.threadGrouper(threadGrouper);
if (timeoutSeconds != -1) {
builder.completeAfter(timeoutSeconds, TimeUnit.SECONDS);
}
- builder.samplingInterval(intervalMillis);
+ builder.samplingInterval(interval);
builder.ignoreSleeping(ignoreSleeping);
builder.ignoreNative(ignoreNative);
builder.forceJavaSampler(forceJavaSampler);
+ builder.allocLiveOnly(allocLiveOnly);
if (ticksOver != -1) {
builder.ticksOver(ticksOver, tickHook);
}
- Sampler sampler = builder.start(platform);
+
+ Sampler sampler;
+ try {
+ sampler = builder.start(platform);
+ } catch (UnsupportedOperationException e) {
+ resp.replyPrefixed(text(e.getMessage(), RED));
+ return;
+ }
+
platform.getSamplerContainer().setActiveSampler(sampler);
resp.broadcastPrefixed(text()
- .append(text("Profiler is now running!", GOLD))
+ .append(text((mode == SamplerMode.ALLOCATION ? "Allocation Profiler" : "Profiler") + " is now running!", GOLD))
.append(space())
.append(text("(" + (sampler instanceof AsyncSampler ? "async" : "built-in java") + ")", DARK_GRAY))
.build()
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
index e324fd3..5abe71f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
@@ -126,6 +126,7 @@ public abstract class AbstractSampler implements Sampler {
protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) {
SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder()
+ .setSamplerMode(getMode().asProto())
.setPlatformMetadata(platform.getPlugin().getPlatformInfo().toData().toProto())
.setCreator(creator.toData().toProto())
.setStartTime(this.startTime)
@@ -187,7 +188,7 @@ public abstract class AbstractSampler implements Sampler {
ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup);
- ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(data);
+ ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(getMode().valueTransformer(), data);
int[] timeWindows = timeEncoder.getKeys();
for (int timeWindow : timeWindows) {
proto.addTimeWindows(timeWindow);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index 36a63f1..aaf4f38 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -65,6 +65,13 @@ public interface Sampler {
boolean isRunningInBackground();
/**
+ * Gets the sampler mode.
+ *
+ * @return the sampler mode
+ */
+ SamplerMode getMode();
+
+ /**
* Gets a future to encapsulate the completion of the sampler
*
* @return a future
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
index ec635ef..7891a98 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java
@@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.sampler.async.AsyncProfilerAccess;
import me.lucko.spark.common.sampler.async.AsyncSampler;
+import me.lucko.spark.common.sampler.async.SampleCollector;
import me.lucko.spark.common.sampler.java.JavaSampler;
import me.lucko.spark.common.tick.TickHook;
@@ -34,10 +35,12 @@ import java.util.concurrent.TimeUnit;
@SuppressWarnings("UnusedReturnValue")
public class SamplerBuilder {
- private double samplingInterval = 4; // milliseconds
+ private SamplerMode mode = SamplerMode.EXECUTION;
+ private double samplingInterval = -1;
private boolean ignoreSleeping = false;
private boolean ignoreNative = false;
private boolean useAsyncProfiler = true;
+ private boolean allocLiveOnly = false;
private long autoEndTime = -1;
private boolean background = false;
private ThreadDumper threadDumper = ThreadDumper.ALL;
@@ -49,6 +52,11 @@ public class SamplerBuilder {
public SamplerBuilder() {
}
+ public SamplerBuilder mode(SamplerMode mode) {
+ this.mode = mode;
+ return this;
+ }
+
public SamplerBuilder samplingInterval(double samplingInterval) {
this.samplingInterval = samplingInterval;
return this;
@@ -98,7 +106,16 @@ public class SamplerBuilder {
return this;
}
- public Sampler start(SparkPlatform platform) {
+ public SamplerBuilder allocLiveOnly(boolean allocLiveOnly) {
+ this.allocLiveOnly = allocLiveOnly;
+ return this;
+ }
+
+ public Sampler start(SparkPlatform platform) throws UnsupportedOperationException {
+ if (this.samplingInterval <= 0) {
+ throw new IllegalArgumentException("samplingInterval = " + this.samplingInterval);
+ }
+
boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null;
boolean canUseAsyncProfiler = this.useAsyncProfiler &&
!onlyTicksOverMode &&
@@ -106,13 +123,22 @@ public class SamplerBuilder {
!(this.threadDumper instanceof ThreadDumper.Regex) &&
AsyncProfilerAccess.getInstance(platform).checkSupported(platform);
+ if (this.mode == SamplerMode.ALLOCATION && (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform))) {
+ throw new UnsupportedOperationException("Allocation profiling is not supported on your system. Check the console for more info.");
+ }
+
+ int interval = (int) (this.mode == SamplerMode.EXECUTION ?
+ this.samplingInterval * 1000d : // convert to microseconds
+ this.samplingInterval
+ );
- int intervalMicros = (int) (this.samplingInterval * 1000d);
- SamplerSettings settings = new SamplerSettings(intervalMicros, this.threadDumper, this.threadGrouper, this.autoEndTime, this.background);
+ SamplerSettings settings = new SamplerSettings(interval, this.threadDumper, this.threadGrouper, this.autoEndTime, this.background);
Sampler sampler;
- if (canUseAsyncProfiler) {
- sampler = new AsyncSampler(platform, settings);
+ if (this.mode == SamplerMode.ALLOCATION) {
+ sampler = new AsyncSampler(platform, settings, new SampleCollector.Allocation(interval, this.allocLiveOnly));
+ } else if (canUseAsyncProfiler) {
+ sampler = new AsyncSampler(platform, settings, new SampleCollector.Execution(interval));
} else if (onlyTicksOverMode) {
sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver);
} else {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java
new file mode 100644
index 0000000..f9a6e41
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java
@@ -0,0 +1,74 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler;
+
+import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
+
+import java.util.function.LongToDoubleFunction;
+
+public enum SamplerMode {
+
+ EXECUTION(
+ value -> {
+ // convert the duration from microseconds -> milliseconds
+ return value / 1000d;
+ },
+ 4, // ms
+ SamplerMetadata.SamplerMode.EXECUTION
+ ),
+
+ ALLOCATION(
+ value -> {
+ // do nothing
+ return value;
+ },
+ 524287, // 512 KiB
+ SamplerMetadata.SamplerMode.ALLOCATION
+ );
+
+ private final LongToDoubleFunction valueTransformer;
+ private final int defaultInterval;
+ private final SamplerMetadata.SamplerMode proto;
+
+ SamplerMode(LongToDoubleFunction valueTransformer, int defaultInterval, SamplerMetadata.SamplerMode proto) {
+ this.valueTransformer = valueTransformer;
+ this.defaultInterval = defaultInterval;
+ this.proto = proto;
+ }
+
+ public LongToDoubleFunction valueTransformer() {
+ return this.valueTransformer;
+ }
+
+ public int defaultInterval() {
+ return this.defaultInterval;
+ }
+
+ /**
+ * Gets the metadata enum instance for this sampler mode.
+ *
+ * @return proto metadata
+ */
+ public SamplerMetadata.SamplerMode asProto() {
+ return this.proto;
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
index 402330a..b9a80e0 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java
@@ -50,7 +50,7 @@ public class AsyncDataAggregator extends AbstractDataAggregator {
public void insertData(ProfileSegment element, int window) {
try {
ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName()));
- node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime(), window);
+ node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getValue(), window);
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
index 1480650..5bee56f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java
@@ -61,6 +61,8 @@ public class AsyncProfilerAccess {
/** The event to use for profiling */
private final ProfilingEvent profilingEvent;
+ /** The event to use for allocation profiling */
+ private final ProfilingEvent allocationProfilingEvent;
/** If profiler is null, contains the reason why setup failed */
private final Exception setupException;
@@ -68,10 +70,16 @@ public class AsyncProfilerAccess {
AsyncProfilerAccess(SparkPlatform platform) {
AsyncProfiler profiler;
ProfilingEvent profilingEvent = null;
+ ProfilingEvent allocationProfilingEvent = null;
Exception setupException = null;
try {
profiler = load(platform);
+
+ if (isEventSupported(profiler, ProfilingEvent.ALLOC, false)) {
+ allocationProfilingEvent = ProfilingEvent.ALLOC;
+ }
+
if (isEventSupported(profiler, ProfilingEvent.CPU, false)) {
profilingEvent = ProfilingEvent.CPU;
} else if (isEventSupported(profiler, ProfilingEvent.WALL, true)) {
@@ -84,6 +92,7 @@ public class AsyncProfilerAccess {
this.profiler = profiler;
this.profilingEvent = profilingEvent;
+ this.allocationProfilingEvent = allocationProfilingEvent;
this.setupException = setupException;
}
@@ -98,6 +107,10 @@ public class AsyncProfilerAccess {
return this.profilingEvent;
}
+ public ProfilingEvent getAllocationProfilingEvent() {
+ return this.allocationProfilingEvent;
+ }
+
public boolean checkSupported(SparkPlatform platform) {
if (this.setupException != null) {
if (this.setupException instanceof UnsupportedSystemException) {
@@ -116,6 +129,15 @@ public class AsyncProfilerAccess {
return this.profiler != null;
}
+ public boolean checkAllocationProfilingSupported(SparkPlatform platform) {
+ boolean supported = this.allocationProfilingEvent != null;
+ if (!supported && this.profiler != null) {
+ platform.getPlugin().log(Level.WARNING, "The allocation profiling mode is not supported on your system. This is most likely because Hotspot debug symbols are not available.");
+ platform.getPlugin().log(Level.WARNING, "To resolve, try installing the 'openjdk-11-dbg' or 'openjdk-8-dbg' package using your OS package manager.");
+ }
+ return supported;
+ }
+
private static AsyncProfiler load(SparkPlatform platform) throws Exception {
// check compatibility
String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", "");
@@ -183,7 +205,8 @@ public class AsyncProfilerAccess {
enum ProfilingEvent {
CPU(Events.CPU),
- WALL(Events.WALL);
+ WALL(Events.WALL),
+ ALLOC(Events.ALLOC);
private final String id;
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
index d74b75f..039d4ba 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java
@@ -20,6 +20,8 @@
package me.lucko.spark.common.sampler.async;
+import com.google.common.collect.ImmutableList;
+
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.sampler.ThreadDumper;
import me.lucko.spark.common.sampler.async.jfr.JfrReader;
@@ -29,8 +31,8 @@ import one.profiler.AsyncProfiler;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Collection;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
@@ -77,8 +79,8 @@ public class AsyncProfilerJob {
// Set on init
/** The platform */
private SparkPlatform platform;
- /** The sampling interval in microseconds */
- private int interval;
+ /** The sample collector */
+ private SampleCollector<?> sampleCollector;
/** The thread dumper */
private ThreadDumper threadDumper;
/** The profiling window */
@@ -100,9 +102,9 @@ public class AsyncProfilerJob {
* @param command the command
* @return the output
*/
- private String execute(String command) {
+ private String execute(Collection<String> command) {
try {
- return this.profiler.execute(command);
+ return this.profiler.execute(String.join(",", command));
} catch (IOException e) {
throw new RuntimeException("Exception whilst executing profiler command", e);
}
@@ -118,9 +120,9 @@ public class AsyncProfilerJob {
}
// Initialise the job
- public void init(SparkPlatform platform, int interval, ThreadDumper threadDumper, int window, boolean quiet) {
+ public void init(SparkPlatform platform, SampleCollector<?> collector, ThreadDumper threadDumper, int window, boolean quiet) {
this.platform = platform;
- this.interval = interval;
+ this.sampleCollector = collector;
this.threadDumper = threadDumper;
this.window = window;
this.quiet = quiet;
@@ -141,16 +143,20 @@ public class AsyncProfilerJob {
}
// construct a command to send to async-profiler
- String command = "start,event=" + this.access.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString();
+ ImmutableList.Builder<String> command = ImmutableList.<String>builder()
+ .add("start")
+ .addAll(this.sampleCollector.initArguments(this.access))
+ .add("threads").add("jfr").add("file=" + this.outputFile.toString());
+
if (this.quiet) {
- command += ",loglevel=NONE";
+ command.add("loglevel=NONE");
}
if (this.threadDumper instanceof ThreadDumper.Specific) {
- command += ",filter";
+ command.add("filter");
}
// start the profiler
- String resp = execute(command).trim();
+ String resp = execute(command.build()).trim();
if (!resp.equalsIgnoreCase("profiling started")) {
throw new RuntimeException("Unexpected response: " + resp);
@@ -208,7 +214,7 @@ public class AsyncProfilerJob {
// read the jfr file produced by async-profiler
try (JfrReader reader = new JfrReader(this.outputFile)) {
- readSegments(reader, threadFilter, dataAggregator, this.window);
+ readSegments(reader, this.sampleCollector, threadFilter, dataAggregator);
} catch (Exception e) {
boolean fileExists;
try {
@@ -235,22 +241,9 @@ public class AsyncProfilerJob {
}
}
- private void readSegments(JfrReader reader, Predicate<String> threadFilter, AsyncDataAggregator dataAggregator, int window) throws IOException {
- List<JfrReader.ExecutionSample> samples = reader.readAllEvents(JfrReader.ExecutionSample.class);
- for (int i = 0; i < samples.size(); i++) {
- JfrReader.ExecutionSample sample = samples.get(i);
-
- long duration;
- if (i == 0) {
- // we don't really know the duration of the first sample, so just use the sampling
- // interval
- duration = this.interval;
- } else {
- // calculate the duration of the sample by calculating the time elapsed since the
- // previous sample
- duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time);
- }
-
+ private <E extends JfrReader.Event> void readSegments(JfrReader reader, SampleCollector<E> collector, Predicate<String> threadFilter, AsyncDataAggregator dataAggregator) throws IOException {
+ List<E> samples = reader.readAllEvents(collector.eventClass());
+ for (E sample : samples) {
String threadName = reader.threads.get((long) sample.tid);
if (threadName == null) {
continue;
@@ -260,9 +253,11 @@ public class AsyncProfilerJob {
continue;
}
+ long value = collector.measure(sample);
+
// parse the segment and give it to the data aggregator
- ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, duration);
- dataAggregator.insertData(segment, window);
+ ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, value);
+ dataAggregator.insertData(segment, this.window);
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index 178f055..2328582 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
+import me.lucko.spark.common.sampler.SamplerMode;
import me.lucko.spark.common.sampler.SamplerSettings;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
@@ -41,6 +42,11 @@ import java.util.function.IntPredicate;
* A sampler implementation using async-profiler.
*/
public class AsyncSampler extends AbstractSampler {
+
+ /** Function to collect and measure samples - either execution or allocation */
+ private final SampleCollector<?> sampleCollector;
+
+ /** Object that provides access to the async-profiler API */
private final AsyncProfilerAccess profilerAccess;
/** Responsible for aggregating and then outputting collected sampling data */
@@ -55,8 +61,9 @@ public class AsyncSampler extends AbstractSampler {
/** The executor used for scheduling and management */
private ScheduledExecutorService scheduler;
- public AsyncSampler(SparkPlatform platform, SamplerSettings settings) {
+ public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector) {
super(platform, settings);
+ this.sampleCollector = collector;
this.profilerAccess = AsyncProfilerAccess.getInstance(platform);
this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper());
this.scheduler = Executors.newSingleThreadScheduledExecutor(
@@ -79,17 +86,21 @@ public class AsyncSampler extends AbstractSampler {
int window = ProfilingWindowUtils.windowNow();
AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob();
- job.init(this.platform, this.interval, this.threadDumper, window, this.background);
+ job.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background);
job.start();
+ this.windowStatisticsCollector.recordWindowStartTime(window);
this.currentJob = job;
// rotate the sampler job to put data into a new window
- this.scheduler.scheduleAtFixedRate(
- this::rotateProfilerJob,
- ProfilingWindowUtils.WINDOW_SIZE_SECONDS,
- ProfilingWindowUtils.WINDOW_SIZE_SECONDS,
- TimeUnit.SECONDS
- );
+ boolean shouldNotRotate = this.sampleCollector instanceof SampleCollector.Allocation && ((SampleCollector.Allocation) this.sampleCollector).isLiveOnly();
+ if (!shouldNotRotate) {
+ this.scheduler.scheduleAtFixedRate(
+ this::rotateProfilerJob,
+ ProfilingWindowUtils.WINDOW_SIZE_SECONDS,
+ ProfilingWindowUtils.WINDOW_SIZE_SECONDS,
+ TimeUnit.SECONDS
+ );
+ }
recordInitialGcStats();
scheduleTimeout();
@@ -106,9 +117,6 @@ public class AsyncSampler extends AbstractSampler {
try {
// stop the previous job
previousJob.stop();
-
- // collect statistics for the window
- this.windowStatisticsCollector.measureNow(previousJob.getWindow());
} catch (Exception e) {
e.printStackTrace();
}
@@ -116,10 +124,18 @@ public class AsyncSampler extends AbstractSampler {
// start a new job
int window = previousJob.getWindow() + 1;
AsyncProfilerJob newJob = this.profilerAccess.startNewProfilerJob();
- newJob.init(this.platform, this.interval, this.threadDumper, window, this.background);
+ newJob.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background);
newJob.start();
+ this.windowStatisticsCollector.recordWindowStartTime(window);
this.currentJob = newJob;
+ // collect statistics for the previous window
+ try {
+ this.windowStatisticsCollector.measureNow(previousJob.getWindow());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
// aggregate the output of the previous job
previousJob.aggregate(this.dataAggregator);
@@ -174,6 +190,11 @@ public class AsyncSampler extends AbstractSampler {
}
@Override
+ public SamplerMode getMode() {
+ return this.sampleCollector.getMode();
+ }
+
+ @Override
public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
SamplerData.Builder proto = SamplerData.newBuilder();
writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java
index 26debaf..0804ccf 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java
@@ -38,13 +38,13 @@ public class ProfileSegment {
/** The stack trace for this segment */
private final AsyncStackTraceElement[] stackTrace;
/** The time spent executing this segment in microseconds */
- private final long time;
+ private final long value;
- public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long time) {
+ public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value) {
this.nativeThreadId = nativeThreadId;
this.threadName = threadName;
this.stackTrace = stackTrace;
- this.time = time;
+ this.value = value;
}
public int getNativeThreadId() {
@@ -59,11 +59,11 @@ public class ProfileSegment {
return this.stackTrace;
}
- public long getTime() {
- return this.time;
+ public long getValue() {
+ return this.value;
}
- public static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) {
+ public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event sample, String threadName, long value) {
JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId);
int len = stackTrace.methods.length;
@@ -72,7 +72,7 @@ public class ProfileSegment {
stack[i] = parseStackFrame(reader, stackTrace.methods[i]);
}
- return new ProfileSegment(sample.tid, threadName, stack, duration);
+ return new ProfileSegment(sample.tid, threadName, stack, value);
}
private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java
new file mode 100644
index 0000000..6054b91
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java
@@ -0,0 +1,154 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.sampler.async;
+
+import com.google.common.collect.ImmutableList;
+
+import me.lucko.spark.common.sampler.SamplerMode;
+import me.lucko.spark.common.sampler.async.AsyncProfilerAccess.ProfilingEvent;
+import me.lucko.spark.common.sampler.async.jfr.JfrReader.AllocationSample;
+import me.lucko.spark.common.sampler.async.jfr.JfrReader.Event;
+import me.lucko.spark.common.sampler.async.jfr.JfrReader.ExecutionSample;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Collects and processes sample events for a given type.
+ *
+ * @param <E> the event type
+ */
+public interface SampleCollector<E extends Event> {
+
+ /**
+ * Gets the arguments to initialise the profiler.
+ *
+ * @param access the async profiler access object
+ * @return the initialisation arguments
+ */
+ Collection<String> initArguments(AsyncProfilerAccess access);
+
+ /**
+ * Gets the event class processed by this sample collector.
+ *
+ * @return the event class
+ */
+ Class<E> eventClass();
+
+ /**
+ * Gets the measurements for a given event
+ *
+ * @param event the event
+ * @return the measurement
+ */
+ long measure(E event);
+
+ /**
+ * Gets the mode for the collector.
+ *
+ * @return the mode
+ */
+ SamplerMode getMode();
+
+ /**
+ * Sample collector for execution (cpu time) profiles.
+ */
+ final class Execution implements SampleCollector<ExecutionSample> {
+ private final int interval; // time in microseconds
+
+ public Execution(int interval) {
+ this.interval = interval;
+ }
+
+ @Override
+ public Collection<String> initArguments(AsyncProfilerAccess access) {
+ ProfilingEvent event = access.getProfilingEvent();
+ Objects.requireNonNull(event, "event");
+
+ return ImmutableList.of(
+ "event=" + event,
+ "interval=" + this.interval + "us"
+ );
+ }
+
+ @Override
+ public Class<ExecutionSample> eventClass() {
+ return ExecutionSample.class;
+ }
+
+ @Override
+ public long measure(ExecutionSample event) {
+ return event.value() * this.interval;
+ }
+
+ @Override
+ public SamplerMode getMode() {
+ return SamplerMode.EXECUTION;
+ }
+ }
+
+ /**
+ * Sample collector for allocation (memory) profiles.
+ */
+ final class Allocation implements SampleCollector<AllocationSample> {
+ private final int intervalBytes;
+ private final boolean liveOnly;
+
+ public Allocation(int intervalBytes, boolean liveOnly) {
+ this.intervalBytes = intervalBytes;
+ this.liveOnly = liveOnly;
+ }
+
+ public boolean isLiveOnly() {
+ return this.liveOnly;
+ }
+
+ @Override
+ public Collection<String> initArguments(AsyncProfilerAccess access) {
+ ProfilingEvent event = access.getAllocationProfilingEvent();
+ Objects.requireNonNull(event, "event");
+
+ ImmutableList.Builder<String> builder = ImmutableList.builder();
+ builder.add("event=" + event);
+ builder.add("alloc=" + this.intervalBytes);
+ if (this.liveOnly) {
+ builder.add("live");
+ }
+ return builder.build();
+ }
+
+ @Override
+ public Class<AllocationSample> eventClass() {
+ return AllocationSample.class;
+ }
+
+ @Override
+ public long measure(AllocationSample event) {
+ return event.value();
+ }
+
+ @Override
+ public SamplerMode getMode() {
+ return SamplerMode.ALLOCATION;
+ }
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
index 72a37e8..d5c965f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
+import me.lucko.spark.common.sampler.SamplerMode;
import me.lucko.spark.common.sampler.SamplerSettings;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
@@ -90,6 +91,7 @@ public class JavaSampler extends AbstractSampler implements Runnable {
}
}
+ this.windowStatisticsCollector.recordWindowStartTime(ProfilingWindowUtils.unixMillisToWindow(this.startTime));
this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS);
}
@@ -149,6 +151,9 @@ public class JavaSampler extends AbstractSampler implements Runnable {
int previousWindow = JavaSampler.this.lastWindow.getAndUpdate(previous -> Math.max(this.window, previous));
if (previousWindow != 0 && previousWindow != this.window) {
+ // record the start time for the new window
+ JavaSampler.this.windowStatisticsCollector.recordWindowStartTime(this.window);
+
// collect statistics for the previous window
JavaSampler.this.windowStatisticsCollector.measureNow(previousWindow);
@@ -168,4 +173,8 @@ public class JavaSampler extends AbstractSampler implements Runnable {
return proto.build();
}
+ @Override
+ public SamplerMode getMode() {
+ return SamplerMode.EXECUTION;
+ }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java
index 03da075..fb4a4fc 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java
@@ -27,18 +27,25 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
+import java.util.function.LongToDoubleFunction;
import java.util.stream.IntStream;
/**
* Encodes a map of int->double into a double array.
*/
public class ProtoTimeEncoder {
+
+ /** A transformer function to transform the 'time' value from a long to a double */
+ private final LongToDoubleFunction valueTransformer;
+
/** A sorted array of all possible keys to encode */
private final int[] keys;
/** A map of key value -> index in the keys array */
private final Map<Integer, Integer> keysToIndex;
- public ProtoTimeEncoder(List<ThreadNode> sourceData) {
+ public ProtoTimeEncoder(LongToDoubleFunction valueTransformer, List<ThreadNode> sourceData) {
+ this.valueTransformer = valueTransformer;
+
// get an array of all keys that show up in the source data
this.keys = sourceData.stream()
.map(n -> n.getTimeWindows().stream().mapToInt(i -> i))
@@ -81,11 +88,8 @@ public class ProtoTimeEncoder {
throw new RuntimeException("No index for key " + key + " in " + this.keysToIndex.keySet());
}
- // convert the duration from microseconds -> milliseconds
- double durationInMilliseconds = value.longValue() / 1000d;
-
// store in the array
- array[idx] = durationInMilliseconds;
+ array[idx] = this.valueTransformer.applyAsDouble(value.longValue());
});
return array;
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
index ce65013..1c05b00 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
@@ -29,20 +29,26 @@ import me.lucko.spark.common.tick.TickHook;
import me.lucko.spark.common.util.RollingAverage;
import me.lucko.spark.proto.SparkProtos;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;
+import java.util.logging.Level;
/**
* Collects statistics for each profiling window.
*/
public class WindowStatisticsCollector {
- private static final SparkProtos.WindowStatistics ZERO = SparkProtos.WindowStatistics.newBuilder().build();
+ private static final SparkProtos.WindowStatistics ZERO = SparkProtos.WindowStatistics.newBuilder()
+ .setDuration(ProfilingWindowUtils.WINDOW_SIZE_SECONDS * 1000)
+ .build();
/** The platform */
private final SparkPlatform platform;
+ /** Map of profiling window -> start time */
+ private final Map<Integer, Long> windowStartTimes = new HashMap<>();
/** Map of profiling window -> statistics */
private final Map<Integer, SparkProtos.WindowStatistics> stats;
@@ -100,12 +106,21 @@ public class WindowStatisticsCollector {
}
/**
+ * Records the wall-clock time when a window was started.
+ *
+ * @param window the window
+ */
+ public void recordWindowStartTime(int window) {
+ this.windowStartTimes.put(window, System.currentTimeMillis());
+ }
+
+ /**
* Measures statistics for the given window if none have been recorded yet.
*
* @param window the window
*/
public void measureNow(int window) {
- this.stats.computeIfAbsent(window, w -> measure());
+ this.stats.computeIfAbsent(window, this::measure);
}
/**
@@ -132,9 +147,20 @@ public class WindowStatisticsCollector {
*
* @return the current statistics
*/
- private SparkProtos.WindowStatistics measure() {
+ private SparkProtos.WindowStatistics measure(int window) {
SparkProtos.WindowStatistics.Builder builder = SparkProtos.WindowStatistics.newBuilder();
+ long endTime = System.currentTimeMillis();
+ Long startTime = this.windowStartTimes.get(window);
+ if (startTime == null) {
+ this.platform.getPlugin().log(Level.WARNING, "Unknown start time for window " + window);
+ startTime = endTime - (ProfilingWindowUtils.WINDOW_SIZE_SECONDS * 1000); // guess
+ }
+
+ builder.setStartTime(startTime);
+ builder.setEndTime(endTime);
+ builder.setDuration((int) (endTime - startTime));
+
TickStatistics tickStatistics = this.platform.getTickStatistics();
if (tickStatistics != null) {
builder.setTps(tickStatistics.tps1Min());