aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me/lucko/spark/common/sampler
diff options
context:
space:
mode:
Diffstat (limited to 'spark-common/src/main/java/me/lucko/spark/common/sampler')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java53
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java76
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java32
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java31
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java8
5 files changed, 181 insertions, 19 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
index 5abe71f..d814002 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
@@ -32,9 +32,12 @@ import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.source.SourceMetadata;
import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;
import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
+import me.lucko.spark.common.ws.ViewerSocket;
+import me.lucko.spark.proto.SparkProtos;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -74,6 +77,9 @@ public abstract class AbstractSampler implements Sampler {
/** The garbage collector statistics when profiling started */
protected Map<String, GarbageCollectorStatistics> initialGcStats;
+ /** A set of viewer sockets linked to the sampler */
+ protected List<ViewerSocket> viewerSockets = new ArrayList<>();
+
protected AbstractSampler(SparkPlatform platform, SamplerSettings settings) {
this.platform = platform;
this.interval = settings.interval();
@@ -122,13 +128,54 @@ public abstract class AbstractSampler implements Sampler {
@Override
public void stop(boolean cancelled) {
this.windowStatisticsCollector.stop();
+ for (ViewerSocket viewerSocket : this.viewerSockets) {
+ viewerSocket.processSamplerStopped(this);
+ }
+ }
+
+ @Override
+ public void attachSocket(ViewerSocket socket) {
+ this.viewerSockets.add(socket);
+ }
+
+ @Override
+ public Collection<ViewerSocket> getAttachedSockets() {
+ return this.viewerSockets;
+ }
+
+ protected void processWindowRotate() {
+ this.viewerSockets.removeIf(socket -> {
+ if (!socket.isOpen()) {
+ return true;
+ }
+
+ socket.processWindowRotate(this);
+ return false;
+ });
+ }
+
+ protected void sendStatisticsToSocket() {
+ try {
+ if (this.viewerSockets.isEmpty()) {
+ return;
+ }
+
+ SparkProtos.PlatformStatistics platform = this.platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), false);
+ SparkProtos.SystemStatistics system = this.platform.getStatisticsProvider().getSystemStatistics();
+
+ for (ViewerSocket viewerSocket : this.viewerSockets) {
+ viewerSocket.sendUpdatedStatistics(platform, system);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
- protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) {
+ protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender.Data creator, String comment, DataAggregator dataAggregator) {
SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder()
.setSamplerMode(getMode().asProto())
.setPlatformMetadata(platform.getPlugin().getPlatformInfo().toData().toProto())
- .setCreator(creator.toData().toProto())
+ .setCreator(creator.toProto())
.setStartTime(this.startTime)
.setEndTime(System.currentTimeMillis())
.setInterval(this.interval)
@@ -145,7 +192,7 @@ public abstract class AbstractSampler implements Sampler {
}
try {
- metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats()));
+ metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), true));
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index aaf4f38..844ab0b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -24,9 +24,13 @@ import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
+import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
+import me.lucko.spark.proto.SparkSamplerProtos.SocketChannelInfo;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
/**
* Abstract superinterface for all sampler implementations.
@@ -44,6 +48,20 @@ public interface Sampler {
void stop(boolean cancelled);
/**
+ * Attaches a viewer socket to this sampler.
+ *
+ * @param socket the socket
+ */
+ void attachSocket(ViewerSocket socket);
+
+ /**
+ * Gets the sockets attached to this sampler.
+ *
+ * @return the attached sockets
+ */
+ Collection<ViewerSocket> getAttachedSockets();
+
+ /**
* Gets the time when the sampler started (unix timestamp in millis)
*
* @return the start time
@@ -79,6 +97,62 @@ public interface Sampler {
CompletableFuture<Sampler> getFuture();
// Methods used to export the sampler data to the web viewer.
- SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
+ SamplerData toProto(SparkPlatform platform, ExportProps exportProps);
+
+ final class ExportProps {
+ private CommandSender.Data creator;
+ private String comment;
+ private Supplier<MergeMode> mergeMode;
+ private Supplier<ClassSourceLookup> classSourceLookup;
+ private SocketChannelInfo channelInfo;
+
+ public ExportProps() {
+ }
+
+ public CommandSender.Data creator() {
+ return this.creator;
+ }
+
+ public String comment() {
+ return this.comment;
+ }
+
+ public Supplier<MergeMode> mergeMode() {
+ return this.mergeMode;
+ }
+
+ public Supplier<ClassSourceLookup> classSourceLookup() {
+ return this.classSourceLookup;
+ }
+
+ public SocketChannelInfo channelInfo() {
+ return this.channelInfo;
+ }
+
+ public ExportProps creator(CommandSender.Data creator) {
+ this.creator = creator;
+ return this;
+ }
+
+ public ExportProps comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public ExportProps mergeMode(Supplier<MergeMode> mergeMode) {
+ this.mergeMode = mergeMode;
+ return this;
+ }
+
+ public ExportProps classSourceLookup(Supplier<ClassSourceLookup> classSourceLookup) {
+ this.classSourceLookup = classSourceLookup;
+ return this;
+ }
+
+ public ExportProps channelInfo(SocketChannelInfo channelInfo) {
+ this.channelInfo = channelInfo;
+ return this;
+ }
+ }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index 2328582..ec88677 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -23,18 +23,17 @@ package me.lucko.spark.common.sampler.async;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
-import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
import me.lucko.spark.common.sampler.SamplerMode;
import me.lucko.spark.common.sampler.SamplerSettings;
-import me.lucko.spark.common.sampler.node.MergeMode;
-import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.IntPredicate;
@@ -61,6 +60,9 @@ public class AsyncSampler extends AbstractSampler {
/** The executor used for scheduling and management */
private ScheduledExecutorService scheduler;
+ /** The task to send statistics to the viewer socket */
+ private ScheduledFuture<?> socketStatisticsTask;
+
public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector) {
super(platform, settings);
this.sampleCollector = collector;
@@ -143,6 +145,8 @@ public class AsyncSampler extends AbstractSampler {
IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(window);
this.dataAggregator.pruneData(predicate);
this.windowStatisticsCollector.pruneStatistics(predicate);
+
+ this.scheduler.execute(this::processWindowRotate);
}
} catch (Throwable e) {
e.printStackTrace();
@@ -183,6 +187,10 @@ public class AsyncSampler extends AbstractSampler {
this.currentJob = null;
}
+ if (this.socketStatisticsTask != null) {
+ this.socketStatisticsTask.cancel(false);
+ }
+
if (this.scheduler != null) {
this.scheduler.shutdown();
this.scheduler = null;
@@ -190,15 +198,27 @@ public class AsyncSampler extends AbstractSampler {
}
@Override
+ public void attachSocket(ViewerSocket socket) {
+ super.attachSocket(socket);
+
+ if (this.socketStatisticsTask == null) {
+ this.socketStatisticsTask = this.scheduler.scheduleAtFixedRate(this::sendStatisticsToSocket, 10, 10, TimeUnit.SECONDS);
+ }
+ }
+
+ @Override
public SamplerMode getMode() {
return this.sampleCollector.getMode();
}
@Override
- public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ public SamplerData toProto(SparkPlatform platform, ExportProps exportProps) {
SamplerData.Builder proto = SamplerData.newBuilder();
- writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
- writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup);
+ if (exportProps.channelInfo() != null) {
+ proto.setChannelInfo(exportProps.channelInfo());
+ }
+ writeMetadataToProto(proto, platform, exportProps.creator(), exportProps.comment(), this.dataAggregator);
+ writeDataToProto(proto, this.dataAggregator, exportProps.mergeMode().get(), exportProps.classSourceLookup().get());
return proto.build();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
index d5c965f..e9d7e0d 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -23,15 +23,13 @@ package me.lucko.spark.common.sampler.java;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
-import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
import me.lucko.spark.common.sampler.SamplerMode;
import me.lucko.spark.common.sampler.SamplerSettings;
-import me.lucko.spark.common.sampler.node.MergeMode;
-import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import java.lang.management.ManagementFactory;
@@ -58,6 +56,9 @@ public class JavaSampler extends AbstractSampler implements Runnable {
/** The main sampling task */
private ScheduledFuture<?> task;
+ /** The task to send statistics to the viewer socket */
+ private ScheduledFuture<?> socketStatisticsTask;
+
/** The thread management interface for the current JVM */
private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
@@ -101,6 +102,10 @@ public class JavaSampler extends AbstractSampler implements Runnable {
this.task.cancel(false);
+ if (this.socketStatisticsTask != null) {
+ this.socketStatisticsTask.cancel(false);
+ }
+
if (!cancelled) {
// collect statistics for the final window
this.windowStatisticsCollector.measureNow(this.lastWindow.get());
@@ -129,6 +134,15 @@ public class JavaSampler extends AbstractSampler implements Runnable {
}
}
+ @Override
+ public void attachSocket(ViewerSocket socket) {
+ super.attachSocket(socket);
+
+ if (this.socketStatisticsTask == null) {
+ this.socketStatisticsTask = this.workerPool.scheduleAtFixedRate(this::sendStatisticsToSocket, 10, 10, TimeUnit.SECONDS);
+ }
+ }
+
private final class InsertDataTask implements Runnable {
private final ThreadInfo[] threadDumps;
private final int window;
@@ -161,15 +175,20 @@ public class JavaSampler extends AbstractSampler implements Runnable {
IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(this.window);
JavaSampler.this.dataAggregator.pruneData(predicate);
JavaSampler.this.windowStatisticsCollector.pruneStatistics(predicate);
+
+ JavaSampler.this.workerPool.execute(JavaSampler.this::processWindowRotate);
}
}
}
@Override
- public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ public SamplerData toProto(SparkPlatform platform, ExportProps exportProps) {
SamplerData.Builder proto = SamplerData.newBuilder();
- writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
- writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup);
+ if (exportProps.channelInfo() != null) {
+ proto.setChannelInfo(exportProps.channelInfo());
+ }
+ writeMetadataToProto(proto, platform, exportProps.creator(), exportProps.comment(), this.dataAggregator);
+ writeDataToProto(proto, this.dataAggregator, exportProps.mergeMode().get(), exportProps.classSourceLookup().get());
return proto.build();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
index 1c05b00..7acbd6b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
@@ -251,11 +251,13 @@ public class WindowStatisticsCollector {
if (this.startTick == -1) {
throw new IllegalStateException("start tick not recorded");
}
- if (this.stopTick == -1) {
- throw new IllegalStateException("stop tick not recorded");
+
+ int stopTick = this.stopTick;
+ if (stopTick == -1) {
+ stopTick = this.tickHook.getCurrentTick();
}
- return this.stopTick - this.startTick;
+ return stopTick - this.startTick;
}
}