From ddb0097142a0ef2635f53695b0b92fda855adb4a Mon Sep 17 00:00:00 2001 From: Hannes Greule Date: Sat, 2 Nov 2024 16:21:59 +0100 Subject: Support --ignore-sleeping with async-profiler (#467) --- .../common/command/modules/SamplerModule.java | 2 - .../lucko/spark/common/sampler/SamplerBuilder.java | 38 ++--- .../spark/common/sampler/SamplerSettings.java | 8 +- .../sampler/aggregator/AbstractDataAggregator.java | 15 +- .../common/sampler/async/AsyncDataAggregator.java | 27 +++- .../spark/common/sampler/async/AsyncSampler.java | 2 +- .../spark/common/sampler/async/ProfileSegment.java | 18 ++- .../common/sampler/java/JavaDataAggregator.java | 22 +-- .../spark/common/sampler/java/JavaSampler.java | 12 +- .../common/sampler/java/SimpleDataAggregator.java | 51 ------- .../sampler/java/SimpleJavaDataAggregator.java | 51 +++++++ .../common/sampler/java/TickedDataAggregator.java | 168 --------------------- .../sampler/java/TickedJavaDataAggregator.java | 168 +++++++++++++++++++++ .../sampler/window/WindowStatisticsCollector.java | 3 +- 14 files changed, 314 insertions(+), 271 deletions(-) delete mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleJavaDataAggregator.java delete mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java create mode 100644 spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedJavaDataAggregator.java (limited to 'spark-common/src') diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java index d65172b..d82ec63 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java @@ -190,7 +190,6 @@ public class SamplerModule implements CommandModule { } boolean ignoreSleeping = arguments.boolFlag("ignore-sleeping"); - boolean ignoreNative = arguments.boolFlag("ignore-native"); boolean forceJavaSampler = arguments.boolFlag("force-java-sampler"); Set threads = arguments.stringFlag("thread"); @@ -239,7 +238,6 @@ public class SamplerModule implements CommandModule { } builder.samplingInterval(interval); builder.ignoreSleeping(ignoreSleeping); - builder.ignoreNative(ignoreNative); builder.forceJavaSampler(forceJavaSampler); builder.allocLiveOnly(allocLiveOnly); if (ticksOver != -1) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java index 3046d92..7f83f47d 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerBuilder.java @@ -29,6 +29,7 @@ import me.lucko.spark.common.tick.TickHook; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.logging.Level; /** * Builds {@link Sampler} instances. @@ -39,8 +40,7 @@ public class SamplerBuilder { private SamplerMode mode = SamplerMode.EXECUTION; private double samplingInterval = -1; private boolean ignoreSleeping = false; - private boolean ignoreNative = false; - private boolean useAsyncProfiler = true; + private boolean forceJavaSampler = false; private boolean allocLiveOnly = false; private long autoEndTime = -1; private boolean background = false; @@ -97,13 +97,8 @@ public class SamplerBuilder { return this; } - public SamplerBuilder ignoreNative(boolean ignoreNative) { - this.ignoreNative = ignoreNative; - return this; - } - public SamplerBuilder forceJavaSampler(boolean forceJavaSampler) { - this.useAsyncProfiler = !forceJavaSampler; + this.forceJavaSampler = forceJavaSampler; return this; } @@ -117,14 +112,23 @@ public class SamplerBuilder { throw new IllegalArgumentException("samplingInterval = " + this.samplingInterval); } + boolean canUseAsyncProfiler = AsyncProfilerAccess.getInstance(platform).checkSupported(platform); boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null; - boolean canUseAsyncProfiler = this.useAsyncProfiler && - !onlyTicksOverMode && - !(this.ignoreSleeping || this.ignoreNative) && - AsyncProfilerAccess.getInstance(platform).checkSupported(platform); - if (this.mode == SamplerMode.ALLOCATION && (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform))) { - throw new UnsupportedOperationException("Allocation profiling is not supported on your system. Check the console for more info."); + if (this.mode == SamplerMode.ALLOCATION) { + if (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform)) { + throw new UnsupportedOperationException("Allocation profiling is not supported on your system. Check the console for more info."); + } + if (this.ignoreSleeping) { + platform.getPlugin().log(Level.WARNING, "Ignoring sleeping threads is not supported in allocation profiling mode. Sleeping threads will be included in the results."); + } + if (onlyTicksOverMode) { + platform.getPlugin().log(Level.WARNING, "'Only-ticks-over' is not supported in allocation profiling mode."); + } + } + + if (onlyTicksOverMode || this.forceJavaSampler) { + canUseAsyncProfiler = false; } int interval = (int) (this.mode == SamplerMode.EXECUTION ? @@ -132,7 +136,7 @@ public class SamplerBuilder { this.samplingInterval ); - SamplerSettings settings = new SamplerSettings(interval, this.threadDumper, this.threadGrouper.get(), this.autoEndTime, this.background); + SamplerSettings settings = new SamplerSettings(interval, this.threadDumper, this.threadGrouper.get(), this.autoEndTime, this.background, this.ignoreSleeping); Sampler sampler; if (this.mode == SamplerMode.ALLOCATION) { @@ -140,9 +144,9 @@ public class SamplerBuilder { } else if (canUseAsyncProfiler) { sampler = new AsyncSampler(platform, settings, new SampleCollector.Execution(interval)); } else if (onlyTicksOverMode) { - sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative, this.tickHook, this.ticksOver); + sampler = new JavaSampler(platform, settings, this.tickHook, this.ticksOver); } else { - sampler = new JavaSampler(platform, settings, this.ignoreSleeping, this.ignoreNative); + sampler = new JavaSampler(platform, settings); } sampler.start(); diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java index 6e55a43..dc28d2a 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/SamplerSettings.java @@ -30,13 +30,15 @@ public class SamplerSettings { private final ThreadGrouper threadGrouper; private final long autoEndTime; private final boolean runningInBackground; + private final boolean ignoreSleeping; - public SamplerSettings(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long autoEndTime, boolean runningInBackground) { + public SamplerSettings(int interval, ThreadDumper threadDumper, ThreadGrouper threadGrouper, long autoEndTime, boolean runningInBackground, boolean ignoreSleeping) { this.interval = interval; this.threadDumper = threadDumper; this.threadGrouper = threadGrouper; this.autoEndTime = autoEndTime; this.runningInBackground = runningInBackground; + this.ignoreSleeping = ignoreSleeping; } public int interval() { @@ -58,4 +60,8 @@ public class SamplerSettings { public boolean runningInBackground() { return this.runningInBackground; } + + public boolean ignoreSleeping() { + return this.ignoreSleeping; + } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java index 2c003e5..744ad41 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/aggregator/AbstractDataAggregator.java @@ -40,8 +40,12 @@ public abstract class AbstractDataAggregator implements DataAggregator { /** The instance used to group threads together */ protected final ThreadGrouper threadGrouper; - protected AbstractDataAggregator(ThreadGrouper threadGrouper) { + /** If sleeping threads should be ignored */ + protected final boolean ignoreSleeping; + + protected AbstractDataAggregator(ThreadGrouper threadGrouper, boolean ignoreSleeping) { this.threadGrouper = threadGrouper; + this.ignoreSleeping = ignoreSleeping; } protected ThreadNode getNode(String group) { @@ -65,4 +69,13 @@ public abstract class AbstractDataAggregator implements DataAggregator { } return data; } + + protected static boolean isSleeping(String clazz, String method) { + // java.lang.Thread.yield() + // jdk.internal.misc.Unsafe.park() + // sun.misc.Unsafe.park() + return (clazz.equals("java.lang.Thread") && method.equals("yield")) || + (clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) || + (clazz.equals("sun.misc.Unsafe") && method.equals("park")); + } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java index 484493a..0a72d99 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncDataAggregator.java @@ -35,8 +35,8 @@ public class AsyncDataAggregator extends AbstractDataAggregator { private static final StackTraceNode.Describer STACK_TRACE_DESCRIBER = (element, parent) -> new StackTraceNode.AsyncDescription(element.getClassName(), element.getMethodName(), element.getMethodDescription()); - protected AsyncDataAggregator(ThreadGrouper threadGrouper) { - super(threadGrouper); + protected AsyncDataAggregator(ThreadGrouper threadGrouper, boolean ignoreSleeping) { + super(threadGrouper, ignoreSleeping); } @Override @@ -48,6 +48,9 @@ public class AsyncDataAggregator extends AbstractDataAggregator { } public void insertData(ProfileSegment element, int window) { + if (this.ignoreSleeping && isSleeping(element)) { + return; + } try { ThreadNode node = getNode(this.threadGrouper.getGroup(element.getNativeThreadId(), element.getThreadName())); node.log(STACK_TRACE_DESCRIBER, element.getStackTrace(), element.getValue(), window); @@ -56,4 +59,24 @@ public class AsyncDataAggregator extends AbstractDataAggregator { } } + private static boolean isSleeping(ProfileSegment element) { + // thread states written by async-profiler: + // https://github.com/async-profiler/async-profiler/blob/116504c9f75721911b2f561e29eda065c224caf6/src/flightRecorder.cpp#L1017-L1023 + String threadState = element.getThreadState(); + if (threadState.equals("STATE_SLEEPING")) { + return true; + } + + // async-profiler includes native frames - let's check more than just the top frame + AsyncStackTraceElement[] stackTrace = element.getStackTrace(); + for (int i = 0; i < Math.min(3, stackTrace.length); i++) { + String clazz = stackTrace[i].getClassName(); + String method = stackTrace[i].getMethodName(); + if (isSleeping(clazz, method)) { + return true; + } + } + return false; + } + } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 3d17948..172e29e 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -69,7 +69,7 @@ public class AsyncSampler extends AbstractSampler { super(platform, settings); this.sampleCollector = collector; this.profilerAccess = AsyncProfilerAccess.getInstance(platform); - this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper()); + this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper(), settings.ignoreSleeping()); this.scheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() .setNameFormat("spark-async-sampler-worker-thread") diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java index f20c969..3cef7d3 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/ProfileSegment.java @@ -31,6 +31,8 @@ import java.nio.charset.StandardCharsets; */ public class ProfileSegment { + private static final String UNKNOWN_THREAD_STATE = ""; + /** The native thread id (does not correspond to Thread#getId) */ private final int nativeThreadId; /** The name of the thread */ @@ -39,12 +41,15 @@ public class ProfileSegment { private final AsyncStackTraceElement[] stackTrace; /** The time spent executing this segment in microseconds */ private final long value; + /** The state of the thread. {@value #UNKNOWN_THREAD_STATE} if state is unknown */ + private final String threadState; - public ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value) { + private ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value, String threadState) { this.nativeThreadId = nativeThreadId; this.threadName = threadName; this.stackTrace = stackTrace; this.value = value; + this.threadState = threadState; } public int getNativeThreadId() { @@ -63,6 +68,10 @@ public class ProfileSegment { return this.value; } + public String getThreadState() { + return this.threadState; + } + public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event sample, String threadName, long value) { JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId); int len = stackTrace != null ? stackTrace.methods.length : 0; @@ -71,8 +80,13 @@ public class ProfileSegment { for (int i = 0; i < len; i++) { stack[i] = parseStackFrame(reader, stackTrace.methods[i]); } + String threadState = UNKNOWN_THREAD_STATE; + if (sample instanceof JfrReader.ExecutionSample) { + JfrReader.ExecutionSample executionSample = (JfrReader.ExecutionSample) sample; + threadState = reader.threadStates.get(executionSample.threadState); + } - return new ProfileSegment(sample.tid, threadName, stack, value); + return new ProfileSegment(sample.tid, threadName, stack, value, threadState); } private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) { diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java index 5b6a470..b2f250f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java @@ -48,18 +48,10 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator { /** The interval to wait between sampling, in microseconds */ protected final int interval; - /** If sleeping threads should be ignored */ - private final boolean ignoreSleeping; - - /** If threads executing native code should be ignored */ - private final boolean ignoreNative; - - public JavaDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) { - super(threadGrouper); + public JavaDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping) { + super(threadGrouper, ignoreSleeping); this.workerPool = workerPool; this.interval = interval; - this.ignoreSleeping = ignoreSleeping; - this.ignoreNative = ignoreNative; } /** @@ -74,9 +66,6 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator { if (this.ignoreSleeping && isSleeping(threadInfo)) { return; } - if (this.ignoreNative && threadInfo.isInNative()) { - return; - } try { ThreadNode node = getNode(this.threadGrouper.getGroup(threadInfo.getThreadId(), threadInfo.getThreadName())); @@ -113,12 +102,7 @@ public abstract class JavaDataAggregator extends AbstractDataAggregator { String clazz = call.getClassName(); String method = call.getMethodName(); - // java.lang.Thread.yield() - // jdk.internal.misc.Unsafe.park() - // sun.misc.Unsafe.park() - return (clazz.equals("java.lang.Thread") && method.equals("yield")) || - (clazz.equals("jdk.internal.misc.Unsafe") && method.equals("park")) || - (clazz.equals("sun.misc.Unsafe") && method.equals("park")); + return isSleeping(clazz, method); } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index 050c5b4..df99ee0 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -73,14 +73,14 @@ public class JavaSampler extends AbstractSampler implements Runnable { /** The last window that was profiled */ private final AtomicInteger lastWindow = new AtomicInteger(); - public JavaSampler(SparkPlatform platform, SamplerSettings settings, boolean ignoreSleeping, boolean ignoreNative) { + public JavaSampler(SparkPlatform platform, SamplerSettings settings) { super(platform, settings); - this.dataAggregator = new SimpleDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), ignoreSleeping, ignoreNative); + this.dataAggregator = new SimpleJavaDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), settings.ignoreSleeping()); } - public JavaSampler(SparkPlatform platform, SamplerSettings settings, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) { + public JavaSampler(SparkPlatform platform, SamplerSettings settings, TickHook tickHook, int tickLengthThreshold) { super(platform, settings); - this.dataAggregator = new TickedDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), ignoreSleeping, ignoreNative, tickHook, tickLengthThreshold); + this.dataAggregator = new TickedJavaDataAggregator(this.workerPool, settings.threadGrouper(), settings.interval(), settings.ignoreSleeping(), tickHook, tickLengthThreshold); } @Override @@ -89,9 +89,9 @@ public class JavaSampler extends AbstractSampler implements Runnable { TickHook tickHook = this.platform.getTickHook(); if (tickHook != null) { - if (this.dataAggregator instanceof TickedDataAggregator) { + if (this.dataAggregator instanceof TickedJavaDataAggregator) { WindowStatisticsCollector.ExplicitTickCounter counter = this.windowStatisticsCollector.startCountingTicksExplicit(tickHook); - ((TickedDataAggregator) this.dataAggregator).setTickCounter(counter); + ((TickedJavaDataAggregator) this.dataAggregator).setTickCounter(counter); } else { this.windowStatisticsCollector.startCountingTicks(tickHook); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java deleted file mode 100644 index 54173fe..0000000 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleDataAggregator.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (c) lucko (Luck) - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package me.lucko.spark.common.sampler.java; - -import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.aggregator.DataAggregator; -import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; - -import java.lang.management.ThreadInfo; -import java.util.concurrent.ExecutorService; - -/** - * Basic implementation of {@link DataAggregator}. - */ -public class SimpleDataAggregator extends JavaDataAggregator { - public SimpleDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative) { - super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); - } - - @Override - public SamplerMetadata.DataAggregator getMetadata() { - return SamplerMetadata.DataAggregator.newBuilder() - .setType(SamplerMetadata.DataAggregator.Type.SIMPLE) - .setThreadGrouper(this.threadGrouper.asProto()) - .build(); - } - - @Override - public void insertData(ThreadInfo threadInfo, int window) { - writeData(threadInfo, window); - } - -} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleJavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleJavaDataAggregator.java new file mode 100644 index 0000000..461e34c --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/SimpleJavaDataAggregator.java @@ -0,0 +1,51 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package me.lucko.spark.common.sampler.java; + +import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.aggregator.DataAggregator; +import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; + +import java.lang.management.ThreadInfo; +import java.util.concurrent.ExecutorService; + +/** + * Basic implementation of {@link DataAggregator}. + */ +public class SimpleJavaDataAggregator extends JavaDataAggregator { + public SimpleJavaDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping) { + super(workerPool, threadGrouper, interval, ignoreSleeping); + } + + @Override + public SamplerMetadata.DataAggregator getMetadata() { + return SamplerMetadata.DataAggregator.newBuilder() + .setType(SamplerMetadata.DataAggregator.Type.SIMPLE) + .setThreadGrouper(this.threadGrouper.asProto()) + .build(); + } + + @Override + public void insertData(ThreadInfo threadInfo, int window) { + writeData(threadInfo, window); + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java deleted file mode 100644 index f24af3f..0000000 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedDataAggregator.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * This file is part of spark. - * - * Copyright (c) lucko (Luck) - * Copyright (c) contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package me.lucko.spark.common.sampler.java; - -import me.lucko.spark.common.sampler.ThreadGrouper; -import me.lucko.spark.common.sampler.aggregator.DataAggregator; -import me.lucko.spark.common.sampler.node.ThreadNode; -import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; -import me.lucko.spark.common.tick.TickHook; -import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; - -import java.lang.management.ThreadInfo; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks" - * which exceed a certain threshold in duration. - */ -public class TickedDataAggregator extends JavaDataAggregator { - - /** Used to monitor the current "tick" of the server */ - private final TickHook tickHook; - - /** Tick durations under this threshold will not be inserted, measured in microseconds */ - private final long tickLengthThreshold; - - /** The expected number of samples in each tick */ - private final int expectedSize; - - /** Counts the number of ticks aggregated */ - private WindowStatisticsCollector.ExplicitTickCounter tickCounter; - - // state - private int currentTick = -1; - private TickList currentData = null; - - // guards currentData - private final Object mutex = new Object(); - - public TickedDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, boolean ignoreNative, TickHook tickHook, int tickLengthThreshold) { - super(workerPool, threadGrouper, interval, ignoreSleeping, ignoreNative); - this.tickHook = tickHook; - this.tickLengthThreshold = TimeUnit.MILLISECONDS.toMicros(tickLengthThreshold); - // 50 millis in a tick, plus 10 so we have a bit of room to go over - double intervalMilliseconds = interval / 1000d; - this.expectedSize = (int) ((50 / intervalMilliseconds) + 10); - } - - public void setTickCounter(WindowStatisticsCollector.ExplicitTickCounter tickCounter) { - this.tickCounter = tickCounter; - } - - @Override - public SamplerMetadata.DataAggregator getMetadata() { - // push the current tick (so numberOfTicks is accurate) - synchronized (this.mutex) { - pushCurrentTick(Runnable::run); - this.currentData = null; - } - - return SamplerMetadata.DataAggregator.newBuilder() - .setType(SamplerMetadata.DataAggregator.Type.TICKED) - .setThreadGrouper(this.threadGrouper.asProto()) - .setTickLengthThreshold(this.tickLengthThreshold) - .setNumberOfIncludedTicks(this.tickCounter.getTotalCountedTicks()) - .build(); - } - - @Override - public void insertData(ThreadInfo threadInfo, int window) { - synchronized (this.mutex) { - int tick = this.tickHook.getCurrentTick(); - if (this.currentTick != tick || this.currentData == null) { - pushCurrentTick(this.workerPool); - this.currentTick = tick; - this.currentData = new TickList(this.expectedSize, window); - } - - this.currentData.addData(threadInfo); - } - } - - // guarded by 'mutex' - private void pushCurrentTick(Executor executor) { - TickList currentData = this.currentData; - if (currentData == null) { - return; - } - - // approximate how long the tick lasted - int tickLengthMicros = currentData.sizeWithoutTrailingSleeping() * this.interval; - - // don't push data below the threshold - if (tickLengthMicros < this.tickLengthThreshold) { - return; - } - - executor.execute(currentData); - this.tickCounter.increment(); - } - - @Override - public List exportData() { - // push the current tick - synchronized (this.mutex) { - pushCurrentTick(Runnable::run); - } - - return super.exportData(); - } - - private final class TickList implements Runnable { - private final List list; - private final int window; - - TickList(int expectedSize, int window) { - this.list = new ArrayList<>(expectedSize); - this.window = window; - } - - @Override - public void run() { - for (ThreadInfo data : this.list) { - writeData(data, this.window); - } - } - - public List getList() { - return this.list; - } - - public int sizeWithoutTrailingSleeping() { - // find the last index at which the thread wasn't sleeping - for (int i = this.list.size() - 1; i >= 0; i--) { - if (!isSleeping(this.list.get(i))) { - return i + 1; // add one to go from index to size - } - } - return 0; - } - - public void addData(ThreadInfo data) { - this.list.add(data); - } - } -} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedJavaDataAggregator.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedJavaDataAggregator.java new file mode 100644 index 0000000..c950648 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/TickedJavaDataAggregator.java @@ -0,0 +1,168 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package me.lucko.spark.common.sampler.java; + +import me.lucko.spark.common.sampler.ThreadGrouper; +import me.lucko.spark.common.sampler.aggregator.DataAggregator; +import me.lucko.spark.common.sampler.node.ThreadNode; +import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; +import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; + +import java.lang.management.ThreadInfo; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of {@link DataAggregator} which supports only including sampling data from "ticks" + * which exceed a certain threshold in duration. + */ +public class TickedJavaDataAggregator extends JavaDataAggregator { + + /** Used to monitor the current "tick" of the server */ + private final TickHook tickHook; + + /** Tick durations under this threshold will not be inserted, measured in microseconds */ + private final long tickLengthThreshold; + + /** The expected number of samples in each tick */ + private final int expectedSize; + + /** Counts the number of ticks aggregated */ + private WindowStatisticsCollector.ExplicitTickCounter tickCounter; + + // state + private int currentTick = -1; + private TickList currentData = null; + + // guards currentData + private final Object mutex = new Object(); + + public TickedJavaDataAggregator(ExecutorService workerPool, ThreadGrouper threadGrouper, int interval, boolean ignoreSleeping, TickHook tickHook, int tickLengthThreshold) { + super(workerPool, threadGrouper, interval, ignoreSleeping); + this.tickHook = tickHook; + this.tickLengthThreshold = TimeUnit.MILLISECONDS.toMicros(tickLengthThreshold); + // 50 millis in a tick, plus 10 so we have a bit of room to go over + double intervalMilliseconds = interval / 1000d; + this.expectedSize = (int) ((50 / intervalMilliseconds) + 10); + } + + public void setTickCounter(WindowStatisticsCollector.ExplicitTickCounter tickCounter) { + this.tickCounter = tickCounter; + } + + @Override + public SamplerMetadata.DataAggregator getMetadata() { + // push the current tick (so numberOfTicks is accurate) + synchronized (this.mutex) { + pushCurrentTick(Runnable::run); + this.currentData = null; + } + + return SamplerMetadata.DataAggregator.newBuilder() + .setType(SamplerMetadata.DataAggregator.Type.TICKED) + .setThreadGrouper(this.threadGrouper.asProto()) + .setTickLengthThreshold(this.tickLengthThreshold) + .setNumberOfIncludedTicks(this.tickCounter.getTotalCountedTicks()) + .build(); + } + + @Override + public void insertData(ThreadInfo threadInfo, int window) { + synchronized (this.mutex) { + int tick = this.tickHook.getCurrentTick(); + if (this.currentTick != tick || this.currentData == null) { + pushCurrentTick(this.workerPool); + this.currentTick = tick; + this.currentData = new TickList(this.expectedSize, window); + } + + this.currentData.addData(threadInfo); + } + } + + // guarded by 'mutex' + private void pushCurrentTick(Executor executor) { + TickList currentData = this.currentData; + if (currentData == null) { + return; + } + + // approximate how long the tick lasted + int tickLengthMicros = currentData.sizeWithoutTrailingSleeping() * this.interval; + + // don't push data below the threshold + if (tickLengthMicros < this.tickLengthThreshold) { + return; + } + + executor.execute(currentData); + this.tickCounter.increment(); + } + + @Override + public List exportData() { + // push the current tick + synchronized (this.mutex) { + pushCurrentTick(Runnable::run); + } + + return super.exportData(); + } + + private final class TickList implements Runnable { + private final List list; + private final int window; + + TickList(int expectedSize, int window) { + this.list = new ArrayList<>(expectedSize); + this.window = window; + } + + @Override + public void run() { + for (ThreadInfo data : this.list) { + writeData(data, this.window); + } + } + + public List getList() { + return this.list; + } + + public int sizeWithoutTrailingSleeping() { + // find the last index at which the thread wasn't sleeping + for (int i = this.list.size() - 1; i >= 0; i--) { + if (!isSleeping(this.list.get(i))) { + return i + 1; // add one to go from index to size + } + } + return 0; + } + + public void addData(ThreadInfo data) { + this.list.add(data); + } + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java index 0ef2eb3..d561cdf 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java @@ -26,6 +26,7 @@ import me.lucko.spark.common.monitor.cpu.CpuMonitor; import me.lucko.spark.common.monitor.tick.TickStatistics; import me.lucko.spark.common.platform.world.AsyncWorldInfoProvider; import me.lucko.spark.common.platform.world.WorldInfoProvider; +import me.lucko.spark.common.sampler.java.TickedJavaDataAggregator; import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.proto.SparkProtos; @@ -287,7 +288,7 @@ public class WindowStatisticsCollector { * Counts the number of ticks in a window according to the number of times * {@link #increment()} is called. * - * Used by the {@link me.lucko.spark.common.sampler.java.TickedDataAggregator}. + * Used by the {@link TickedJavaDataAggregator}. */ public static final class ExplicitTickCounter extends BaseTickCounter { private final AtomicInteger counted = new AtomicInteger(); -- cgit