From d6d2e31169af5966e244585582a4ccea317cecb3 Mon Sep 17 00:00:00 2001 From: vaperion Date: Sat, 31 Dec 2022 15:47:56 +0100 Subject: Make thread grouper configurable for the background sampler --- .../spark/common/sampler/BackgroundSamplerManager.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) (limited to 'spark-common/src/main/java') diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java index 7e3b6b4..82a4b47 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java @@ -31,6 +31,7 @@ public class BackgroundSamplerManager { private static final String OPTION_ENABLED = "backgroundProfiler"; private static final String OPTION_ENGINE = "backgroundProfilerEngine"; private static final String OPTION_INTERVAL = "backgroundProfilerInterval"; + private static final String OPTION_THREAD_GROUPER = "backgroundProfilerThreadGrouper"; private static final String MARKER_FAILED = "_marker_background_profiler_failed"; @@ -101,10 +102,23 @@ public class BackgroundSamplerManager { private void startSampler() { boolean forceJavaEngine = this.configuration.getString(OPTION_ENGINE, "async").equals("java"); + ThreadGrouper threadGrouper; + switch (this.configuration.getString(OPTION_THREAD_GROUPER, "byPool")) { + case "asOne": + threadGrouper = ThreadGrouper.AS_ONE; + break; + case "byName": + threadGrouper = ThreadGrouper.BY_NAME; + break; + default: + threadGrouper = ThreadGrouper.BY_POOL; + break; + } + Sampler sampler = new SamplerBuilder() .background(true) .threadDumper(this.platform.getPlugin().getDefaultThreadDumper()) - .threadGrouper(ThreadGrouper.BY_POOL) + .threadGrouper(threadGrouper) .samplingInterval(this.configuration.getInteger(OPTION_INTERVAL, 10)) .forceJavaSampler(forceJavaEngine) .start(this.platform); -- cgit From cf41ef932545ea28567e6bf3c9bf6ce634d43976 Mon Sep 17 00:00:00 2001 From: vaperion Date: Sat, 31 Dec 2022 16:08:20 +0100 Subject: Make thread dumper configurable for the background sampler --- .../common/sampler/BackgroundSamplerManager.java | 35 +++++++++++++++++----- 1 file changed, 28 insertions(+), 7 deletions(-) (limited to 'spark-common/src/main/java') diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java index 82a4b47..ff1b3ac 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java @@ -24,7 +24,10 @@ import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.platform.PlatformInfo; import me.lucko.spark.common.util.Configuration; +import java.util.Arrays; +import java.util.Set; import java.util.logging.Level; +import java.util.stream.Collectors; public class BackgroundSamplerManager { @@ -32,6 +35,7 @@ public class BackgroundSamplerManager { private static final String OPTION_ENGINE = "backgroundProfilerEngine"; private static final String OPTION_INTERVAL = "backgroundProfilerInterval"; private static final String OPTION_THREAD_GROUPER = "backgroundProfilerThreadGrouper"; + private static final String OPTION_THREAD_DUMPER = "backgroundProfilerThreadDumper"; private static final String MARKER_FAILED = "_marker_background_profiler_failed"; @@ -101,9 +105,11 @@ public class BackgroundSamplerManager { private void startSampler() { boolean forceJavaEngine = this.configuration.getString(OPTION_ENGINE, "async").equals("java"); + String grouperName = this.configuration.getString(OPTION_THREAD_GROUPER, "byPool"); + String dumperName = this.configuration.getString(OPTION_THREAD_DUMPER, "default"); ThreadGrouper threadGrouper; - switch (this.configuration.getString(OPTION_THREAD_GROUPER, "byPool")) { + switch (grouperName) { case "asOne": threadGrouper = ThreadGrouper.AS_ONE; break; @@ -115,13 +121,28 @@ public class BackgroundSamplerManager { break; } + ThreadDumper threadDumper; + switch (dumperName) { + case "default": + threadDumper = this.platform.getPlugin().getDefaultThreadDumper(); + break; + case "*": + case "all": + threadDumper = ThreadDumper.ALL; + break; + default: + Set threadNames = Arrays.stream(dumperName.split(",")).collect(Collectors.toSet()); + threadDumper = new ThreadDumper.Specific(threadNames); + break; + } + Sampler sampler = new SamplerBuilder() - .background(true) - .threadDumper(this.platform.getPlugin().getDefaultThreadDumper()) - .threadGrouper(threadGrouper) - .samplingInterval(this.configuration.getInteger(OPTION_INTERVAL, 10)) - .forceJavaSampler(forceJavaEngine) - .start(this.platform); + .background(true) + .threadDumper(threadDumper) + .threadGrouper(threadGrouper) + .samplingInterval(this.configuration.getInteger(OPTION_INTERVAL, 10)) + .forceJavaSampler(forceJavaEngine) + .start(this.platform); this.platform.getSamplerContainer().setActiveSampler(sampler); } -- cgit From dcdaacd7deb40be939bf91379f7391c02481cc48 Mon Sep 17 00:00:00 2001 From: Luck Date: Sun, 1 Jan 2023 16:15:50 +0000 Subject: Refactor thread grouper/dumper config parse --- .../common/sampler/BackgroundSamplerManager.java | 39 +++----------- .../lucko/spark/common/sampler/ThreadDumper.java | 18 +++++++ .../lucko/spark/common/sampler/ThreadGrouper.java | 60 +++++++++++++++------- 3 files changed, 66 insertions(+), 51 deletions(-) (limited to 'spark-common/src/main/java') diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java index ff1b3ac..4e9ca9e 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/BackgroundSamplerManager.java @@ -24,10 +24,7 @@ import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.platform.PlatformInfo; import me.lucko.spark.common.util.Configuration; -import java.util.Arrays; -import java.util.Set; import java.util.logging.Level; -import java.util.stream.Collectors; public class BackgroundSamplerManager { @@ -105,42 +102,20 @@ public class BackgroundSamplerManager { private void startSampler() { boolean forceJavaEngine = this.configuration.getString(OPTION_ENGINE, "async").equals("java"); - String grouperName = this.configuration.getString(OPTION_THREAD_GROUPER, "byPool"); - String dumperName = this.configuration.getString(OPTION_THREAD_DUMPER, "default"); - - ThreadGrouper threadGrouper; - switch (grouperName) { - case "asOne": - threadGrouper = ThreadGrouper.AS_ONE; - break; - case "byName": - threadGrouper = ThreadGrouper.BY_NAME; - break; - default: - threadGrouper = ThreadGrouper.BY_POOL; - break; - } - ThreadDumper threadDumper; - switch (dumperName) { - case "default": - threadDumper = this.platform.getPlugin().getDefaultThreadDumper(); - break; - case "*": - case "all": - threadDumper = ThreadDumper.ALL; - break; - default: - Set threadNames = Arrays.stream(dumperName.split(",")).collect(Collectors.toSet()); - threadDumper = new ThreadDumper.Specific(threadNames); - break; + ThreadGrouper threadGrouper = ThreadGrouper.parseConfigSetting(this.configuration.getString(OPTION_THREAD_GROUPER, "by-pool")); + ThreadDumper threadDumper = ThreadDumper.parseConfigSetting(this.configuration.getString(OPTION_THREAD_DUMPER, "default")); + if (threadDumper == null) { + threadDumper = this.platform.getPlugin().getDefaultThreadDumper(); } + int interval = this.configuration.getInteger(OPTION_INTERVAL, 10); + Sampler sampler = new SamplerBuilder() .background(true) .threadDumper(threadDumper) .threadGrouper(threadGrouper) - .samplingInterval(this.configuration.getInteger(OPTION_INTERVAL, 10)) + .samplingInterval(interval) .forceJavaSampler(forceJavaEngine) .start(this.platform); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java index fd0c413..62e2dda 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadDumper.java @@ -54,6 +54,24 @@ public interface ThreadDumper { */ SamplerMetadata.ThreadDumper getMetadata(); + /** + * Creates a new {@link ThreadDumper} by parsing the given config setting. + * + * @param setting the config setting + * @return the thread dumper + */ + static ThreadDumper parseConfigSetting(String setting) { + switch (setting) { + case "default": + return null; + case "all": + return ALL; + default: + Set threadNames = Arrays.stream(setting.split(",")).collect(Collectors.toSet()); + return new ThreadDumper.Specific(threadNames); + } + } + /** * Implementation of {@link ThreadDumper} that generates data for all threads. */ diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java index 9ad84df..b6cfbea 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/ThreadGrouper.java @@ -34,6 +34,47 @@ import java.util.regex.Pattern; */ public interface ThreadGrouper { + /** + * Gets the group for the given thread. + * + * @param threadId the id of the thread + * @param threadName the name of the thread + * @return the group + */ + String getGroup(long threadId, String threadName); + + /** + * Gets the label to use for a given group. + * + * @param group the group + * @return the label + */ + String getLabel(String group); + + /** + * Gets the metadata enum instance for this thread grouper. + * + * @return proto metadata + */ + SamplerMetadata.DataAggregator.ThreadGrouper asProto(); + + /** + * Creates a new {@link ThreadGrouper} by parsing the given config setting. + * + * @param setting the config setting + * @return the thread grouper + */ + static ThreadGrouper parseConfigSetting(String setting) { + switch (setting) { + case "as-one": + return AS_ONE; + case "by-name": + return BY_NAME; + default: + return BY_POOL; + } + } + /** * Implementation of {@link ThreadGrouper} that just groups by thread name. */ @@ -126,23 +167,4 @@ public interface ThreadGrouper { } }; - /** - * Gets the group for the given thread. - * - * @param threadId the id of the thread - * @param threadName the name of the thread - * @return the group - */ - String getGroup(long threadId, String threadName); - - /** - * Gets the label to use for a given group. - * - * @param group the group - * @return the label - */ - String getLabel(String group); - - SamplerMetadata.DataAggregator.ThreadGrouper asProto(); - } -- cgit From d83e49128ad59308f4b3ff19cf4b22b53236be8d Mon Sep 17 00:00:00 2001 From: lucko Date: Sun, 8 Jan 2023 15:21:32 +0000 Subject: Allocation profiler (#290) --- .../common/command/modules/SamplerModule.java | 29 +++- .../spark/common/sampler/AbstractSampler.java | 3 +- .../me/lucko/spark/common/sampler/Sampler.java | 7 + .../lucko/spark/common/sampler/SamplerBuilder.java | 38 ++++- .../me/lucko/spark/common/sampler/SamplerMode.java | 74 ++++++++++ .../common/sampler/async/AsyncDataAggregator.java | 2 +- .../common/sampler/async/AsyncProfilerAccess.java | 25 +++- .../common/sampler/async/AsyncProfilerJob.java | 55 ++++---- .../spark/common/sampler/async/AsyncSampler.java | 45 ++++-- .../spark/common/sampler/async/ProfileSegment.java | 14 +- .../common/sampler/async/SampleCollector.java | 154 +++++++++++++++++++++ .../spark/common/sampler/java/JavaSampler.java | 9 ++ .../common/sampler/window/ProtoTimeEncoder.java | 14 +- .../sampler/window/WindowStatisticsCollector.java | 32 ++++- 14 files changed, 428 insertions(+), 73 deletions(-) create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java (limited to 'spark-common/src/main/java') diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java index cd00f0d..041cacf 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java @@ -33,6 +33,7 @@ import me.lucko.spark.common.command.tabcomplete.CompletionSupplier; import me.lucko.spark.common.command.tabcomplete.TabCompleter; import me.lucko.spark.common.sampler.Sampler; import me.lucko.spark.common.sampler.SamplerBuilder; +import me.lucko.spark.common.sampler.SamplerMode; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.ThreadGrouper; import me.lucko.spark.common.sampler.async.AsyncSampler; @@ -80,6 +81,7 @@ public class SamplerModule implements CommandModule { .argumentUsage("start", "thread", "thread name") .argumentUsage("start", "only-ticks-over", "tick length millis") .argumentUsage("start", "interval", "interval millis") + .argumentUsage("start", "alloc", null) .argumentUsage("stop", "", null) .argumentUsage("cancel", "", null) .executor(this::profiler) @@ -94,7 +96,7 @@ public class SamplerModule implements CommandModule { } if (subCommand.equals("start")) { opts = new ArrayList<>(Arrays.asList("--timeout", "--regex", "--combine-all", - "--not-combined", "--interval", "--only-ticks-over", "--force-java-sampler")); + "--not-combined", "--interval", "--only-ticks-over", "--force-java-sampler", "--alloc", "--alloc-live-only")); opts.removeAll(arguments); opts.add("--thread"); // allowed multiple times } @@ -166,9 +168,12 @@ public class SamplerModule implements CommandModule { "Consider setting a timeout value over 30 seconds.")); } - double intervalMillis = arguments.doubleFlag("interval"); - if (intervalMillis <= 0) { - intervalMillis = 4; + SamplerMode mode = arguments.boolFlag("alloc") ? SamplerMode.ALLOCATION : SamplerMode.EXECUTION; + boolean allocLiveOnly = arguments.boolFlag("alloc-live-only"); + + double interval = arguments.doubleFlag("interval"); + if (interval <= 0) { + interval = mode.defaultInterval(); } boolean ignoreSleeping = arguments.boolFlag("ignore-sleeping"); @@ -213,23 +218,33 @@ public class SamplerModule implements CommandModule { resp.broadcastPrefixed(text("Starting a new profiler, please wait...")); SamplerBuilder builder = new SamplerBuilder(); + builder.mode(mode); builder.threadDumper(threadDumper); builder.threadGrouper(threadGrouper); if (timeoutSeconds != -1) { builder.completeAfter(timeoutSeconds, TimeUnit.SECONDS); } - builder.samplingInterval(intervalMillis); + builder.samplingInterval(interval); builder.ignoreSleeping(ignoreSleeping); builder.ignoreNative(ignoreNative); builder.forceJavaSampler(forceJavaSampler); + builder.allocLiveOnly(allocLiveOnly); if (ticksOver != -1) { builder.ticksOver(ticksOver, tickHook); } - Sampler sampler = builder.start(platform); + + Sampler sampler; + try { + sampler = builder.start(platform); + } catch (UnsupportedOperationException e) { + resp.replyPrefixed(text(e.getMessage(), RED)); + return; + } + platform.getSamplerContainer().setActiveSampler(sampler); resp.broadcastPrefixed(text() - .append(text("Profiler is now running!", GOLD)) + .append(text((mode == SamplerMode.ALLOCATION ? "Allocation Profiler" : "Profiler") + " is now running!", GOLD)) .append(space()) .append(text("(" + (sampler instanceof AsyncSampler ? "async" : "built-in java") + ")", DARK_GRAY)) .build() diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java index e324fd3..5abe71f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java @@ -126,6 +126,7 @@ public abstract class AbstractSampler implements Sampler { protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) { SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder() + .setSamplerMode(getMode().asProto()) .setPlatformMetadata(platform.getPlugin().getPlatformInfo().toData().toProto()) .setCreator(creator.toData().toProto()) .setStartTime(this.startTime) @@ -187,7 +188,7 @@ public abstract class AbstractSampler implements Sampler { ClassSourceLookup.Visitor classSourceVisitor = ClassSourceLookup.createVisitor(classSourceLookup); - ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(data); + ProtoTimeEncoder timeEncoder = new ProtoTimeEncoder(getMode().valueTransformer(), data); int[] timeWindows = timeEncoder.getKeys(); for (int timeWindow : timeWindows) { proto.addTimeWindows(timeWindow); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index 36a63f1..aaf4f38 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -64,6 +64,13 @@ public interface Sampler { */ boolean isRunningInBackground(); + /** + * Gets the sampler mode. + * + * @return the sampler mode + */ + SamplerMode getMode(); + /** * Gets a future to encapsulate the completion of the sampler * diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index ec635ef..7891a98 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -23,6 +23,7 @@ package me.lucko.spark.common.sampler; import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.sampler.async.AsyncProfilerAccess; import me.lucko.spark.common.sampler.async.AsyncSampler; +import me.lucko.spark.common.sampler.async.SampleCollector; import me.lucko.spark.common.sampler.java.JavaSampler; import me.lucko.spark.common.tick.TickHook; @@ -34,10 +35,12 @@ import java.util.concurrent.TimeUnit; @SuppressWarnings("UnusedReturnValue") public class SamplerBuilder { - private double samplingInterval = 4; // milliseconds + private SamplerMode mode = SamplerMode.EXECUTION; + private double samplingInterval = -1; private boolean ignoreSleeping = false; private boolean ignoreNative = false; private boolean useAsyncProfiler = true; + private boolean allocLiveOnly = false; private long autoEndTime = -1; private boolean background = false; private ThreadDumper threadDumper = ThreadDumper.ALL; @@ -49,6 +52,11 @@ public class SamplerBuilder { public SamplerBuilder() { } + public SamplerBuilder mode(SamplerMode mode) { + this.mode = mode; + return this; + } + public SamplerBuilder samplingInterval(double samplingInterval) { this.samplingInterval = samplingInterval; return this; @@ -98,7 +106,16 @@ public class SamplerBuilder { return this; } - public Sampler start(SparkPlatform platform) { + public SamplerBuilder allocLiveOnly(boolean allocLiveOnly) { + this.allocLiveOnly = allocLiveOnly; + return this; + } + + public Sampler start(SparkPlatform platform) throws UnsupportedOperationException { + if (this.samplingInterval <= 0) { + throw new IllegalArgumentException("samplingInterval = " + this.samplingInterval); + } + boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null; boolean canUseAsyncProfiler = this.useAsyncProfiler && !onlyTicksOverMode && @@ -106,13 +123,22 @@ public class SamplerBuilder { !(this.threadDumper instanceof ThreadDumper.Regex) && AsyncProfilerAccess.getInstance(platform).checkSupported(platform); + if (this.mode == SamplerMode.ALLOCATION && (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform))) { + throw new UnsupportedOperationException("Allocation profiling is not supported on your system. Check the console for more info."); + } + + int interval = (int) (this.mode == SamplerMode.EXECUTION ? + this.samplingInterval * 1000d : // convert to microseconds + this.samplingInterval + ); - int intervalMicros = (int) (this.samplingInterval * 1000d); - SamplerSettings settings = new SamplerSettings(intervalMicros, this.threadDumper, this.threadGrouper, this.autoEndTime, this.background); + SamplerSettings settings = new SamplerSettings(interval, this.threadDumper, this.threadGrouper, this.autoEndTime, this.background); Sampler sampler; - if (canUseAsyncProfiler) { - sampler = new AsyncSampler(platform, settings); + if (this.mode == SamplerMode.ALLOCATION) { + sampler = new AsyncSampler(platform, settings, new SampleCollector.Allocation(interval, this.allocLiveOnly)); + } else if (canUseAsyncProfiler) { + sampler = new AsyncSampler(platform, settings, new SampleCollector.Execution(interval)); } else if (onlyTicksOverMode) { sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); } else { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java new file mode 100644 index 0000000..f9a6e41 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerMode.java @@ -0,0 +1,74 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package me.lucko.spark.common.sampler; + +import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; + +import java.util.function.LongToDoubleFunction; + +public enum SamplerMode { + + EXECUTION( + value -> { + // convert the duration from microseconds -> milliseconds + return value / 1000d; + }, + 4, // ms + SamplerMetadata.SamplerMode.EXECUTION + ), + + ALLOCATION( + value -> { + // do nothing + return value; + }, + 524287, // 512 KiB + SamplerMetadata.SamplerMode.ALLOCATION + ); + + private final LongToDoubleFunction valueTransformer; + private final int defaultInterval; + private final SamplerMetadata.SamplerMode proto; + + SamplerMode(LongToDoubleFunction valueTransformer, int defaultInterval, SamplerMetadata.SamplerMode proto) { + this.valueTransformer = valueTransformer; + this.defaultInterval = defaultInterval; + this.proto = proto; + } + + public LongToDoubleFunction valueTransformer() { + return this.valueTransformer; + } + + public int defaultInterval() { + return this.defaultInterval; + } + + /** + * Gets the metadata enum instance for this sampler mode. + * + * @return proto metadata + */ + public SamplerMetadata.SamplerMode asProto() { + return this.proto; + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java index 402330a..b9a80e0 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java @@ -50,7 +50,7 @@ public class AsyncDataAggregator extends AbstractDataAggregator { public void insertData(ProfileSegment element, int window) { try { ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName())); - node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getTime(), window); + node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getValue(), window); } catch (Exception e) { e.printStackTrace(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java index 1480650..5bee56f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerAccess.java @@ -61,6 +61,8 @@ public class AsyncProfilerAccess { /** The event to use for profiling */ private final ProfilingEvent profilingEvent; + /** The event to use for allocation profiling */ + private final ProfilingEvent allocationProfilingEvent; /** If profiler is null, contains the reason why setup failed */ private final Exception setupException; @@ -68,10 +70,16 @@ public class AsyncProfilerAccess { AsyncProfilerAccess(SparkPlatform platform) { AsyncProfiler profiler; ProfilingEvent profilingEvent = null; + ProfilingEvent allocationProfilingEvent = null; Exception setupException = null; try { profiler = load(platform); + + if (isEventSupported(profiler, ProfilingEvent.ALLOC, false)) { + allocationProfilingEvent = ProfilingEvent.ALLOC; + } + if (isEventSupported(profiler, ProfilingEvent.CPU, false)) { profilingEvent = ProfilingEvent.CPU; } else if (isEventSupported(profiler, ProfilingEvent.WALL, true)) { @@ -84,6 +92,7 @@ public class AsyncProfilerAccess { this.profiler = profiler; this.profilingEvent = profilingEvent; + this.allocationProfilingEvent = allocationProfilingEvent; this.setupException = setupException; } @@ -98,6 +107,10 @@ public class AsyncProfilerAccess { return this.profilingEvent; } + public ProfilingEvent getAllocationProfilingEvent() { + return this.allocationProfilingEvent; + } + public boolean checkSupported(SparkPlatform platform) { if (this.setupException != null) { if (this.setupException instanceof UnsupportedSystemException) { @@ -116,6 +129,15 @@ public class AsyncProfilerAccess { return this.profiler != null; } + public boolean checkAllocationProfilingSupported(SparkPlatform platform) { + boolean supported = this.allocationProfilingEvent != null; + if (!supported && this.profiler != null) { + platform.getPlugin().log(Level.WARNING, "The allocation profiling mode is not supported on your system. This is most likely because Hotspot debug symbols are not available."); + platform.getPlugin().log(Level.WARNING, "To resolve, try installing the 'openjdk-11-dbg' or 'openjdk-8-dbg' package using your OS package manager."); + } + return supported; + } + private static AsyncProfiler load(SparkPlatform platform) throws Exception { // check compatibility String os = System.getProperty("os.name").toLowerCase(Locale.ROOT).replace(" ", ""); @@ -183,7 +205,8 @@ public class AsyncProfilerAccess { enum ProfilingEvent { CPU(Events.CPU), - WALL(Events.WALL); + WALL(Events.WALL), + ALLOC(Events.ALLOC); private final String id; diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java index d74b75f..039d4ba 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java @@ -20,6 +20,8 @@ package me.lucko.spark.common.sampler.async; +import com.google.common.collect.ImmutableList; + import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.sampler.ThreadDumper; import me.lucko.spark.common.sampler.async.jfr.JfrReader; @@ -29,8 +31,8 @@ import one.profiler.AsyncProfiler; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collection; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -77,8 +79,8 @@ public class AsyncProfilerJob { // Set on init /** The platform */ private SparkPlatform platform; - /** The sampling interval in microseconds */ - private int interval; + /** The sample collector */ + private SampleCollector sampleCollector; /** The thread dumper */ private ThreadDumper threadDumper; /** The profiling window */ @@ -100,9 +102,9 @@ public class AsyncProfilerJob { * @param command the command * @return the output */ - private String execute(String command) { + private String execute(Collection command) { try { - return this.profiler.execute(command); + return this.profiler.execute(String.join(",", command)); } catch (IOException e) { throw new RuntimeException("Exception whilst executing profiler command", e); } @@ -118,9 +120,9 @@ public class AsyncProfilerJob { } // Initialise the job - public void init(SparkPlatform platform, int interval, ThreadDumper threadDumper, int window, boolean quiet) { + public void init(SparkPlatform platform, SampleCollector collector, ThreadDumper threadDumper, int window, boolean quiet) { this.platform = platform; - this.interval = interval; + this.sampleCollector = collector; this.threadDumper = threadDumper; this.window = window; this.quiet = quiet; @@ -141,16 +143,20 @@ public class AsyncProfilerJob { } // construct a command to send to async-profiler - String command = "start,event=" + this.access.getProfilingEvent() + ",interval=" + this.interval + "us,threads,jfr,file=" + this.outputFile.toString(); + ImmutableList.Builder command = ImmutableList.builder() + .add("start") + .addAll(this.sampleCollector.initArguments(this.access)) + .add("threads").add("jfr").add("file=" + this.outputFile.toString()); + if (this.quiet) { - command += ",loglevel=NONE"; + command.add("loglevel=NONE"); } if (this.threadDumper instanceof ThreadDumper.Specific) { - command += ",filter"; + command.add("filter"); } // start the profiler - String resp = execute(command).trim(); + String resp = execute(command.build()).trim(); if (!resp.equalsIgnoreCase("profiling started")) { throw new RuntimeException("Unexpected response: " + resp); @@ -208,7 +214,7 @@ public class AsyncProfilerJob { // read the jfr file produced by async-profiler try (JfrReader reader = new JfrReader(this.outputFile)) { - readSegments(reader, threadFilter, dataAggregator, this.window); + readSegments(reader, this.sampleCollector, threadFilter, dataAggregator); } catch (Exception e) { boolean fileExists; try { @@ -235,22 +241,9 @@ public class AsyncProfilerJob { } } - private void readSegments(JfrReader reader, Predicate threadFilter, AsyncDataAggregator dataAggregator, int window) throws IOException { - List samples = reader.readAllEvents(JfrReader.ExecutionSample.class); - for (int i = 0; i < samples.size(); i++) { - JfrReader.ExecutionSample sample = samples.get(i); - - long duration; - if (i == 0) { - // we don't really know the duration of the first sample, so just use the sampling - // interval - duration = this.interval; - } else { - // calculate the duration of the sample by calculating the time elapsed since the - // previous sample - duration = TimeUnit.NANOSECONDS.toMicros(sample.time - samples.get(i - 1).time); - } - + private void readSegments(JfrReader reader, SampleCollector collector, Predicate threadFilter, AsyncDataAggregator dataAggregator) throws IOException { + List samples = reader.readAllEvents(collector.eventClass()); + for (E sample : samples) { String threadName = reader.threads.get((long) sample.tid); if (threadName == null) { continue; @@ -260,9 +253,11 @@ public class AsyncProfilerJob { continue; } + long value = collector.measure(sample); + // parse the segment and give it to the data aggregator - ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, duration); - dataAggregator.insertData(segment, window); + ProfileSegment segment = ProfileSegment.parseSegment(reader, sample, threadName, value); + dataAggregator.insertData(segment, this.window); } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 178f055..2328582 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.AbstractSampler; +import me.lucko.spark.common.sampler.SamplerMode; import me.lucko.spark.common.sampler.SamplerSettings; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; @@ -41,6 +42,11 @@ import java.util.function.IntPredicate; * A sampler implementation using async-profiler. */ public class AsyncSampler extends AbstractSampler { + + /** Function to collect and measure samples - either execution or allocation */ + private final SampleCollector sampleCollector; + + /** Object that provides access to the async-profiler API */ private final AsyncProfilerAccess profilerAccess; /** Responsible for aggregating and then outputting collected sampling data */ @@ -55,8 +61,9 @@ public class AsyncSampler extends AbstractSampler { /** The executor used for scheduling and management */ private ScheduledExecutorService scheduler; - public AsyncSampler(SparkPlatform platform, SamplerSettings settings) { + public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector collector) { super(platform, settings); + this.sampleCollector = collector; this.profilerAccess = AsyncProfilerAccess.getInstance(platform); this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper()); this.scheduler = Executors.newSingleThreadScheduledExecutor( @@ -79,17 +86,21 @@ public class AsyncSampler extends AbstractSampler { int window = ProfilingWindowUtils.windowNow(); AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob(); - job.init(this.platform, this.interval, this.threadDumper, window, this.background); + job.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background); job.start(); + this.windowStatisticsCollector.recordWindowStartTime(window); this.currentJob = job; // rotate the sampler job to put data into a new window - this.scheduler.scheduleAtFixedRate( - this::rotateProfilerJob, - ProfilingWindowUtils.WINDOW_SIZE_SECONDS, - ProfilingWindowUtils.WINDOW_SIZE_SECONDS, - TimeUnit.SECONDS - ); + boolean shouldNotRotate = this.sampleCollector instanceof SampleCollector.Allocation && ((SampleCollector.Allocation) this.sampleCollector).isLiveOnly(); + if (!shouldNotRotate) { + this.scheduler.scheduleAtFixedRate( + this::rotateProfilerJob, + ProfilingWindowUtils.WINDOW_SIZE_SECONDS, + ProfilingWindowUtils.WINDOW_SIZE_SECONDS, + TimeUnit.SECONDS + ); + } recordInitialGcStats(); scheduleTimeout(); @@ -106,9 +117,6 @@ public class AsyncSampler extends AbstractSampler { try { // stop the previous job previousJob.stop(); - - // collect statistics for the window - this.windowStatisticsCollector.measureNow(previousJob.getWindow()); } catch (Exception e) { e.printStackTrace(); } @@ -116,10 +124,18 @@ public class AsyncSampler extends AbstractSampler { // start a new job int window = previousJob.getWindow() + 1; AsyncProfilerJob newJob = this.profilerAccess.startNewProfilerJob(); - newJob.init(this.platform, this.interval, this.threadDumper, window, this.background); + newJob.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background); newJob.start(); + this.windowStatisticsCollector.recordWindowStartTime(window); this.currentJob = newJob; + // collect statistics for the previous window + try { + this.windowStatisticsCollector.measureNow(previousJob.getWindow()); + } catch (Exception e) { + e.printStackTrace(); + } + // aggregate the output of the previous job previousJob.aggregate(this.dataAggregator); @@ -173,6 +189,11 @@ public class AsyncSampler extends AbstractSampler { } } + @Override + public SamplerMode getMode() { + return this.sampleCollector.getMode(); + } + @Override public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { SamplerData.Builder proto = SamplerData.newBuilder(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java index 26debaf..0804ccf 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java @@ -38,13 +38,13 @@ public class ProfileSegment { /** The stack trace for this segment */ private final AsyncStackTraceElement[] stackTrace; /** The time spent executing this segment in microseconds */ - private final long time; + private final long value; - public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long time) { + public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value) { this.nativeThreadId = nativeThreadId; this.threadName = threadName; this.stackTrace = stackTrace; - this.time = time; + this.value = value; } public int getNativeThreadId() { @@ -59,11 +59,11 @@ public class ProfileSegment { return this.stackTrace; } - public long getTime() { - return this.time; + public long getValue() { + return this.value; } - public static ProfileSegment parseSegment(JfrReader reader, JfrReader.ExecutionSample sample, String threadName, long duration) { + public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event sample, String threadName, long value) { JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId); int len = stackTrace.methods.length; @@ -72,7 +72,7 @@ public class ProfileSegment { stack[i] = parseStackFrame(reader, stackTrace.methods[i]); } - return new ProfileSegment(sample.tid, threadName, stack, duration); + return new ProfileSegment(sample.tid, threadName, stack, value); } private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java new file mode 100644 index 0000000..6054b91 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/SampleCollector.java @@ -0,0 +1,154 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package me.lucko.spark.common.sampler.async; + +import com.google.common.collect.ImmutableList; + +import me.lucko.spark.common.sampler.SamplerMode; +import me.lucko.spark.common.sampler.async.AsyncProfilerAccess.ProfilingEvent; +import me.lucko.spark.common.sampler.async.jfr.JfrReader.AllocationSample; +import me.lucko.spark.common.sampler.async.jfr.JfrReader.Event; +import me.lucko.spark.common.sampler.async.jfr.JfrReader.ExecutionSample; + +import java.util.Collection; +import java.util.Objects; + +/** + * Collects and processes sample events for a given type. + * + * @param the event type + */ +public interface SampleCollector { + + /** + * Gets the arguments to initialise the profiler. + * + * @param access the async profiler access object + * @return the initialisation arguments + */ + Collection initArguments(AsyncProfilerAccess access); + + /** + * Gets the event class processed by this sample collector. + * + * @return the event class + */ + Class eventClass(); + + /** + * Gets the measurements for a given event + * + * @param event the event + * @return the measurement + */ + long measure(E event); + + /** + * Gets the mode for the collector. + * + * @return the mode + */ + SamplerMode getMode(); + + /** + * Sample collector for execution (cpu time) profiles. + */ + final class Execution implements SampleCollector { + private final int interval; // time in microseconds + + public Execution(int interval) { + this.interval = interval; + } + + @Override + public Collection initArguments(AsyncProfilerAccess access) { + ProfilingEvent event = access.getProfilingEvent(); + Objects.requireNonNull(event, "event"); + + return ImmutableList.of( + "event=" + event, + "interval=" + this.interval + "us" + ); + } + + @Override + public Class eventClass() { + return ExecutionSample.class; + } + + @Override + public long measure(ExecutionSample event) { + return event.value() * this.interval; + } + + @Override + public SamplerMode getMode() { + return SamplerMode.EXECUTION; + } + } + + /** + * Sample collector for allocation (memory) profiles. + */ + final class Allocation implements SampleCollector { + private final int intervalBytes; + private final boolean liveOnly; + + public Allocation(int intervalBytes, boolean liveOnly) { + this.intervalBytes = intervalBytes; + this.liveOnly = liveOnly; + } + + public boolean isLiveOnly() { + return this.liveOnly; + } + + @Override + public Collection initArguments(AsyncProfilerAccess access) { + ProfilingEvent event = access.getAllocationProfilingEvent(); + Objects.requireNonNull(event, "event"); + + ImmutableList.Builder builder = ImmutableList.builder(); + builder.add("event=" + event); + builder.add("alloc=" + this.intervalBytes); + if (this.liveOnly) { + builder.add("live"); + } + return builder.build(); + } + + @Override + public Class eventClass() { + return AllocationSample.class; + } + + @Override + public long measure(AllocationSample event) { + return event.value(); + } + + @Override + public SamplerMode getMode() { + return SamplerMode.ALLOCATION; + } + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index 72a37e8..d5c965f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.AbstractSampler; +import me.lucko.spark.common.sampler.SamplerMode; import me.lucko.spark.common.sampler.SamplerSettings; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; @@ -90,6 +91,7 @@ public class JavaSampler extends AbstractSampler implements Runnable { } } + this.windowStatisticsCollector.recordWindowStartTime(ProfilingWindowUtils.unixMillisToWindow(this.startTime)); this.task = this.workerPool.scheduleAtFixedRate(this, 0, this.interval, TimeUnit.MICROSECONDS); } @@ -149,6 +151,9 @@ public class JavaSampler extends AbstractSampler implements Runnable { int previousWindow = JavaSampler.this.lastWindow.getAndUpdate(previous -> Math.max(this.window, previous)); if (previousWindow != 0 && previousWindow != this.window) { + // record the start time for the new window + JavaSampler.this.windowStatisticsCollector.recordWindowStartTime(this.window); + // collect statistics for the previous window JavaSampler.this.windowStatisticsCollector.measureNow(previousWindow); @@ -168,4 +173,8 @@ public class JavaSampler extends AbstractSampler implements Runnable { return proto.build(); } + @Override + public SamplerMode getMode() { + return SamplerMode.EXECUTION; + } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java index 03da075..fb4a4fc 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/ProtoTimeEncoder.java @@ -27,18 +27,25 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.LongAdder; +import java.util.function.LongToDoubleFunction; import java.util.stream.IntStream; /** * Encodes a map of int->double into a double array. */ public class ProtoTimeEncoder { + + /** A transformer function to transform the 'time' value from a long to a double */ + private final LongToDoubleFunction valueTransformer; + /** A sorted array of all possible keys to encode */ private final int[] keys; /** A map of key value -> index in the keys array */ private final Map keysToIndex; - public ProtoTimeEncoder(List sourceData) { + public ProtoTimeEncoder(LongToDoubleFunction valueTransformer, List sourceData) { + this.valueTransformer = valueTransformer; + // get an array of all keys that show up in the source data this.keys = sourceData.stream() .map(n -> n.getTimeWindows().stream().mapToInt(i -> i)) @@ -81,11 +88,8 @@ public class ProtoTimeEncoder { throw new RuntimeException("No index for key " + key + " in " + this.keysToIndex.keySet()); } - // convert the duration from microseconds -> milliseconds - double durationInMilliseconds = value.longValue() / 1000d; - // store in the array - array[idx] = durationInMilliseconds; + array[idx] = this.valueTransformer.applyAsDouble(value.longValue()); }); return array; diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java index ce65013..1c05b00 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java @@ -29,20 +29,26 @@ import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.common.util.RollingAverage; import me.lucko.spark.proto.SparkProtos; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntPredicate; +import java.util.logging.Level; /** * Collects statistics for each profiling window. */ public class WindowStatisticsCollector { - private static final SparkProtos.WindowStatistics ZERO = SparkProtos.WindowStatistics.newBuilder().build(); + private static final SparkProtos.WindowStatistics ZERO = SparkProtos.WindowStatistics.newBuilder() + .setDuration(ProfilingWindowUtils.WINDOW_SIZE_SECONDS * 1000) + .build(); /** The platform */ private final SparkPlatform platform; + /** Map of profiling window -> start time */ + private final Map windowStartTimes = new HashMap<>(); /** Map of profiling window -> statistics */ private final Map stats; @@ -99,13 +105,22 @@ public class WindowStatisticsCollector { return this.tickCounter == null ? -1 : this.tickCounter.getTotalTicks(); } + /** + * Records the wall-clock time when a window was started. + * + * @param window the window + */ + public void recordWindowStartTime(int window) { + this.windowStartTimes.put(window, System.currentTimeMillis()); + } + /** * Measures statistics for the given window if none have been recorded yet. * * @param window the window */ public void measureNow(int window) { - this.stats.computeIfAbsent(window, w -> measure()); + this.stats.computeIfAbsent(window, this::measure); } /** @@ -132,9 +147,20 @@ public class WindowStatisticsCollector { * * @return the current statistics */ - private SparkProtos.WindowStatistics measure() { + private SparkProtos.WindowStatistics measure(int window) { SparkProtos.WindowStatistics.Builder builder = SparkProtos.WindowStatistics.newBuilder(); + long endTime = System.currentTimeMillis(); + Long startTime = this.windowStartTimes.get(window); + if (startTime == null) { + this.platform.getPlugin().log(Level.WARNING, "Unknown start time for window " + window); + startTime = endTime - (ProfilingWindowUtils.WINDOW_SIZE_SECONDS * 1000); // guess + } + + builder.setStartTime(startTime); + builder.setEndTime(endTime); + builder.setDuration((int) (endTime - startTime)); + TickStatistics tickStatistics = this.platform.getTickStatistics(); if (tickStatistics != null) { builder.setTps(tickStatistics.tps1Min()); -- cgit From 06b794dcea806150770fb88d43e366a3496a9d0f Mon Sep 17 00:00:00 2001 From: lucko Date: Sat, 28 Jan 2023 11:07:45 +0000 Subject: Stream live data to the viewer using WebSockets (#294) --- .../java/me/lucko/spark/common/SparkPlatform.java | 18 +- .../common/command/modules/HeapAnalysisModule.java | 4 +- .../common/command/modules/SamplerModule.java | 121 ++++++++-- .../spark/common/heapdump/HeapDumpSummary.java | 2 +- .../platform/PlatformStatisticsProvider.java | 23 +- .../spark/common/sampler/AbstractSampler.java | 53 ++++- .../me/lucko/spark/common/sampler/Sampler.java | 76 +++++- .../spark/common/sampler/async/AsyncSampler.java | 32 ++- .../spark/common/sampler/java/JavaSampler.java | 31 ++- .../sampler/window/WindowStatisticsCollector.java | 8 +- .../me/lucko/spark/common/util/BytebinClient.java | 14 +- .../me/lucko/spark/common/util/Configuration.java | 27 +++ .../me/lucko/spark/common/util/MediaTypes.java | 29 +++ .../spark/common/util/ws/BytesocksClient.java | 118 ++++++++++ .../spark/common/util/ws/BytesocksClientImpl.java | 40 ++++ .../me/lucko/spark/common/ws/CryptoAlgorithm.java | 90 ++++++++ .../me/lucko/spark/common/ws/TrustedKeyStore.java | 139 +++++++++++ .../me/lucko/spark/common/ws/ViewerSocket.java | 255 +++++++++++++++++++++ .../spark/common/ws/ViewerSocketConnection.java | 218 ++++++++++++++++++ 19 files changed, 1247 insertions(+), 51 deletions(-) create mode 100644 spark-common/src/main/java/me/lucko/spark/common/util/MediaTypes.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClient.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClientImpl.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/ws/CryptoAlgorithm.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/ws/TrustedKeyStore.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocket.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocketConnection.java (limited to 'spark-common/src/main/java') diff --git a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java index dae04ff..61c6062 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java +++ b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java @@ -53,6 +53,8 @@ import me.lucko.spark.common.tick.TickReporter; import me.lucko.spark.common.util.BytebinClient; import me.lucko.spark.common.util.Configuration; import me.lucko.spark.common.util.TemporaryFiles; +import me.lucko.spark.common.util.ws.BytesocksClient; +import me.lucko.spark.common.ws.TrustedKeyStore; import net.kyori.adventure.text.Component; import net.kyori.adventure.text.event.ClickEvent; @@ -95,6 +97,8 @@ public class SparkPlatform { private final Configuration configuration; private final String viewerUrl; private final BytebinClient bytebinClient; + private final BytesocksClient bytesocksClient; + private final TrustedKeyStore trustedKeyStore; private final boolean disableResponseBroadcast; private final List commandModules; private final List commands; @@ -118,8 +122,12 @@ public class SparkPlatform { this.configuration = new Configuration(this.plugin.getPluginDirectory().resolve("config.json")); this.viewerUrl = this.configuration.getString("viewerUrl", "https://spark.lucko.me/"); - String bytebinUrl = this.configuration.getString("bytebinUrl", "https://bytebin.lucko.me/"); + String bytebinUrl = this.configuration.getString("bytebinUrl", "https://spark-usercontent.lucko.me/"); + String bytesocksHost = this.configuration.getString("bytesocksHost", "spark-usersockets.lucko.me"); + this.bytebinClient = new BytebinClient(bytebinUrl, "spark-plugin"); + this.bytesocksClient = BytesocksClient.create(bytesocksHost, "spark-plugin"); + this.trustedKeyStore = new TrustedKeyStore(this.configuration); this.disableResponseBroadcast = this.configuration.getBoolean("disableResponseBroadcast", false); @@ -228,6 +236,14 @@ public class SparkPlatform { return this.bytebinClient; } + public BytesocksClient getBytesocksClient() { + return this.bytesocksClient; + } + + public TrustedKeyStore getTrustedKeyStore() { + return this.trustedKeyStore; + } + public boolean shouldBroadcastResponse() { return !this.disableResponseBroadcast; } diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java index 5bd62a8..6ac3b2f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java @@ -32,6 +32,7 @@ import me.lucko.spark.common.heapdump.HeapDump; import me.lucko.spark.common.heapdump.HeapDumpSummary; import me.lucko.spark.common.util.Compression; import me.lucko.spark.common.util.FormatUtil; +import me.lucko.spark.common.util.MediaTypes; import me.lucko.spark.proto.SparkHeapProtos; import net.kyori.adventure.text.event.ClickEvent; @@ -52,7 +53,6 @@ import static net.kyori.adventure.text.format.NamedTextColor.GREEN; import static net.kyori.adventure.text.format.NamedTextColor.RED; public class HeapAnalysisModule implements CommandModule { - private static final String SPARK_HEAP_MEDIA_TYPE = "application/x-spark-heap"; @Override public void registerCommands(Consumer consumer) { @@ -97,7 +97,7 @@ public class HeapAnalysisModule implements CommandModule { saveToFile = true; } else { try { - String key = platform.getBytebinClient().postContent(output, SPARK_HEAP_MEDIA_TYPE).key(); + String key = platform.getBytebinClient().postContent(output, MediaTypes.SPARK_HEAP_MEDIA_TYPE).key(); String url = platform.getViewerUrl() + key; resp.broadcastPrefixed(text("Heap dump summmary output:", GOLD)); diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java index 041cacf..049c817 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java @@ -41,7 +41,10 @@ import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.common.util.FormatUtil; +import me.lucko.spark.common.util.MediaTypes; import me.lucko.spark.common.util.MethodDisambiguator; +import me.lucko.spark.common.util.ws.BytesocksClient; +import me.lucko.spark.common.ws.ViewerSocket; import me.lucko.spark.proto.SparkSamplerProtos; import net.kyori.adventure.text.Component; @@ -68,7 +71,6 @@ import static net.kyori.adventure.text.format.NamedTextColor.RED; import static net.kyori.adventure.text.format.NamedTextColor.WHITE; public class SamplerModule implements CommandModule { - private static final String SPARK_SAMPLER_MEDIA_TYPE = "application/x-spark-sampler"; @Override public void registerCommands(Consumer consumer) { @@ -76,6 +78,7 @@ public class Samp