diff options
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
5 files changed, 181 insertions, 19 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java index 5abe71f..d814002 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java @@ -32,9 +32,12 @@ import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.source.SourceMetadata; import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; +import me.lucko.spark.common.ws.ViewerSocket; +import me.lucko.spark.proto.SparkProtos; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.List; @@ -74,6 +77,9 @@ public abstract class AbstractSampler implements Sampler { /** The garbage collector statistics when profiling started */ protected Map<String, GarbageCollectorStatistics> initialGcStats; + /** A set of viewer sockets linked to the sampler */ + protected List<ViewerSocket> viewerSockets = new ArrayList<>(); + protected AbstractSampler(SparkPlatform platform, SamplerSettings settings) { this.platform = platform; this.interval = settings.interval(); @@ -122,13 +128,54 @@ public abstract class AbstractSampler implements Sampler { @Override public void stop(boolean cancelled) { this.windowStatisticsCollector.stop(); + for (ViewerSocket viewerSocket : this.viewerSockets) { + viewerSocket.processSamplerStopped(this); + } + } + + @Override + public void attachSocket(ViewerSocket socket) { + this.viewerSockets.add(socket); + } + + @Override + public Collection<ViewerSocket> getAttachedSockets() { + return this.viewerSockets; + } + + protected void processWindowRotate() { + this.viewerSockets.removeIf(socket -> { + if (!socket.isOpen()) { + return true; + } + + socket.processWindowRotate(this); + return false; + }); + } + + protected void sendStatisticsToSocket() { + try { + if (this.viewerSockets.isEmpty()) { + return; + } + + SparkProtos.PlatformStatistics platform = this.platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), false); + SparkProtos.SystemStatistics system = this.platform.getStatisticsProvider().getSystemStatistics(); + + for (ViewerSocket viewerSocket : this.viewerSockets) { + viewerSocket.sendUpdatedStatistics(platform, system); + } + } catch (Exception e) { + e.printStackTrace(); + } } - protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) { + protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender.Data creator, String comment, DataAggregator dataAggregator) { SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder() .setSamplerMode(getMode().asProto()) .setPlatformMetadata(platform.getPlugin().getPlatformInfo().toData().toProto()) - .setCreator(creator.toData().toProto()) + .setCreator(creator.toProto()) .setStartTime(this.startTime) .setEndTime(System.currentTimeMillis()) .setInterval(this.interval) @@ -145,7 +192,7 @@ public abstract class AbstractSampler implements Sampler { } try { - metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats())); + metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), true)); } catch (Exception e) { e.printStackTrace(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index aaf4f38..844ab0b 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -24,9 +24,13 @@ import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; +import me.lucko.spark.common.ws.ViewerSocket; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; +import me.lucko.spark.proto.SparkSamplerProtos.SocketChannelInfo; +import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; /** * Abstract superinterface for all sampler implementations. @@ -44,6 +48,20 @@ public interface Sampler { void stop(boolean cancelled); /** + * Attaches a viewer socket to this sampler. + * + * @param socket the socket + */ + void attachSocket(ViewerSocket socket); + + /** + * Gets the sockets attached to this sampler. + * + * @return the attached sockets + */ + Collection<ViewerSocket> getAttachedSockets(); + + /** * Gets the time when the sampler started (unix timestamp in millis) * * @return the start time @@ -79,6 +97,62 @@ public interface Sampler { CompletableFuture<Sampler> getFuture(); // Methods used to export the sampler data to the web viewer. - SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); + SamplerData toProto(SparkPlatform platform, ExportProps exportProps); + + final class ExportProps { + private CommandSender.Data creator; + private String comment; + private Supplier<MergeMode> mergeMode; + private Supplier<ClassSourceLookup> classSourceLookup; + private SocketChannelInfo channelInfo; + + public ExportProps() { + } + + public CommandSender.Data creator() { + return this.creator; + } + + public String comment() { + return this.comment; + } + + public Supplier<MergeMode> mergeMode() { + return this.mergeMode; + } + + public Supplier<ClassSourceLookup> classSourceLookup() { + return this.classSourceLookup; + } + + public SocketChannelInfo channelInfo() { + return this.channelInfo; + } + + public ExportProps creator(CommandSender.Data creator) { + this.creator = creator; + return this; + } + + public ExportProps comment(String comment) { + this.comment = comment; + return this; + } + + public ExportProps mergeMode(Supplier<MergeMode> mergeMode) { + this.mergeMode = mergeMode; + return this; + } + + public ExportProps classSourceLookup(Supplier<ClassSourceLookup> classSourceLookup) { + this.classSourceLookup = classSourceLookup; + return this; + } + + public ExportProps channelInfo(SocketChannelInfo channelInfo) { + this.channelInfo = channelInfo; + return this; + } + } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 2328582..ec88677 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -23,18 +23,17 @@ package me.lucko.spark.common.sampler.async; import com.google.common.util.concurrent.ThreadFactoryBuilder; import me.lucko.spark.common.SparkPlatform; -import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.AbstractSampler; import me.lucko.spark.common.sampler.SamplerMode; import me.lucko.spark.common.sampler.SamplerSettings; -import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.common.ws.ViewerSocket; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.IntPredicate; @@ -61,6 +60,9 @@ public class AsyncSampler extends AbstractSampler { /** The executor used for scheduling and management */ private ScheduledExecutorService scheduler; + /** The task to send statistics to the viewer socket */ + private ScheduledFuture<?> socketStatisticsTask; + public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector) { super(platform, settings); this.sampleCollector = collector; @@ -143,6 +145,8 @@ public class AsyncSampler extends AbstractSampler { IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(window); this.dataAggregator.pruneData(predicate); this.windowStatisticsCollector.pruneStatistics(predicate); + + this.scheduler.execute(this::processWindowRotate); } } catch (Throwable e) { e.printStackTrace(); @@ -183,6 +187,10 @@ public class AsyncSampler extends AbstractSampler { this.currentJob = null; } + if (this.socketStatisticsTask != null) { + this.socketStatisticsTask.cancel(false); + } + if (this.scheduler != null) { this.scheduler.shutdown(); this.scheduler = null; @@ -190,15 +198,27 @@ public class AsyncSampler extends AbstractSampler { } @Override + public void attachSocket(ViewerSocket socket) { + super.attachSocket(socket); + + if (this.socketStatisticsTask == null) { + this.socketStatisticsTask = this.scheduler.scheduleAtFixedRate(this::sendStatisticsToSocket, 10, 10, TimeUnit.SECONDS); + } + } + + @Override public SamplerMode getMode() { return this.sampleCollector.getMode(); } @Override - public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + public SamplerData toProto(SparkPlatform platform, ExportProps exportProps) { SamplerData.Builder proto = SamplerData.newBuilder(); - writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); - writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup); + if (exportProps.channelInfo() != null) { + proto.setChannelInfo(exportProps.channelInfo()); + } + writeMetadataToProto(proto, platform, exportProps.creator(), exportProps.comment(), this.dataAggregator); + writeDataToProto(proto, this.dataAggregator, exportProps.mergeMode().get(), exportProps.classSourceLookup().get()); return proto.build(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index d5c965f..e9d7e0d 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -23,15 +23,13 @@ package me.lucko.spark.common.sampler.java; import com.google.common.util.concurrent.ThreadFactoryBuilder; import me.lucko.spark.common.SparkPlatform; -import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.AbstractSampler; import me.lucko.spark.common.sampler.SamplerMode; import me.lucko.spark.common.sampler.SamplerSettings; -import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.common.ws.ViewerSocket; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import java.lang.management.ManagementFactory; @@ -58,6 +56,9 @@ public class JavaSampler extends AbstractSampler implements Runnable { /** The main sampling task */ private ScheduledFuture<?> task; + /** The task to send statistics to the viewer socket */ + private ScheduledFuture<?> socketStatisticsTask; + /** The thread management interface for the current JVM */ private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); @@ -101,6 +102,10 @@ public class JavaSampler extends AbstractSampler implements Runnable { this.task.cancel(false); + if (this.socketStatisticsTask != null) { + this.socketStatisticsTask.cancel(false); + } + if (!cancelled) { // collect statistics for the final window this.windowStatisticsCollector.measureNow(this.lastWindow.get()); @@ -129,6 +134,15 @@ public class JavaSampler extends AbstractSampler implements Runnable { } } + @Override + public void attachSocket(ViewerSocket socket) { + super.attachSocket(socket); + + if (this.socketStatisticsTask == null) { + this.socketStatisticsTask = this.workerPool.scheduleAtFixedRate(this::sendStatisticsToSocket, 10, 10, TimeUnit.SECONDS); + } + } + private final class InsertDataTask implements Runnable { private final ThreadInfo[] threadDumps; private final int window; @@ -161,15 +175,20 @@ public class JavaSampler extends AbstractSampler implements Runnable { IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(this.window); JavaSampler.this.dataAggregator.pruneData(predicate); JavaSampler.this.windowStatisticsCollector.pruneStatistics(predicate); + + JavaSampler.this.workerPool.execute(JavaSampler.this::processWindowRotate); } } } @Override - public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + public SamplerData toProto(SparkPlatform platform, ExportProps exportProps) { SamplerData.Builder proto = SamplerData.newBuilder(); - writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); - writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup); + if (exportProps.channelInfo() != null) { + proto.setChannelInfo(exportProps.channelInfo()); + } + writeMetadataToProto(proto, platform, exportProps.creator(), exportProps.comment(), this.dataAggregator); + writeDataToProto(proto, this.dataAggregator, exportProps.mergeMode().get(), exportProps.classSourceLookup().get()); return proto.build(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java index 1c05b00..7acbd6b 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java @@ -251,11 +251,13 @@ public class WindowStatisticsCollector { if (this.startTick == -1) { throw new IllegalStateException("start tick not recorded"); } - if (this.stopTick == -1) { - throw new IllegalStateException("stop tick not recorded"); + + int stopTick = this.stopTick; + if (stopTick == -1) { + stopTick = this.tickHook.getCurrentTick(); } - return this.stopTick - this.startTick; + return stopTick - this.startTick; } } |