aboutsummaryrefslogtreecommitdiff
path: root/spark-common/src/main/java/me
diff options
context:
space:
mode:
Diffstat (limited to 'spark-common/src/main/java/me')
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java18
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java4
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java121
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/heapdump/HeapDumpSummary.java2
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/platform/PlatformStatisticsProvider.java23
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java53
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java76
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java32
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java31
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java8
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/util/BytebinClient.java14
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/util/Configuration.java27
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/util/MediaTypes.java29
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClient.java118
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClientImpl.java40
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/ws/CryptoAlgorithm.java90
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/ws/TrustedKeyStore.java139
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocket.java255
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocketConnection.java218
19 files changed, 1247 insertions, 51 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java
index dae04ff..61c6062 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java
@@ -53,6 +53,8 @@ import me.lucko.spark.common.tick.TickReporter;
import me.lucko.spark.common.util.BytebinClient;
import me.lucko.spark.common.util.Configuration;
import me.lucko.spark.common.util.TemporaryFiles;
+import me.lucko.spark.common.util.ws.BytesocksClient;
+import me.lucko.spark.common.ws.TrustedKeyStore;
import net.kyori.adventure.text.Component;
import net.kyori.adventure.text.event.ClickEvent;
@@ -95,6 +97,8 @@ public class SparkPlatform {
private final Configuration configuration;
private final String viewerUrl;
private final BytebinClient bytebinClient;
+ private final BytesocksClient bytesocksClient;
+ private final TrustedKeyStore trustedKeyStore;
private final boolean disableResponseBroadcast;
private final List<CommandModule> commandModules;
private final List<Command> commands;
@@ -118,8 +122,12 @@ public class SparkPlatform {
this.configuration = new Configuration(this.plugin.getPluginDirectory().resolve("config.json"));
this.viewerUrl = this.configuration.getString("viewerUrl", "https://spark.lucko.me/");
- String bytebinUrl = this.configuration.getString("bytebinUrl", "https://bytebin.lucko.me/");
+ String bytebinUrl = this.configuration.getString("bytebinUrl", "https://spark-usercontent.lucko.me/");
+ String bytesocksHost = this.configuration.getString("bytesocksHost", "spark-usersockets.lucko.me");
+
this.bytebinClient = new BytebinClient(bytebinUrl, "spark-plugin");
+ this.bytesocksClient = BytesocksClient.create(bytesocksHost, "spark-plugin");
+ this.trustedKeyStore = new TrustedKeyStore(this.configuration);
this.disableResponseBroadcast = this.configuration.getBoolean("disableResponseBroadcast", false);
@@ -228,6 +236,14 @@ public class SparkPlatform {
return this.bytebinClient;
}
+ public BytesocksClient getBytesocksClient() {
+ return this.bytesocksClient;
+ }
+
+ public TrustedKeyStore getTrustedKeyStore() {
+ return this.trustedKeyStore;
+ }
+
public boolean shouldBroadcastResponse() {
return !this.disableResponseBroadcast;
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java
index 5bd62a8..6ac3b2f 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java
@@ -32,6 +32,7 @@ import me.lucko.spark.common.heapdump.HeapDump;
import me.lucko.spark.common.heapdump.HeapDumpSummary;
import me.lucko.spark.common.util.Compression;
import me.lucko.spark.common.util.FormatUtil;
+import me.lucko.spark.common.util.MediaTypes;
import me.lucko.spark.proto.SparkHeapProtos;
import net.kyori.adventure.text.event.ClickEvent;
@@ -52,7 +53,6 @@ import static net.kyori.adventure.text.format.NamedTextColor.GREEN;
import static net.kyori.adventure.text.format.NamedTextColor.RED;
public class HeapAnalysisModule implements CommandModule {
- private static final String SPARK_HEAP_MEDIA_TYPE = "application/x-spark-heap";
@Override
public void registerCommands(Consumer<Command> consumer) {
@@ -97,7 +97,7 @@ public class HeapAnalysisModule implements CommandModule {
saveToFile = true;
} else {
try {
- String key = platform.getBytebinClient().postContent(output, SPARK_HEAP_MEDIA_TYPE).key();
+ String key = platform.getBytebinClient().postContent(output, MediaTypes.SPARK_HEAP_MEDIA_TYPE).key();
String url = platform.getViewerUrl() + key;
resp.broadcastPrefixed(text("Heap dump summmary output:", GOLD));
diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
index 041cacf..049c817 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java
@@ -41,7 +41,10 @@ import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.tick.TickHook;
import me.lucko.spark.common.util.FormatUtil;
+import me.lucko.spark.common.util.MediaTypes;
import me.lucko.spark.common.util.MethodDisambiguator;
+import me.lucko.spark.common.util.ws.BytesocksClient;
+import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos;
import net.kyori.adventure.text.Component;
@@ -68,7 +71,6 @@ import static net.kyori.adventure.text.format.NamedTextColor.RED;
import static net.kyori.adventure.text.format.NamedTextColor.WHITE;
public class SamplerModule implements CommandModule {
- private static final String SPARK_SAMPLER_MEDIA_TYPE = "application/x-spark-sampler";
@Override
public void registerCommands(Consumer<Command> consumer) {
@@ -76,6 +78,7 @@ public class SamplerModule implements CommandModule {
.aliases("profiler", "sampler")
.allowSubCommand(true)
.argumentUsage("info", "", null)
+ .argumentUsage("open", "", null)
.argumentUsage("start", "timeout", "timeout seconds")
.argumentUsage("start", "thread *", null)
.argumentUsage("start", "thread", "thread name")
@@ -103,7 +106,7 @@ public class SamplerModule implements CommandModule {
}
return TabCompleter.create()
- .at(0, CompletionSupplier.startsWith(Arrays.asList("info", "start", "stop", "cancel")))
+ .at(0, CompletionSupplier.startsWith(Arrays.asList("info", "start", "open", "stop", "cancel")))
.from(1, CompletionSupplier.startsWith(opts))
.complete(arguments);
})
@@ -119,6 +122,16 @@ public class SamplerModule implements CommandModule {
return;
}
+ if (subCommand.equals("open") || arguments.boolFlag("open")) {
+ profilerOpen(platform, sender, resp, arguments);
+ return;
+ }
+
+ if (subCommand.equals("trust-viewer") || arguments.boolFlag("trust-viewer")) {
+ profilerTrustViewer(platform, sender, resp, arguments);
+ return;
+ }
+
if (subCommand.equals("cancel") || arguments.boolFlag("cancel")) {
profilerCancel(platform, resp);
return;
@@ -254,6 +267,8 @@ public class SamplerModule implements CommandModule {
resp.broadcastPrefixed(text("It will run in the background until it is stopped by an admin."));
resp.broadcastPrefixed(text("To stop the profiler and upload the results, run:"));
resp.broadcastPrefixed(cmdPrompt("/" + platform.getPlugin().getCommandName() + " profiler stop"));
+ resp.broadcastPrefixed(text("To view the profiler while it's running, run:"));
+ resp.broadcastPrefixed(cmdPrompt("/" + platform.getPlugin().getCommandName() + " profiler open"));
} else {
resp.broadcastPrefixed(text("The results will be automatically returned after the profiler has been running for " + FormatUtil.formatSeconds(timeoutSeconds) + "."));
}
@@ -273,13 +288,11 @@ public class SamplerModule implements CommandModule {
// await the result
if (timeoutSeconds != -1) {
- String comment = Iterables.getFirst(arguments.stringFlag("comment"), null);
- MethodDisambiguator methodDisambiguator = new MethodDisambiguator();
- MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator);
+ Sampler.ExportProps exportProps = getExportProps(platform, resp, arguments);
boolean saveToFile = arguments.boolFlag("save-to-file");
future.thenAcceptAsync(s -> {
resp.broadcastPrefixed(text("The active profiler has completed! Uploading results..."));
- handleUpload(platform, resp, s, comment, mergeMode, saveToFile);
+ handleUpload(platform, resp, s, exportProps, saveToFile);
});
}
}
@@ -306,6 +319,9 @@ public class SamplerModule implements CommandModule {
resp.replyPrefixed(text("So far, it has profiled for " + FormatUtil.formatSeconds(runningTime) + "."));
}
+ resp.replyPrefixed(text("To view the profiler while it's running, run:"));
+ resp.replyPrefixed(cmdPrompt("/" + platform.getPlugin().getCommandName() + " profiler open"));
+
long timeout = sampler.getAutoEndTime();
if (timeout == -1) {
resp.replyPrefixed(text("To stop the profiler and upload the results, run:"));
@@ -320,6 +336,48 @@ public class SamplerModule implements CommandModule {
}
}
+ private void profilerOpen(SparkPlatform platform, CommandSender sender, CommandResponseHandler resp, Arguments arguments) {
+ BytesocksClient bytesocksClient = platform.getBytesocksClient();
+ if (bytesocksClient == null) {
+ resp.replyPrefixed(text("The live viewer is only supported on Java 11 or newer.", RED));
+ return;
+ }
+
+ Sampler sampler = platform.getSamplerContainer().getActiveSampler();
+ if (sampler == null) {
+ resp.replyPrefixed(text("The profiler isn't running!"));
+ resp.replyPrefixed(text("To start a new one, run:"));
+ resp.replyPrefixed(cmdPrompt("/" + platform.getPlugin().getCommandName() + " profiler start"));
+ return;
+ }
+
+ Sampler.ExportProps exportProps = getExportProps(platform, resp, arguments);
+ handleOpen(platform, bytesocksClient, resp, sampler, exportProps);
+ }
+
+ private void profilerTrustViewer(SparkPlatform platform, CommandSender sender, CommandResponseHandler resp, Arguments arguments) {
+ Set<String> ids = arguments.stringFlag("id");
+ if (ids.isEmpty()) {
+ resp.replyPrefixed(text("Please provide a client id with '--id <client id>'."));
+ return;
+ }
+
+ for (String id : ids) {
+ boolean success = platform.getTrustedKeyStore().trustPendingKey(id);
+ if (success) {
+ Sampler sampler = platform.getSamplerContainer().getActiveSampler();
+ if (sampler != null) {
+ for (ViewerSocket socket : sampler.getAttachedSockets()) {
+ socket.sendClientTrustedMessage(id);
+ }
+ }
+ resp.replyPrefixed(text("Client connected to the viewer using id '" + id + "' is now trusted."));
+ } else {
+ resp.replyPrefixed(text("Unable to find pending client with id '" + id + "'."));
+ }
+ }
+ }
+
private void profilerCancel(SparkPlatform platform, CommandResponseHandler resp) {
Sampler sampler = platform.getSamplerContainer().getActiveSampler();
if (sampler == null) {
@@ -346,10 +404,8 @@ public class SamplerModule implements CommandModule {
resp.broadcastPrefixed(text("Stopping the profiler & uploading results, please wait..."));
}
- String comment = Iterables.getFirst(arguments.stringFlag("comment"), null);
- MethodDisambiguator methodDisambiguator = new MethodDisambiguator();
- MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator);
- handleUpload(platform, resp, sampler, comment, mergeMode, saveToFile);
+ Sampler.ExportProps exportProps = getExportProps(platform, resp, arguments);
+ handleUpload(platform, resp, sampler, exportProps, saveToFile);
// if the previous sampler was running in the background, create a new one
if (platform.getBackgroundSamplerManager().restartBackgroundSampler()) {
@@ -362,15 +418,15 @@ public class SamplerModule implements CommandModule {
}
}
- private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, String comment, MergeMode mergeMode, boolean saveToFileFlag) {
- SparkSamplerProtos.SamplerData output = sampler.toProto(platform, resp.sender(), comment, mergeMode, ClassSourceLookup.create(platform));
+ private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, Sampler.ExportProps exportProps, boolean saveToFileFlag) {
+ SparkSamplerProtos.SamplerData output = sampler.toProto(platform, exportProps);
boolean saveToFile = false;
if (saveToFileFlag) {
saveToFile = true;
} else {
try {
- String key = platform.getBytebinClient().postContent(output, SPARK_SAMPLER_MEDIA_TYPE).key();
+ String key = platform.getBytebinClient().postContent(output, MediaTypes.SPARK_SAMPLER_MEDIA_TYPE).key();
String url = platform.getViewerUrl() + key;
resp.broadcastPrefixed(text("Profiler stopped & upload complete!", GOLD));
@@ -406,6 +462,45 @@ public class SamplerModule implements CommandModule {
}
}
+ private void handleOpen(SparkPlatform platform, BytesocksClient bytesocksClient, CommandResponseHandler resp, Sampler sampler, Sampler.ExportProps exportProps) {
+ try {
+ ViewerSocket socket = new ViewerSocket(platform, bytesocksClient, exportProps);
+ sampler.attachSocket(socket);
+ exportProps.channelInfo(socket.getPayload());
+
+ SparkSamplerProtos.SamplerData data = sampler.toProto(platform, exportProps);
+
+ String key = platform.getBytebinClient().postContent(data, MediaTypes.SPARK_SAMPLER_MEDIA_TYPE, "live").key();
+ String url = platform.getViewerUrl() + key;
+
+ resp.broadcastPrefixed(text("Profiler live viewer:", GOLD));
+ resp.broadcast(text()
+ .content(url)
+ .color(GRAY)
+ .clickEvent(ClickEvent.openUrl(url))
+ .build()
+ );
+
+ platform.getActivityLog().addToLog(Activity.urlActivity(resp.sender(), System.currentTimeMillis(), "Profiler (live)", url));
+ } catch (Exception e) {
+ resp.replyPrefixed(text("An error occurred whilst opening the live profiler.", RED));
+ e.printStackTrace();
+ }
+ }
+
+ private Sampler.ExportProps getExportProps(SparkPlatform platform, CommandResponseHandler resp, Arguments arguments) {
+ return new Sampler.ExportProps()
+ .creator(resp.sender().toData())
+ .comment(Iterables.getFirst(arguments.stringFlag("comment"), null))
+ .mergeMode(() -> {
+ MethodDisambiguator methodDisambiguator = new MethodDisambiguator();
+ return arguments.boolFlag("separate-parent-calls")
+ ? MergeMode.separateParentCalls(methodDisambiguator)
+ : MergeMode.sameMethod(methodDisambiguator);
+ })
+ .classSourceLookup(() -> ClassSourceLookup.create(platform));
+ }
+
private static Component cmdPrompt(String cmd) {
return text()
.append(text(" "))
diff --git a/spark-common/src/main/java/me/lucko/spark/common/heapdump/HeapDumpSummary.java b/spark-common/src/main/java/me/lucko/spark/common/heapdump/HeapDumpSummary.java
index c0980e7..eaedd31 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/heapdump/HeapDumpSummary.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/heapdump/HeapDumpSummary.java
@@ -130,7 +130,7 @@ public final class HeapDumpSummary {
.setPlatformMetadata(platform.getPlugin().getPlatformInfo().toData().toProto())
.setCreator(creator.toData().toProto());
try {
- metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(null));
+ metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(null, true));
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/platform/PlatformStatisticsProvider.java b/spark-common/src/main/java/me/lucko/spark/common/platform/PlatformStatisticsProvider.java
index fc7e78a..059590d 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/platform/PlatformStatisticsProvider.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/platform/PlatformStatisticsProvider.java
@@ -128,7 +128,7 @@ public class PlatformStatisticsProvider {
return builder.build();
}
- public PlatformStatistics getPlatformStatistics(Map<String, GarbageCollectorStatistics> startingGcStatistics) {
+ public PlatformStatistics getPlatformStatistics(Map<String, GarbageCollectorStatistics> startingGcStatistics, boolean includeWorld) {
PlatformStatistics.Builder builder = PlatformStatistics.newBuilder();
MemoryUsage memoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
@@ -187,19 +187,20 @@ public class PlatformStatisticsProvider {
builder.setPlayerCount(playerCount);
}
- try {
- WorldStatisticsProvider worldStatisticsProvider = new WorldStatisticsProvider(
- new AsyncWorldInfoProvider(this.platform, this.platform.getPlugin().createWorldInfoProvider())
- );
- WorldStatistics worldStatistics = worldStatisticsProvider.getWorldStatistics();
- if (worldStatistics != null) {
- builder.setWorld(worldStatistics);
+ if (includeWorld) {
+ try {
+ WorldStatisticsProvider worldStatisticsProvider = new WorldStatisticsProvider(
+ new AsyncWorldInfoProvider(this.platform, this.platform.getPlugin().createWorldInfoProvider())
+ );
+ WorldStatistics worldStatistics = worldStatisticsProvider.getWorldStatistics();
+ if (worldStatistics != null) {
+ builder.setWorld(worldStatistics);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
}
- } catch (Exception e) {
- e.printStackTrace();
}
-
return builder.build();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
index 5abe71f..d814002 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java
@@ -32,9 +32,12 @@ import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.source.SourceMetadata;
import me.lucko.spark.common.sampler.window.ProtoTimeEncoder;
import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
+import me.lucko.spark.common.ws.ViewerSocket;
+import me.lucko.spark.proto.SparkProtos;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -74,6 +77,9 @@ public abstract class AbstractSampler implements Sampler {
/** The garbage collector statistics when profiling started */
protected Map<String, GarbageCollectorStatistics> initialGcStats;
+ /** A set of viewer sockets linked to the sampler */
+ protected List<ViewerSocket> viewerSockets = new ArrayList<>();
+
protected AbstractSampler(SparkPlatform platform, SamplerSettings settings) {
this.platform = platform;
this.interval = settings.interval();
@@ -122,13 +128,54 @@ public abstract class AbstractSampler implements Sampler {
@Override
public void stop(boolean cancelled) {
this.windowStatisticsCollector.stop();
+ for (ViewerSocket viewerSocket : this.viewerSockets) {
+ viewerSocket.processSamplerStopped(this);
+ }
+ }
+
+ @Override
+ public void attachSocket(ViewerSocket socket) {
+ this.viewerSockets.add(socket);
+ }
+
+ @Override
+ public Collection<ViewerSocket> getAttachedSockets() {
+ return this.viewerSockets;
+ }
+
+ protected void processWindowRotate() {
+ this.viewerSockets.removeIf(socket -> {
+ if (!socket.isOpen()) {
+ return true;
+ }
+
+ socket.processWindowRotate(this);
+ return false;
+ });
+ }
+
+ protected void sendStatisticsToSocket() {
+ try {
+ if (this.viewerSockets.isEmpty()) {
+ return;
+ }
+
+ SparkProtos.PlatformStatistics platform = this.platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), false);
+ SparkProtos.SystemStatistics system = this.platform.getStatisticsProvider().getSystemStatistics();
+
+ for (ViewerSocket viewerSocket : this.viewerSockets) {
+ viewerSocket.sendUpdatedStatistics(platform, system);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
- protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) {
+ protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender.Data creator, String comment, DataAggregator dataAggregator) {
SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder()
.setSamplerMode(getMode().asProto())
.setPlatformMetadata(platform.getPlugin().getPlatformInfo().toData().toProto())
- .setCreator(creator.toData().toProto())
+ .setCreator(creator.toProto())
.setStartTime(this.startTime)
.setEndTime(System.currentTimeMillis())
.setInterval(this.interval)
@@ -145,7 +192,7 @@ public abstract class AbstractSampler implements Sampler {
}
try {
- metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats()));
+ metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), true));
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
index aaf4f38..844ab0b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java
@@ -24,9 +24,13 @@ import me.lucko.spark.common.SparkPlatform;
import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.node.MergeMode;
import me.lucko.spark.common.sampler.source.ClassSourceLookup;
+import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
+import me.lucko.spark.proto.SparkSamplerProtos.SocketChannelInfo;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
/**
* Abstract superinterface for all sampler implementations.
@@ -44,6 +48,20 @@ public interface Sampler {
void stop(boolean cancelled);
/**
+ * Attaches a viewer socket to this sampler.
+ *
+ * @param socket the socket
+ */
+ void attachSocket(ViewerSocket socket);
+
+ /**
+ * Gets the sockets attached to this sampler.
+ *
+ * @return the attached sockets
+ */
+ Collection<ViewerSocket> getAttachedSockets();
+
+ /**
* Gets the time when the sampler started (unix timestamp in millis)
*
* @return the start time
@@ -79,6 +97,62 @@ public interface Sampler {
CompletableFuture<Sampler> getFuture();
// Methods used to export the sampler data to the web viewer.
- SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup);
+ SamplerData toProto(SparkPlatform platform, ExportProps exportProps);
+
+ final class ExportProps {
+ private CommandSender.Data creator;
+ private String comment;
+ private Supplier<MergeMode> mergeMode;
+ private Supplier<ClassSourceLookup> classSourceLookup;
+ private SocketChannelInfo channelInfo;
+
+ public ExportProps() {
+ }
+
+ public CommandSender.Data creator() {
+ return this.creator;
+ }
+
+ public String comment() {
+ return this.comment;
+ }
+
+ public Supplier<MergeMode> mergeMode() {
+ return this.mergeMode;
+ }
+
+ public Supplier<ClassSourceLookup> classSourceLookup() {
+ return this.classSourceLookup;
+ }
+
+ public SocketChannelInfo channelInfo() {
+ return this.channelInfo;
+ }
+
+ public ExportProps creator(CommandSender.Data creator) {
+ this.creator = creator;
+ return this;
+ }
+
+ public ExportProps comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public ExportProps mergeMode(Supplier<MergeMode> mergeMode) {
+ this.mergeMode = mergeMode;
+ return this;
+ }
+
+ public ExportProps classSourceLookup(Supplier<ClassSourceLookup> classSourceLookup) {
+ this.classSourceLookup = classSourceLookup;
+ return this;
+ }
+
+ public ExportProps channelInfo(SocketChannelInfo channelInfo) {
+ this.channelInfo = channelInfo;
+ return this;
+ }
+ }
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
index 2328582..ec88677 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java
@@ -23,18 +23,17 @@ package me.lucko.spark.common.sampler.async;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
-import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
import me.lucko.spark.common.sampler.SamplerMode;
import me.lucko.spark.common.sampler.SamplerSettings;
-import me.lucko.spark.common.sampler.node.MergeMode;
-import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.IntPredicate;
@@ -61,6 +60,9 @@ public class AsyncSampler extends AbstractSampler {
/** The executor used for scheduling and management */
private ScheduledExecutorService scheduler;
+ /** The task to send statistics to the viewer socket */
+ private ScheduledFuture<?> socketStatisticsTask;
+
public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector) {
super(platform, settings);
this.sampleCollector = collector;
@@ -143,6 +145,8 @@ public class AsyncSampler extends AbstractSampler {
IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(window);
this.dataAggregator.pruneData(predicate);
this.windowStatisticsCollector.pruneStatistics(predicate);
+
+ this.scheduler.execute(this::processWindowRotate);
}
} catch (Throwable e) {
e.printStackTrace();
@@ -183,6 +187,10 @@ public class AsyncSampler extends AbstractSampler {
this.currentJob = null;
}
+ if (this.socketStatisticsTask != null) {
+ this.socketStatisticsTask.cancel(false);
+ }
+
if (this.scheduler != null) {
this.scheduler.shutdown();
this.scheduler = null;
@@ -190,15 +198,27 @@ public class AsyncSampler extends AbstractSampler {
}
@Override
+ public void attachSocket(ViewerSocket socket) {
+ super.attachSocket(socket);
+
+ if (this.socketStatisticsTask == null) {
+ this.socketStatisticsTask = this.scheduler.scheduleAtFixedRate(this::sendStatisticsToSocket, 10, 10, TimeUnit.SECONDS);
+ }
+ }
+
+ @Override
public SamplerMode getMode() {
return this.sampleCollector.getMode();
}
@Override
- public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ public SamplerData toProto(SparkPlatform platform, ExportProps exportProps) {
SamplerData.Builder proto = SamplerData.newBuilder();
- writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
- writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup);
+ if (exportProps.channelInfo() != null) {
+ proto.setChannelInfo(exportProps.channelInfo());
+ }
+ writeMetadataToProto(proto, platform, exportProps.creator(), exportProps.comment(), this.dataAggregator);
+ writeDataToProto(proto, this.dataAggregator, exportProps.mergeMode().get(), exportProps.classSourceLookup().get());
return proto.build();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
index d5c965f..e9d7e0d 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java
@@ -23,15 +23,13 @@ package me.lucko.spark.common.sampler.java;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import me.lucko.spark.common.SparkPlatform;
-import me.lucko.spark.common.command.sender.CommandSender;
import me.lucko.spark.common.sampler.AbstractSampler;
import me.lucko.spark.common.sampler.SamplerMode;
import me.lucko.spark.common.sampler.SamplerSettings;
-import me.lucko.spark.common.sampler.node.MergeMode;
-import me.lucko.spark.common.sampler.source.ClassSourceLookup;
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
import me.lucko.spark.common.tick.TickHook;
+import me.lucko.spark.common.ws.ViewerSocket;
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
import java.lang.management.ManagementFactory;
@@ -58,6 +56,9 @@ public class JavaSampler extends AbstractSampler implements Runnable {
/** The main sampling task */
private ScheduledFuture<?> task;
+ /** The task to send statistics to the viewer socket */
+ private ScheduledFuture<?> socketStatisticsTask;
+
/** The thread management interface for the current JVM */
private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
@@ -101,6 +102,10 @@ public class JavaSampler extends AbstractSampler implements Runnable {
this.task.cancel(false);
+ if (this.socketStatisticsTask != null) {
+ this.socketStatisticsTask.cancel(false);
+ }
+
if (!cancelled) {
// collect statistics for the final window
this.windowStatisticsCollector.measureNow(this.lastWindow.get());
@@ -129,6 +134,15 @@ public class JavaSampler extends AbstractSampler implements Runnable {
}
}
+ @Override
+ public void attachSocket(ViewerSocket socket) {
+ super.attachSocket(socket);
+
+ if (this.socketStatisticsTask == null) {
+ this.socketStatisticsTask = this.workerPool.scheduleAtFixedRate(this::sendStatisticsToSocket, 10, 10, TimeUnit.SECONDS);
+ }
+ }
+
private final class InsertDataTask implements Runnable {
private final ThreadInfo[] threadDumps;
private final int window;
@@ -161,15 +175,20 @@ public class JavaSampler extends AbstractSampler implements Runnable {
IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(this.window);
JavaSampler.this.dataAggregator.pruneData(predicate);
JavaSampler.this.windowStatisticsCollector.pruneStatistics(predicate);
+
+ JavaSampler.this.workerPool.execute(JavaSampler.this::processWindowRotate);
}
}
}
@Override
- public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) {
+ public SamplerData toProto(SparkPlatform platform, ExportProps exportProps) {
SamplerData.Builder proto = SamplerData.newBuilder();
- writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator);
- writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup);
+ if (exportProps.channelInfo() != null) {
+ proto.setChannelInfo(exportProps.channelInfo());
+ }
+ writeMetadataToProto(proto, platform, exportProps.creator(), exportProps.comment(), this.dataAggregator);
+ writeDataToProto(proto, this.dataAggregator, exportProps.mergeMode().get(), exportProps.classSourceLookup().get());
return proto.build();
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
index 1c05b00..7acbd6b 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java
@@ -251,11 +251,13 @@ public class WindowStatisticsCollector {
if (this.startTick == -1) {
throw new IllegalStateException("start tick not recorded");
}
- if (this.stopTick == -1) {
- throw new IllegalStateException("stop tick not recorded");
+
+ int stopTick = this.stopTick;
+ if (stopTick == -1) {
+ stopTick = this.tickHook.getCurrentTick();
}
- return this.stopTick - this.startTick;
+ return stopTick - this.startTick;
}
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/BytebinClient.java b/spark-common/src/main/java/me/lucko/spark/common/util/BytebinClient.java
index e69b94e..8f11edc 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/util/BytebinClient.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/util/BytebinClient.java
@@ -32,6 +32,8 @@ import java.util.zip.GZIPOutputStream;
/**
* Utility for posting content to bytebin.
+ *
+ * @see <a href="https://github.com/lucko/bytebin">https://github.com/lucko/bytebin</a>
*/
public class BytebinClient {
@@ -45,7 +47,7 @@ public class BytebinClient {
this.userAgent = userAgent;
}
- private Content postContent(String contentType, Consumer<OutputStream> consumer) throws IOException {
+ private Content postContent(String contentType, Consumer<OutputStream> consumer, String userAgent) throws IOException {
URL url = new URL(this.url + "post");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
try {
@@ -55,7 +57,7 @@ public class BytebinClient {
connection.setDoOutput(true);
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", contentType);
- connection.setRequestProperty("User-Agent", this.userAgent);
+ connection.setRequestProperty("User-Agent", userAgent);
connection.setRequestProperty("Content-Encoding", "gzip");
connection.connect();
@@ -74,14 +76,18 @@ public class BytebinClient {
}
}
- public Content postContent(AbstractMessageLite<?, ?> proto, String contentType) throws IOException {
+ public Content postContent(AbstractMessageLite<?, ?> proto, String contentType, String userAgentExtra) throws IOException {
return postContent(contentType, outputStream -> {
try (OutputStream out = new GZIPOutputStream(outputStream)) {
proto.writeTo(out);
} catch (IOException e) {
throw new RuntimeException(e);
}
- });
+ }, this.userAgent + "/" + userAgentExtra);
+ }
+
+ public Content postContent(AbstractMessageLite<?, ?> proto, String contentType) throws IOException {
+ return postContent(proto, contentType, this.userAgent);
}
public static final class Content {
diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/Configuration.java b/spark-common/src/main/java/me/lucko/spark/common/util/Configuration.java
index 32f3bc6..d19ba64 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/util/Configuration.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/util/Configuration.java
@@ -22,6 +22,7 @@ package me.lucko.spark.common.util;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
@@ -32,6 +33,9 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
public final class Configuration {
private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();
@@ -103,6 +107,21 @@ public final class Configuration {
return val.isBoolean() ? val.getAsInt() : def;
}
+ public List<String> getStringList(String path) {
+ JsonElement el = this.root.get(path);
+ if (el == null || !el.isJsonArray()) {
+ return Collections.emptyList();
+ }
+
+ List<String> list = new ArrayList<>();
+ for (JsonElement child : el.getAsJsonArray()) {
+ if (child.isJsonPrimitive()) {
+ list.add(child.getAsJsonPrimitive().getAsString());
+ }
+ }
+ return list;
+ }
+
public void setString(String path, String value) {
this.root.add(path, new JsonPrimitive(value));
}
@@ -115,6 +134,14 @@ public final class Configuration {
this.root.add(path, new JsonPrimitive(value));
}
+ public void setStringList(String path, List<String> value) {
+ JsonArray array = new JsonArray();
+ for (String str : value) {
+ array.add(str);
+ }
+ this.root.add(path, array);
+ }
+
public boolean contains(String path) {
return this.root.has(path);
}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/MediaTypes.java b/spark-common/src/main/java/me/lucko/spark/common/util/MediaTypes.java
new file mode 100644
index 0000000..2c49540
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/util/MediaTypes.java
@@ -0,0 +1,29 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.util;
+
+public enum MediaTypes {
+ ;
+
+ public static final String SPARK_SAMPLER_MEDIA_TYPE = "application/x-spark-sampler";
+ public static final String SPARK_HEAP_MEDIA_TYPE = "application/x-spark-heap";
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClient.java b/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClient.java
new file mode 100644
index 0000000..1db7a67
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClient.java
@@ -0,0 +1,118 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.util.ws;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A client that can interact with bytesocks.
+ *
+ * @see <a href="https://github.com/lucko/bytesocks">https://github.com/lucko/bytesocks</a>
+ */
+public interface BytesocksClient {
+
+ /**
+ * Creates a new {@link BytesocksClient}.
+ *
+ * <p>Returns {@code null} on Java versions before 11.</p>
+ *
+ * @param host the host
+ * @param userAgent the user agent
+ * @return the client
+ */
+ static BytesocksClient create(String host, String userAgent) {
+ try {
+ return new BytesocksClientImpl(host, userAgent);
+ } catch (UnsupportedOperationException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Creates a new bytesocks channel and returns a socket connected to it.
+ *
+ * @param listener the listener
+ * @return the socket
+ * @throws Exception if something goes wrong
+ */
+ Socket createAndConnect(Listener listener) throws Exception;
+
+ /**
+ * Connects to an existing bytesocks channel.
+ *
+ * @param channelId the channel id
+ * @param listener the listener
+ * @return the socket
+ * @throws Exception if something goes wrong
+ */
+ Socket connect(String channelId, Listener listener) throws Exception;
+
+ /**
+ * A socket connected to a bytesocks channel.
+ */
+ interface Socket {
+
+ /**
+ * Gets the id of the connected channel.
+ *
+ * @return the id of the channel
+ */
+ String getChannelId();
+
+ /**
+ * Gets if the socket is open.
+ *
+ * @return true if the socket is open
+ */
+ boolean isOpen();
+
+ /**
+ * Sends a message to the channel using the socket.
+ *
+ * @param msg the message to send
+ * @return a future to encapsulate the progress of sending the message
+ */
+ CompletableFuture<?> send(CharSequence msg);
+
+ /**
+ * Closes the socket.
+ *
+ * @param statusCode the status code
+ * @param reason the reason
+ */
+ void close(int statusCode, String reason);
+ }
+
+ /**
+ * Socket listener
+ */
+ interface Listener {
+
+ default void onOpen() {}
+
+ default void onError(Throwable error) {}
+
+ default void onText(CharSequence data) {}
+
+ default void onClose(int statusCode, String reason) {}
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClientImpl.java b/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClientImpl.java
new file mode 100644
index 0000000..cf1489c
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClientImpl.java
@@ -0,0 +1,40 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.util.ws;
+
+// Overridden by java 11 source set
+
+class BytesocksClientImpl implements BytesocksClient {
+
+ BytesocksClientImpl(String host, String userAgent) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Socket createAndConnect(Listener listener) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Socket connect(String channelId, Listener listener) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/ws/CryptoAlgorithm.java b/spark-common/src/main/java/me/lucko/spark/common/ws/CryptoAlgorithm.java
new file mode 100644
index 0000000..f6cf1db
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/ws/CryptoAlgorithm.java
@@ -0,0 +1,90 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.ws;
+
+import com.google.protobuf.ByteString;
+
+import java.security.KeyFactory;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.PublicKey;
+import java.security.Signature;
+import java.security.spec.X509EncodedKeySpec;
+
+/**
+ * An algorithm for keypair/signature cryptography.
+ */
+public enum CryptoAlgorithm {
+
+ Ed25519("Ed25519", 255, "Ed25519"),
+ RSA2048("RSA", 2048, "SHA256withRSA");
+
+ private final String keyAlgorithm;
+ private final int keySize;
+ private final String signatureAlgorithm;
+
+ CryptoAlgorithm(String keyAlgorithm, int keySize, String signatureAlgorithm) {
+ this.keyAlgorithm = keyAlgorithm;
+ this.keySize = keySize;
+ this.signatureAlgorithm = signatureAlgorithm;
+ }
+
+ public KeyPairGenerator createKeyPairGenerator() throws NoSuchAlgorithmException {
+ return KeyPairGenerator.getInstance(this.keyAlgorithm);
+ }
+
+ public KeyFactory createKeyFactory() throws NoSuchAlgorithmException {
+ return KeyFactory.getInstance(this.keyAlgorithm);
+ }
+
+ public Signature createSignature() throws NoSuchAlgorithmException {
+ return Signature.getInstance(this.signatureAlgorithm);
+ }
+
+ public KeyPair generateKeyPair() {
+ try {
+ KeyPairGenerator generator = createKeyPairGenerator();
+ generator.initialize(this.keySize);
+ return generator.generateKeyPair();
+ } catch (Exception e) {
+ throw new RuntimeException("Exception generating keypair", e);
+ }
+ }
+
+ public PublicKey decodePublicKey(byte[] bytes) throws IllegalArgumentException {
+ try {
+ X509EncodedKeySpec spec = new X509EncodedKeySpec(bytes);
+ KeyFactory factory = createKeyFactory();
+ return factory.generatePublic(spec);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Exception parsing public key", e);
+ }
+ }
+
+ public PublicKey decodePublicKey(ByteString bytes) throws IllegalArgumentException {
+ if (bytes == null) {
+ return null;
+ }
+ return decodePublicKey(bytes.toByteArray());
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/ws/TrustedKeyStore.java b/spark-common/src/main/java/me/lucko/spark/common/ws/TrustedKeyStore.java
new file mode 100644
index 0000000..1605a38
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/ws/TrustedKeyStore.java
@@ -0,0 +1,139 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.ws;
+
+import me.lucko.spark.common.util.Configuration;
+
+import java.security.KeyPair;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * A store of trusted public keys.
+ */
+public class TrustedKeyStore {
+ private static final String TRUSTED_KEYS_OPTION = "trustedKeys";
+
+ /** The spark configuration */
+ private final Configuration configuration;
+ /** Gets the local public/private key */
+ private final CompletableFuture<KeyPair> localKeyPair;
+ /** A set of remote public keys to trust */
+ private final Set<PublicKey> remoteTrustedKeys;
+ /** A mpa of pending remote public keys */
+ private final Map<String, PublicKey> remotePendingKeys = new HashMap<>();
+
+ public TrustedKeyStore(Configuration configuration) {
+ this.configuration = configuration;
+ this.localKeyPair = CompletableFuture.supplyAsync(ViewerSocketConnection.CRYPTO::generateKeyPair);
+ this.remoteTrustedKeys = new HashSet<>();
+ readTrustedKeys();
+ }
+
+ /**
+ * Gets the local public key.
+ *
+ * @return the local public key
+ */
+ public PublicKey getLocalPublicKey() {
+ return this.localKeyPair.join().getPublic();
+ }
+
+ /**
+ * Gets the local private key.
+ *
+ * @return the local private key
+ */
+ public PrivateKey getLocalPrivateKey() {
+ return this.localKeyPair.join().getPrivate();
+ }
+
+ /**
+ * Checks if a remote public key is trusted
+ *
+ * @param publicKey the public key
+ * @return if the key is trusted
+ */
+ public boolean isKeyTrusted(PublicKey publicKey) {
+ return publicKey != null && this.remoteTrustedKeys.contains(publicKey);
+ }
+
+ /**
+ * Adds a pending public key to be trusted in the future.
+ *
+ * @param clientId the client id submitting the key
+ * @param publicKey the public key
+ */
+ public void addPendingKey(String clientId, PublicKey publicKey) {
+ this.remotePendingKeys.put(clientId, publicKey);
+ }
+
+ /**
+ * Trusts a previously submitted remote public key
+ *
+ * @param clientId the id of the client that submitted the key
+ * @return true if the key was found and trusted
+ */
+ public boolean trustPendingKey(String clientId) {
+ PublicKey key = this.remotePendingKeys.remove(clientId);
+ if (key == null) {
+ return false;
+ }
+
+ this.remoteTrustedKeys.add(key);
+ writeTrustedKeys();
+ return true;
+ }
+
+ /**
+ * Reads trusted keys from the configuration
+ */
+ private void readTrustedKeys() {
+ for (String encodedKey : this.configuration.getStringList(TRUSTED_KEYS_OPTION)) {
+ try {
+ PublicKey publicKey = ViewerSocketConnection.CRYPTO.decodePublicKey(Base64.getDecoder().decode(encodedKey));
+ this.remoteTrustedKeys.add(publicKey);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * Writes trusted keys to the configuration
+ */
+ private void writeTrustedKeys() {
+ List<String> encodedKeys = this.remoteTrustedKeys.stream()
+ .map(key -> Base64.getEncoder().encodeToString(key.getEncoded()))
+ .collect(Collectors.toList());
+
+ this.configuration.setStringList(TRUSTED_KEYS_OPTION, encodedKeys);
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocket.java b/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocket.java
new file mode 100644
index 0000000..5c7e08c
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocket.java
@@ -0,0 +1,255 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.ws;
+
+import com.google.protobuf.ByteString;
+
+import me.lucko.spark.common.SparkPlatform;
+import me.lucko.spark.common.sampler.AbstractSampler;
+import me.lucko.spark.common.sampler.Sampler;
+import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
+import me.lucko.spark.common.util.MediaTypes;
+import me.lucko.spark.common.util.ws.BytesocksClient;
+import me.lucko.spark.proto.SparkProtos;
+import me.lucko.spark.proto.SparkSamplerProtos;
+import me.lucko.spark.proto.SparkWebSocketProtos.ClientConnect;
+import me.lucko.spark.proto.SparkWebSocketProtos.ClientPing;
+import me.lucko.spark.proto.SparkWebSocketProtos.PacketWrapper;
+import me.lucko.spark.proto.SparkWebSocketProtos.ServerConnectResponse;
+import me.lucko.spark.proto.SparkWebSocketProtos.ServerPong;
+import me.lucko.spark.proto.SparkWebSocketProtos.ServerUpdateSamplerData;
+import me.lucko.spark.proto.SparkWebSocketProtos.ServerUpdateStatistics;
+
+import java.security.PublicKey;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+
+/**
+ * Represents a connection with the spark viewer.
+ */
+public class ViewerSocket implements ViewerSocketConnection.Listener, AutoCloseable {
+
+ /** Allow 60 seconds for the first client to connect */
+ private static final long SOCKET_INITIAL_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
+
+ /** Once established, expect a ping at least once every 30 seconds */
+ private static final long SOCKET_ESTABLISHED_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
+
+ /** The spark platform */
+ private final SparkPlatform platform;
+ /** The export props to use when exporting the sampler data */
+ private final Sampler.ExportProps exportProps;
+ /** The underlying connection */
+ private final ViewerSocketConnection socket;
+
+ private boolean closed = false;
+ private final long socketOpenTime = System.currentTimeMillis();
+ private long lastPing = 0;
+ private String lastPayloadId = null;
+
+ public ViewerSocket(SparkPlatform platform, BytesocksClient client, Sampler.ExportProps exportProps) throws Exception {
+ this.platform = platform;
+ this.exportProps = exportProps;
+ this.socket = new ViewerSocketConnection(platform, client, this);
+ }
+
+ private void log(String message) {
+ this.platform.getPlugin().log(Level.INFO, "[Viewer - " + this.socket.getChannelId() + "] " + message);
+ }
+
+ /**
+ * Gets the initial payload to send to the viewer.
+ *
+ * @return the payload
+ */
+ public SparkSamplerProtos.SocketChannelInfo getPayload() {
+ return SparkSamplerProtos.SocketChannelInfo.newBuilder()
+ .setChannelId(this.socket.getChannelId())
+ .setPublicKey(ByteString.copyFrom(this.platform.getTrustedKeyStore().getLocalPublicKey().getEncoded()))
+ .build();
+ }
+
+ public boolean isOpen() {
+ return !this.closed && this.socket.isOpen();
+ }
+
+ /**
+ * Called each time the sampler rotates to a new window.
+ *
+ * @param sampler the sampler
+ */
+ public void processWindowRotate(AbstractSampler sampler) {
+ if (this.closed) {
+ return;
+ }
+
+ long time = System.currentTimeMillis();
+ if ((time - this.socketOpenTime) > SOCKET_INITIAL_TIMEOUT && (time - this.lastPing) > SOCKET_ESTABLISHED_TIMEOUT) {
+ log("No clients have pinged for 30s, closing socket");
+ close();
+ return;
+ }
+
+ // no clients connected yet!
+ if (this.lastPing == 0) {
+ return;
+ }
+
+ try {
+ SparkSamplerProtos.SamplerData samplerData = sampler.toProto(this.platform, this.exportProps);
+ String key = this.platform.getBytebinClient().postContent(samplerData, MediaTypes.SPARK_SAMPLER_MEDIA_TYPE, "live").key();
+ sendUpdatedSamplerData(key);
+ } catch (Exception e) {
+ this.platform.getPlugin().log(Level.WARNING, "Error whilst sending updated sampler data to the socket");
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Called when the sampler stops.
+ *
+ * @param sampler the sampler
+ */
+ public void processSamplerStopped(AbstractSampler sampler) {
+ if (this.closed) {
+ return;
+ }
+
+ close();
+ }
+
+ @Override
+ public void close() {
+ this.socket.sendPacket(builder -> builder.setServerPong(ServerPong.newBuilder()
+ .setOk(false)
+ .build()
+ ));
+ this.socket.close();
+ this.closed = true;
+ }
+
+ @Override
+ public boolean isKeyTrusted(PublicKey publicKey) {
+ return this.platform.getTrustedKeyStore().isKeyTrusted(publicKey);
+ }
+
+ /**
+ * Sends a message to the socket to say that the given client is now trusted.
+ *
+ * @param clientId the client id
+ */
+ public void sendClientTrustedMessage(String clientId) {
+ this.socket.sendPacket(builder -> builder.setServerConnectResponse(ServerConnectResponse.newBuilder()
+ .setClientId(clientId)
+ .setState(ServerConnectResponse.State.ACCEPTED)
+ .build()
+ ));
+ }
+
+ /**
+ * Sends a message to the socket to indicate that updated sampler data is available
+ *
+ * @param payloadId the payload id of the updated data
+ */
+ public void sendUpdatedSamplerData(String payloadId) {
+ this.socket.sendPacket(builder -> builder.setServerUpdateSampler(ServerUpdateSamplerData.newBuilder()
+ .setPayloadId(payloadId)
+ .build()
+ ));
+ this.lastPayloadId = payloadId;
+ }
+
+ /**
+ * Sends a message to the socket with updated statistics
+ *
+ * @param platform the platform statistics
+ * @param system the system statistics
+ */
+ public void sendUpdatedStatistics(SparkProtos.PlatformStatistics platform, SparkProtos.SystemStatistics system) {
+ this.socket.sendPacket(builder -> builder.setServerUpdateStatistics(ServerUpdateStatistics.newBuilder()
+ .setPlatform(platform)
+ .setSystem(system)
+ .build()
+ ));
+ }
+
+ @Override
+ public void onPacket(PacketWrapper packet, boolean verified, PublicKey publicKey) throws Exception {
+ switch (packet.getPacketCase()) {
+ case CLIENT_PING:
+ onClientPing(packet.getClientPing(), publicKey);
+ break;
+ case CLIENT_CONNECT:
+ onClientConnect(packet.getClientConnect(), verified, publicKey);
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected packet: " + packet.getPacketCase());
+ }
+ }
+
+ private void onClientPing(ClientPing packet, PublicKey publicKey) {
+ this.lastPing = System.currentTimeMillis();
+ this.socket.sendPacket(builder -> builder.setServerPong(ServerPong.newBuilder()
+ .setOk(!this.closed)
+ .setData(packet.getData())
+ .build()
+ ));
+ }
+
+ private void onClientConnect(ClientConnect packet, boolean verified, PublicKey publicKey) {
+ if (publicKey == null) {
+ throw new IllegalStateException("Missing public key");
+ }
+
+ this.lastPing = System.currentTimeMillis();
+
+ String clientId = packet.getClientId();
+ log("Client connected: clientId=" + clientId + ", keyhash=" + hashPublicKey(publicKey) + ", desc=" + packet.getDescription());
+
+ ServerConnectResponse.Builder resp = ServerConnectResponse.newBuilder()
+ .setClientId(clientId)
+ .setSettings(ServerConnectResponse.Settings.newBuilder()
+ .setSamplerInterval(ProfilingWindowUtils.WINDOW_SIZE_SECONDS)
+ .setStatisticsInterval(10)
+ .build()
+ );
+
+ if (this.lastPayloadId != null) {
+ resp.setLastPayloadId(this.lastPayloadId);
+ }
+
+ if (this.closed) {
+ resp.setState(ServerConnectResponse.State.REJECTED);
+ } else if (verified) {
+ resp.setState(ServerConnectResponse.State.ACCEPTED);
+ } else {
+ resp.setState(ServerConnectResponse.State.UNTRUSTED);
+ this.platform.getTrustedKeyStore().addPendingKey(clientId, publicKey);
+ }
+
+ this.socket.sendPacket(builder -> builder.setServerConnectResponse(resp.build()));
+ }
+
+ private static String hashPublicKey(PublicKey publicKey) {
+ return publicKey == null ? "null" : Integer.toHexString(publicKey.hashCode());
+ }
+
+}
diff --git a/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocketConnection.java b/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocketConnection.java
new file mode 100644
index 0000000..f870cb7
--- /dev/null
+++ b/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocketConnection.java
@@ -0,0 +1,218 @@
+/*
+ * This file is part of spark.
+ *
+ * Copyright (c) lucko (Luck) <luck@lucko.me>
+ * Copyright (c) contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package me.lucko.spark.common.ws;
+
+import com.google.protobuf.ByteString;
+
+import me.lucko.spark.common.SparkPlatform;
+import me.lucko.spark.common.util.ws.BytesocksClient;
+import me.lucko.spark.proto.SparkWebSocketProtos.PacketWrapper;
+import me.lucko.spark.proto.SparkWebSocketProtos.RawPacket;
+
+import java.io.IOException;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.Signature;
+import java.util.Base64;
+import java.util.function.Consumer;
+import java.util.logging.Level;
+
+/**
+ * Controls a websocket connection between a spark server (the plugin/mod) and a spark client (the web viewer).
+ */
+public class ViewerSocketConnection implements BytesocksClient.Listener, AutoCloseable {
+
+ /** The protocol version */
+ public static final int VERSION_1 = 1;
+ /** The crypto algorithm used to sign/verify messages sent between the server and client */
+ public static final CryptoAlgorithm CRYPTO = CryptoAlgorithm.RSA2048;
+
+ /** The platform */
+ private final SparkPlatform platform;
+ /** The underlying listener */
+ private final Listener listener;
+ /** The private key used to sign messages sent from this connection */
+ private final PrivateKey privateKey;
+ /** The bytesocks socket */
+ private final BytesocksClient.Socket socket;
+
+ public ViewerSocketConnection(SparkPlatform platform, BytesocksClient client, Listener listener) throws Exception {
+ this.platform = platform;
+ this.listener = listener;
+ this.privateKey = platform.getTrustedKeyStore().getLocalPrivateKey();
+ this.socket = client.createAndConnect(this);
+ }
+
+ public interface Listener {
+
+ /**
+ * Checks if the given public key is trusted
+ *
+ * @param publicKey the public key
+ * @return true if trusted
+ */
+ boolean isKeyTrusted(PublicKey publicKey);
+
+ /**
+ * Handles a packet sent to the socket
+ *
+ * @param packet the packet that was sent
+ * @param verified if the packet was signed by a trusted key
+ * @param publicKey the public key the packet was signed with
+ */
+ void onPacket(PacketWrapper packet, boolean verified, PublicKey publicKey) throws Exception;
+ }
+
+ /**
+ * Gets the bytesocks channel id
+ *
+ * @return the channel id
+ */
+ public String getChannelId() {
+ return this.socket.getChannelId();
+ }
+
+ /**
+ * Gets if the underlying socket is open
+ *
+ * @return true if the socket is open
+ */
+ public boolean isOpen() {
+ return this.socket.isOpen();
+ }
+
+ @Override
+ public void onText(CharSequence data) {
+ try {
+ RawPacket packet = decodeRawPacket(data);
+ handleRawPacket(packet);
+ } catch (Exception e) {
+ this.platform.getPlugin().log(Level.WARNING, "Exception occurred while reading data from the socket");
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ this.platform.getPlugin().log(Level.INFO, "Socket error: " + error.getClass().getName() + " " + error.getMessage());
+ error.printStackTrace();
+ }
+
+ @Override
+ public void onClose(int statusCode, String reason) {
+ //this.platform.getPlugin().log(Level.INFO, "Socket closed with status " + statusCode + " and reason " + reason);
+ }
+
+ /**
+ * Sends a packet to the socket.
+ *
+ * @param packetBuilder the builder to construct the wrapper packet
+ */
+ public void sendPacket(Consumer<PacketWrapper.Builder> packetBuilder) {
+ PacketWrapper.Builder builder = PacketWrapper.newBuilder();
+ packetBuilder.accept(builder);
+ PacketWrapper wrapper = builder.build();
+
+ try {
+ sendPacket(wrapper);
+ } catch (Exception e) {
+ this.platform.getPlugin().log(Level.WARNING, "Exception occurred while sending data to the socket");
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Sends a packet to the socket.
+ *
+ * @param packet the packet to send
+ */
+ private void sendPacket(PacketWrapper packet) throws Exception {
+ ByteString msg = packet.toByteString();
+
+ // sign the message using the server private key
+ Signature sign = CRYPTO.createSignature();
+ sign.initSign(this.privateKey);
+ sign.update(msg.asReadOnlyByteBuffer());
+ byte[] signature = sign.sign();
+
+ sendRawPacket(RawPacket.newBuilder()
+ .setVersion(VERSION_1)
+ .setSignature(ByteString.copyFrom(signature))
+ .setMessage(msg)
+ .build()
+ );
+ }
+
+ /**
+ * Sends a raw packet to the socket.
+ *
+ * @param packet the packet to send
+ */
+ private void sendRawPacket(RawPacket packet) throws IOException {
+ byte[] buf = packet.toByteArray();
+ String encoded = Base64.getEncoder().encodeToString(buf);
+ this.socket.send(encoded);
+ }
+
+ /**
+ * Decodes a raw packet sent to the socket.
+ *
+ * @param data the encoded data
+ * @return the decoded packet
+ */
+ private RawPacket decodeRawPacket(CharSequence data) throws IOException {
+ byte[] buf = Base64.getDecoder().decode(data.toString());
+ return RawPacket.parseFrom(buf);
+ }
+
+ /**
+ * Handles a raw packet sent to the socket
+ *
+ * @param packet the packet
+ */
+ private void handleRawPacket(RawPacket packet) throws Exception {
+ int version = packet.getVersion();
+ if (version != VERSION_1) {
+ throw new IllegalArgumentException("Unsupported packet version " + version);
+ }
+
+ ByteString message = packet.getMessage();
+ PublicKey publicKey = CRYPTO.decodePublicKey(packet.getPublicKey());
+ ByteString signature = packet.getSignature();
+
+ boolean verified = false;
+ if (signature != null && publicKey != null && this.listener.isKeyTrusted(publicKey)) {
+ Signature sign = CRYPTO.createSignature();
+ sign.initVerify(publicKey);
+ sign.update(message.asReadOnlyByteBuffer());
+
+ verified = sign.verify(signature.toByteArray());
+ }
+
+ PacketWrapper wrapper = PacketWrapper.parseFrom(message);
+ this.listener.onPacket(wrapper, verified, publicKey);
+ }
+
+ @Override
+ public void close() {
+ this.socket.close(1001 /* going away */, "spark plugin disconnected");
+ }
+}