diff options
author | lucko <git@lucko.me> | 2023-01-28 11:07:45 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-28 11:07:45 +0000 |
commit | 06b794dcea806150770fb88d43e366a3496a9d0f (patch) | |
tree | 15ed2a6b7de22d90844b67291250dab4ec14eeda /spark-common/src/main/java/me | |
parent | d83e49128ad59308f4b3ff19cf4b22b53236be8d (diff) | |
download | spark-06b794dcea806150770fb88d43e366a3496a9d0f.tar.gz spark-06b794dcea806150770fb88d43e366a3496a9d0f.tar.bz2 spark-06b794dcea806150770fb88d43e366a3496a9d0f.zip |
Stream live data to the viewer using WebSockets (#294)
Diffstat (limited to 'spark-common/src/main/java/me')
19 files changed, 1247 insertions, 51 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java index dae04ff..61c6062 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java +++ b/spark-common/src/main/java/me/lucko/spark/common/SparkPlatform.java @@ -53,6 +53,8 @@ import me.lucko.spark.common.tick.TickReporter; import me.lucko.spark.common.util.BytebinClient; import me.lucko.spark.common.util.Configuration; import me.lucko.spark.common.util.TemporaryFiles; +import me.lucko.spark.common.util.ws.BytesocksClient; +import me.lucko.spark.common.ws.TrustedKeyStore; import net.kyori.adventure.text.Component; import net.kyori.adventure.text.event.ClickEvent; @@ -95,6 +97,8 @@ public class SparkPlatform { private final Configuration configuration; private final String viewerUrl; private final BytebinClient bytebinClient; + private final BytesocksClient bytesocksClient; + private final TrustedKeyStore trustedKeyStore; private final boolean disableResponseBroadcast; private final List<CommandModule> commandModules; private final List<Command> commands; @@ -118,8 +122,12 @@ public class SparkPlatform { this.configuration = new Configuration(this.plugin.getPluginDirectory().resolve("config.json")); this.viewerUrl = this.configuration.getString("viewerUrl", "https://spark.lucko.me/"); - String bytebinUrl = this.configuration.getString("bytebinUrl", "https://bytebin.lucko.me/"); + String bytebinUrl = this.configuration.getString("bytebinUrl", "https://spark-usercontent.lucko.me/"); + String bytesocksHost = this.configuration.getString("bytesocksHost", "spark-usersockets.lucko.me"); + this.bytebinClient = new BytebinClient(bytebinUrl, "spark-plugin"); + this.bytesocksClient = BytesocksClient.create(bytesocksHost, "spark-plugin"); + this.trustedKeyStore = new TrustedKeyStore(this.configuration); this.disableResponseBroadcast = this.configuration.getBoolean("disableResponseBroadcast", false); @@ -228,6 +236,14 @@ public class SparkPlatform { return this.bytebinClient; } + public BytesocksClient getBytesocksClient() { + return this.bytesocksClient; + } + + public TrustedKeyStore getTrustedKeyStore() { + return this.trustedKeyStore; + } + public boolean shouldBroadcastResponse() { return !this.disableResponseBroadcast; } diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java index 5bd62a8..6ac3b2f 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/HeapAnalysisModule.java @@ -32,6 +32,7 @@ import me.lucko.spark.common.heapdump.HeapDump; import me.lucko.spark.common.heapdump.HeapDumpSummary; import me.lucko.spark.common.util.Compression; import me.lucko.spark.common.util.FormatUtil; +import me.lucko.spark.common.util.MediaTypes; import me.lucko.spark.proto.SparkHeapProtos; import net.kyori.adventure.text.event.ClickEvent; @@ -52,7 +53,6 @@ import static net.kyori.adventure.text.format.NamedTextColor.GREEN; import static net.kyori.adventure.text.format.NamedTextColor.RED; public class HeapAnalysisModule implements CommandModule { - private static final String SPARK_HEAP_MEDIA_TYPE = "application/x-spark-heap"; @Override public void registerCommands(Consumer<Command> consumer) { @@ -97,7 +97,7 @@ public class HeapAnalysisModule implements CommandModule { saveToFile = true; } else { try { - String key = platform.getBytebinClient().postContent(output, SPARK_HEAP_MEDIA_TYPE).key(); + String key = platform.getBytebinClient().postContent(output, MediaTypes.SPARK_HEAP_MEDIA_TYPE).key(); String url = platform.getViewerUrl() + key; resp.broadcastPrefixed(text("Heap dump summmary output:", GOLD)); diff --git a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java index 041cacf..049c817 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java +++ b/spark-common/src/main/java/me/lucko/spark/common/command/modules/SamplerModule.java @@ -41,7 +41,10 @@ import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.tick.TickHook; import me.lucko.spark.common.util.FormatUtil; +import me.lucko.spark.common.util.MediaTypes; import me.lucko.spark.common.util.MethodDisambiguator; +import me.lucko.spark.common.util.ws.BytesocksClient; +import me.lucko.spark.common.ws.ViewerSocket; import me.lucko.spark.proto.SparkSamplerProtos; import net.kyori.adventure.text.Component; @@ -68,7 +71,6 @@ import static net.kyori.adventure.text.format.NamedTextColor.RED; import static net.kyori.adventure.text.format.NamedTextColor.WHITE; public class SamplerModule implements CommandModule { - private static final String SPARK_SAMPLER_MEDIA_TYPE = "application/x-spark-sampler"; @Override public void registerCommands(Consumer<Command> consumer) { @@ -76,6 +78,7 @@ public class SamplerModule implements CommandModule { .aliases("profiler", "sampler") .allowSubCommand(true) .argumentUsage("info", "", null) + .argumentUsage("open", "", null) .argumentUsage("start", "timeout", "timeout seconds") .argumentUsage("start", "thread *", null) .argumentUsage("start", "thread", "thread name") @@ -103,7 +106,7 @@ public class SamplerModule implements CommandModule { } return TabCompleter.create() - .at(0, CompletionSupplier.startsWith(Arrays.asList("info", "start", "stop", "cancel"))) + .at(0, CompletionSupplier.startsWith(Arrays.asList("info", "start", "open", "stop", "cancel"))) .from(1, CompletionSupplier.startsWith(opts)) .complete(arguments); }) @@ -119,6 +122,16 @@ public class SamplerModule implements CommandModule { return; } + if (subCommand.equals("open") || arguments.boolFlag("open")) { + profilerOpen(platform, sender, resp, arguments); + return; + } + + if (subCommand.equals("trust-viewer") || arguments.boolFlag("trust-viewer")) { + profilerTrustViewer(platform, sender, resp, arguments); + return; + } + if (subCommand.equals("cancel") || arguments.boolFlag("cancel")) { profilerCancel(platform, resp); return; @@ -254,6 +267,8 @@ public class SamplerModule implements CommandModule { resp.broadcastPrefixed(text("It will run in the background until it is stopped by an admin.")); resp.broadcastPrefixed(text("To stop the profiler and upload the results, run:")); resp.broadcastPrefixed(cmdPrompt("/" + platform.getPlugin().getCommandName() + " profiler stop")); + resp.broadcastPrefixed(text("To view the profiler while it's running, run:")); + resp.broadcastPrefixed(cmdPrompt("/" + platform.getPlugin().getCommandName() + " profiler open")); } else { resp.broadcastPrefixed(text("The results will be automatically returned after the profiler has been running for " + FormatUtil.formatSeconds(timeoutSeconds) + ".")); } @@ -273,13 +288,11 @@ public class SamplerModule implements CommandModule { // await the result if (timeoutSeconds != -1) { - String comment = Iterables.getFirst(arguments.stringFlag("comment"), null); - MethodDisambiguator methodDisambiguator = new MethodDisambiguator(); - MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator); + Sampler.ExportProps exportProps = getExportProps(platform, resp, arguments); boolean saveToFile = arguments.boolFlag("save-to-file"); future.thenAcceptAsync(s -> { resp.broadcastPrefixed(text("The active profiler has completed! Uploading results...")); - handleUpload(platform, resp, s, comment, mergeMode, saveToFile); + handleUpload(platform, resp, s, exportProps, saveToFile); }); } } @@ -306,6 +319,9 @@ public class SamplerModule implements CommandModule { resp.replyPrefixed(text("So far, it has profiled for " + FormatUtil.formatSeconds(runningTime) + ".")); } + resp.replyPrefixed(text("To view the profiler while it's running, run:")); + resp.replyPrefixed(cmdPrompt("/" + platform.getPlugin().getCommandName() + " profiler open")); + long timeout = sampler.getAutoEndTime(); if (timeout == -1) { resp.replyPrefixed(text("To stop the profiler and upload the results, run:")); @@ -320,6 +336,48 @@ public class SamplerModule implements CommandModule { } } + private void profilerOpen(SparkPlatform platform, CommandSender sender, CommandResponseHandler resp, Arguments arguments) { + BytesocksClient bytesocksClient = platform.getBytesocksClient(); + if (bytesocksClient == null) { + resp.replyPrefixed(text("The live viewer is only supported on Java 11 or newer.", RED)); + return; + } + + Sampler sampler = platform.getSamplerContainer().getActiveSampler(); + if (sampler == null) { + resp.replyPrefixed(text("The profiler isn't running!")); + resp.replyPrefixed(text("To start a new one, run:")); + resp.replyPrefixed(cmdPrompt("/" + platform.getPlugin().getCommandName() + " profiler start")); + return; + } + + Sampler.ExportProps exportProps = getExportProps(platform, resp, arguments); + handleOpen(platform, bytesocksClient, resp, sampler, exportProps); + } + + private void profilerTrustViewer(SparkPlatform platform, CommandSender sender, CommandResponseHandler resp, Arguments arguments) { + Set<String> ids = arguments.stringFlag("id"); + if (ids.isEmpty()) { + resp.replyPrefixed(text("Please provide a client id with '--id <client id>'.")); + return; + } + + for (String id : ids) { + boolean success = platform.getTrustedKeyStore().trustPendingKey(id); + if (success) { + Sampler sampler = platform.getSamplerContainer().getActiveSampler(); + if (sampler != null) { + for (ViewerSocket socket : sampler.getAttachedSockets()) { + socket.sendClientTrustedMessage(id); + } + } + resp.replyPrefixed(text("Client connected to the viewer using id '" + id + "' is now trusted.")); + } else { + resp.replyPrefixed(text("Unable to find pending client with id '" + id + "'.")); + } + } + } + private void profilerCancel(SparkPlatform platform, CommandResponseHandler resp) { Sampler sampler = platform.getSamplerContainer().getActiveSampler(); if (sampler == null) { @@ -346,10 +404,8 @@ public class SamplerModule implements CommandModule { resp.broadcastPrefixed(text("Stopping the profiler & uploading results, please wait...")); } - String comment = Iterables.getFirst(arguments.stringFlag("comment"), null); - MethodDisambiguator methodDisambiguator = new MethodDisambiguator(); - MergeMode mergeMode = arguments.boolFlag("separate-parent-calls") ? MergeMode.separateParentCalls(methodDisambiguator) : MergeMode.sameMethod(methodDisambiguator); - handleUpload(platform, resp, sampler, comment, mergeMode, saveToFile); + Sampler.ExportProps exportProps = getExportProps(platform, resp, arguments); + handleUpload(platform, resp, sampler, exportProps, saveToFile); // if the previous sampler was running in the background, create a new one if (platform.getBackgroundSamplerManager().restartBackgroundSampler()) { @@ -362,15 +418,15 @@ public class SamplerModule implements CommandModule { } } - private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, String comment, MergeMode mergeMode, boolean saveToFileFlag) { - SparkSamplerProtos.SamplerData output = sampler.toProto(platform, resp.sender(), comment, mergeMode, ClassSourceLookup.create(platform)); + private void handleUpload(SparkPlatform platform, CommandResponseHandler resp, Sampler sampler, Sampler.ExportProps exportProps, boolean saveToFileFlag) { + SparkSamplerProtos.SamplerData output = sampler.toProto(platform, exportProps); boolean saveToFile = false; if (saveToFileFlag) { saveToFile = true; } else { try { - String key = platform.getBytebinClient().postContent(output, SPARK_SAMPLER_MEDIA_TYPE).key(); + String key = platform.getBytebinClient().postContent(output, MediaTypes.SPARK_SAMPLER_MEDIA_TYPE).key(); String url = platform.getViewerUrl() + key; resp.broadcastPrefixed(text("Profiler stopped & upload complete!", GOLD)); @@ -406,6 +462,45 @@ public class SamplerModule implements CommandModule { } } + private void handleOpen(SparkPlatform platform, BytesocksClient bytesocksClient, CommandResponseHandler resp, Sampler sampler, Sampler.ExportProps exportProps) { + try { + ViewerSocket socket = new ViewerSocket(platform, bytesocksClient, exportProps); + sampler.attachSocket(socket); + exportProps.channelInfo(socket.getPayload()); + + SparkSamplerProtos.SamplerData data = sampler.toProto(platform, exportProps); + + String key = platform.getBytebinClient().postContent(data, MediaTypes.SPARK_SAMPLER_MEDIA_TYPE, "live").key(); + String url = platform.getViewerUrl() + key; + + resp.broadcastPrefixed(text("Profiler live viewer:", GOLD)); + resp.broadcast(text() + .content(url) + .color(GRAY) + .clickEvent(ClickEvent.openUrl(url)) + .build() + ); + + platform.getActivityLog().addToLog(Activity.urlActivity(resp.sender(), System.currentTimeMillis(), "Profiler (live)", url)); + } catch (Exception e) { + resp.replyPrefixed(text("An error occurred whilst opening the live profiler.", RED)); + e.printStackTrace(); + } + } + + private Sampler.ExportProps getExportProps(SparkPlatform platform, CommandResponseHandler resp, Arguments arguments) { + return new Sampler.ExportProps() + .creator(resp.sender().toData()) + .comment(Iterables.getFirst(arguments.stringFlag("comment"), null)) + .mergeMode(() -> { + MethodDisambiguator methodDisambiguator = new MethodDisambiguator(); + return arguments.boolFlag("separate-parent-calls") + ? MergeMode.separateParentCalls(methodDisambiguator) + : MergeMode.sameMethod(methodDisambiguator); + }) + .classSourceLookup(() -> ClassSourceLookup.create(platform)); + } + private static Component cmdPrompt(String cmd) { return text() .append(text(" ")) diff --git a/spark-common/src/main/java/me/lucko/spark/common/heapdump/HeapDumpSummary.java b/spark-common/src/main/java/me/lucko/spark/common/heapdump/HeapDumpSummary.java index c0980e7..eaedd31 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/heapdump/HeapDumpSummary.java +++ b/spark-common/src/main/java/me/lucko/spark/common/heapdump/HeapDumpSummary.java @@ -130,7 +130,7 @@ public final class HeapDumpSummary { .setPlatformMetadata(platform.getPlugin().getPlatformInfo().toData().toProto()) .setCreator(creator.toData().toProto()); try { - metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(null)); + metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(null, true)); } catch (Exception e) { e.printStackTrace(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/platform/PlatformStatisticsProvider.java b/spark-common/src/main/java/me/lucko/spark/common/platform/PlatformStatisticsProvider.java index fc7e78a..059590d 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/platform/PlatformStatisticsProvider.java +++ b/spark-common/src/main/java/me/lucko/spark/common/platform/PlatformStatisticsProvider.java @@ -128,7 +128,7 @@ public class PlatformStatisticsProvider { return builder.build(); } - public PlatformStatistics getPlatformStatistics(Map<String, GarbageCollectorStatistics> startingGcStatistics) { + public PlatformStatistics getPlatformStatistics(Map<String, GarbageCollectorStatistics> startingGcStatistics, boolean includeWorld) { PlatformStatistics.Builder builder = PlatformStatistics.newBuilder(); MemoryUsage memoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); @@ -187,19 +187,20 @@ public class PlatformStatisticsProvider { builder.setPlayerCount(playerCount); } - try { - WorldStatisticsProvider worldStatisticsProvider = new WorldStatisticsProvider( - new AsyncWorldInfoProvider(this.platform, this.platform.getPlugin().createWorldInfoProvider()) - ); - WorldStatistics worldStatistics = worldStatisticsProvider.getWorldStatistics(); - if (worldStatistics != null) { - builder.setWorld(worldStatistics); + if (includeWorld) { + try { + WorldStatisticsProvider worldStatisticsProvider = new WorldStatisticsProvider( + new AsyncWorldInfoProvider(this.platform, this.platform.getPlugin().createWorldInfoProvider()) + ); + WorldStatistics worldStatistics = worldStatisticsProvider.getWorldStatistics(); + if (worldStatistics != null) { + builder.setWorld(worldStatistics); + } + } catch (Exception e) { + e.printStackTrace(); } - } catch (Exception e) { - e.printStackTrace(); } - return builder.build(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java index 5abe71f..d814002 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/AbstractSampler.java @@ -32,9 +32,12 @@ import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.source.SourceMetadata; import me.lucko.spark.common.sampler.window.ProtoTimeEncoder; import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; +import me.lucko.spark.common.ws.ViewerSocket; +import me.lucko.spark.proto.SparkProtos; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import me.lucko.spark.proto.SparkSamplerProtos.SamplerMetadata; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.List; @@ -74,6 +77,9 @@ public abstract class AbstractSampler implements Sampler { /** The garbage collector statistics when profiling started */ protected Map<String, GarbageCollectorStatistics> initialGcStats; + /** A set of viewer sockets linked to the sampler */ + protected List<ViewerSocket> viewerSockets = new ArrayList<>(); + protected AbstractSampler(SparkPlatform platform, SamplerSettings settings) { this.platform = platform; this.interval = settings.interval(); @@ -122,13 +128,54 @@ public abstract class AbstractSampler implements Sampler { @Override public void stop(boolean cancelled) { this.windowStatisticsCollector.stop(); + for (ViewerSocket viewerSocket : this.viewerSockets) { + viewerSocket.processSamplerStopped(this); + } + } + + @Override + public void attachSocket(ViewerSocket socket) { + this.viewerSockets.add(socket); + } + + @Override + public Collection<ViewerSocket> getAttachedSockets() { + return this.viewerSockets; + } + + protected void processWindowRotate() { + this.viewerSockets.removeIf(socket -> { + if (!socket.isOpen()) { + return true; + } + + socket.processWindowRotate(this); + return false; + }); + } + + protected void sendStatisticsToSocket() { + try { + if (this.viewerSockets.isEmpty()) { + return; + } + + SparkProtos.PlatformStatistics platform = this.platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), false); + SparkProtos.SystemStatistics system = this.platform.getStatisticsProvider().getSystemStatistics(); + + for (ViewerSocket viewerSocket : this.viewerSockets) { + viewerSocket.sendUpdatedStatistics(platform, system); + } + } catch (Exception e) { + e.printStackTrace(); + } } - protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender creator, String comment, DataAggregator dataAggregator) { + protected void writeMetadataToProto(SamplerData.Builder proto, SparkPlatform platform, CommandSender.Data creator, String comment, DataAggregator dataAggregator) { SamplerMetadata.Builder metadata = SamplerMetadata.newBuilder() .setSamplerMode(getMode().asProto()) .setPlatformMetadata(platform.getPlugin().getPlatformInfo().toData().toProto()) - .setCreator(creator.toData().toProto()) + .setCreator(creator.toProto()) .setStartTime(this.startTime) .setEndTime(System.currentTimeMillis()) .setInterval(this.interval) @@ -145,7 +192,7 @@ public abstract class AbstractSampler implements Sampler { } try { - metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats())); + metadata.setPlatformStatistics(platform.getStatisticsProvider().getPlatformStatistics(getInitialGcStats(), true)); } catch (Exception e) { e.printStackTrace(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java index aaf4f38..844ab0b 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/Sampler.java @@ -24,9 +24,13 @@ import me.lucko.spark.common.SparkPlatform; import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.node.MergeMode; import me.lucko.spark.common.sampler.source.ClassSourceLookup; +import me.lucko.spark.common.ws.ViewerSocket; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; +import me.lucko.spark.proto.SparkSamplerProtos.SocketChannelInfo; +import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; /** * Abstract superinterface for all sampler implementations. @@ -44,6 +48,20 @@ public interface Sampler { void stop(boolean cancelled); /** + * Attaches a viewer socket to this sampler. + * + * @param socket the socket + */ + void attachSocket(ViewerSocket socket); + + /** + * Gets the sockets attached to this sampler. + * + * @return the attached sockets + */ + Collection<ViewerSocket> getAttachedSockets(); + + /** * Gets the time when the sampler started (unix timestamp in millis) * * @return the start time @@ -79,6 +97,62 @@ public interface Sampler { CompletableFuture<Sampler> getFuture(); // Methods used to export the sampler data to the web viewer. - SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup); + SamplerData toProto(SparkPlatform platform, ExportProps exportProps); + + final class ExportProps { + private CommandSender.Data creator; + private String comment; + private Supplier<MergeMode> mergeMode; + private Supplier<ClassSourceLookup> classSourceLookup; + private SocketChannelInfo channelInfo; + + public ExportProps() { + } + + public CommandSender.Data creator() { + return this.creator; + } + + public String comment() { + return this.comment; + } + + public Supplier<MergeMode> mergeMode() { + return this.mergeMode; + } + + public Supplier<ClassSourceLookup> classSourceLookup() { + return this.classSourceLookup; + } + + public SocketChannelInfo channelInfo() { + return this.channelInfo; + } + + public ExportProps creator(CommandSender.Data creator) { + this.creator = creator; + return this; + } + + public ExportProps comment(String comment) { + this.comment = comment; + return this; + } + + public ExportProps mergeMode(Supplier<MergeMode> mergeMode) { + this.mergeMode = mergeMode; + return this; + } + + public ExportProps classSourceLookup(Supplier<ClassSourceLookup> classSourceLookup) { + this.classSourceLookup = classSourceLookup; + return this; + } + + public ExportProps channelInfo(SocketChannelInfo channelInfo) { + this.channelInfo = channelInfo; + return this; + } + } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java index 2328582..ec88677 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java @@ -23,18 +23,17 @@ package me.lucko.spark.common.sampler.async; import com.google.common.util.concurrent.ThreadFactoryBuilder; import me.lucko.spark.common.SparkPlatform; -import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.AbstractSampler; import me.lucko.spark.common.sampler.SamplerMode; import me.lucko.spark.common.sampler.SamplerSettings; -import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.common.ws.ViewerSocket; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.IntPredicate; @@ -61,6 +60,9 @@ public class AsyncSampler extends AbstractSampler { /** The executor used for scheduling and management */ private ScheduledExecutorService scheduler; + /** The task to send statistics to the viewer socket */ + private ScheduledFuture<?> socketStatisticsTask; + public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector) { super(platform, settings); this.sampleCollector = collector; @@ -143,6 +145,8 @@ public class AsyncSampler extends AbstractSampler { IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(window); this.dataAggregator.pruneData(predicate); this.windowStatisticsCollector.pruneStatistics(predicate); + + this.scheduler.execute(this::processWindowRotate); } } catch (Throwable e) { e.printStackTrace(); @@ -183,6 +187,10 @@ public class AsyncSampler extends AbstractSampler { this.currentJob = null; } + if (this.socketStatisticsTask != null) { + this.socketStatisticsTask.cancel(false); + } + if (this.scheduler != null) { this.scheduler.shutdown(); this.scheduler = null; @@ -190,15 +198,27 @@ public class AsyncSampler extends AbstractSampler { } @Override + public void attachSocket(ViewerSocket socket) { + super.attachSocket(socket); + + if (this.socketStatisticsTask == null) { + this.socketStatisticsTask = this.scheduler.scheduleAtFixedRate(this::sendStatisticsToSocket, 10, 10, TimeUnit.SECONDS); + } + } + + @Override public SamplerMode getMode() { return this.sampleCollector.getMode(); } @Override - public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + public SamplerData toProto(SparkPlatform platform, ExportProps exportProps) { SamplerData.Builder proto = SamplerData.newBuilder(); - writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); - writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup); + if (exportProps.channelInfo() != null) { + proto.setChannelInfo(exportProps.channelInfo()); + } + writeMetadataToProto(proto, platform, exportProps.creator(), exportProps.comment(), this.dataAggregator); + writeDataToProto(proto, this.dataAggregator, exportProps.mergeMode().get(), exportProps.classSourceLookup().get()); return proto.build(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java index d5c965f..e9d7e0d 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java @@ -23,15 +23,13 @@ package me.lucko.spark.common.sampler.java; import com.google.common.util.concurrent.ThreadFactoryBuilder; import me.lucko.spark.common.SparkPlatform; -import me.lucko.spark.common.command.sender.CommandSender; import me.lucko.spark.common.sampler.AbstractSampler; import me.lucko.spark.common.sampler.SamplerMode; import me.lucko.spark.common.sampler.SamplerSettings; -import me.lucko.spark.common.sampler.node.MergeMode; -import me.lucko.spark.common.sampler.source.ClassSourceLookup; import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; import me.lucko.spark.common.sampler.window.WindowStatisticsCollector; import me.lucko.spark.common.tick.TickHook; +import me.lucko.spark.common.ws.ViewerSocket; import me.lucko.spark.proto.SparkSamplerProtos.SamplerData; import java.lang.management.ManagementFactory; @@ -58,6 +56,9 @@ public class JavaSampler extends AbstractSampler implements Runnable { /** The main sampling task */ private ScheduledFuture<?> task; + /** The task to send statistics to the viewer socket */ + private ScheduledFuture<?> socketStatisticsTask; + /** The thread management interface for the current JVM */ private final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); @@ -101,6 +102,10 @@ public class JavaSampler extends AbstractSampler implements Runnable { this.task.cancel(false); + if (this.socketStatisticsTask != null) { + this.socketStatisticsTask.cancel(false); + } + if (!cancelled) { // collect statistics for the final window this.windowStatisticsCollector.measureNow(this.lastWindow.get()); @@ -129,6 +134,15 @@ public class JavaSampler extends AbstractSampler implements Runnable { } } + @Override + public void attachSocket(ViewerSocket socket) { + super.attachSocket(socket); + + if (this.socketStatisticsTask == null) { + this.socketStatisticsTask = this.workerPool.scheduleAtFixedRate(this::sendStatisticsToSocket, 10, 10, TimeUnit.SECONDS); + } + } + private final class InsertDataTask implements Runnable { private final ThreadInfo[] threadDumps; private final int window; @@ -161,15 +175,20 @@ public class JavaSampler extends AbstractSampler implements Runnable { IntPredicate predicate = ProfilingWindowUtils.keepHistoryBefore(this.window); JavaSampler.this.dataAggregator.pruneData(predicate); JavaSampler.this.windowStatisticsCollector.pruneStatistics(predicate); + + JavaSampler.this.workerPool.execute(JavaSampler.this::processWindowRotate); } } } @Override - public SamplerData toProto(SparkPlatform platform, CommandSender creator, String comment, MergeMode mergeMode, ClassSourceLookup classSourceLookup) { + public SamplerData toProto(SparkPlatform platform, ExportProps exportProps) { SamplerData.Builder proto = SamplerData.newBuilder(); - writeMetadataToProto(proto, platform, creator, comment, this.dataAggregator); - writeDataToProto(proto, this.dataAggregator, mergeMode, classSourceLookup); + if (exportProps.channelInfo() != null) { + proto.setChannelInfo(exportProps.channelInfo()); + } + writeMetadataToProto(proto, platform, exportProps.creator(), exportProps.comment(), this.dataAggregator); + writeDataToProto(proto, this.dataAggregator, exportProps.mergeMode().get(), exportProps.classSourceLookup().get()); return proto.build(); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java index 1c05b00..7acbd6b 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java +++ b/spark-common/src/main/java/me/lucko/spark/common/sampler/window/WindowStatisticsCollector.java @@ -251,11 +251,13 @@ public class WindowStatisticsCollector { if (this.startTick == -1) { throw new IllegalStateException("start tick not recorded"); } - if (this.stopTick == -1) { - throw new IllegalStateException("stop tick not recorded"); + + int stopTick = this.stopTick; + if (stopTick == -1) { + stopTick = this.tickHook.getCurrentTick(); } - return this.stopTick - this.startTick; + return stopTick - this.startTick; } } diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/BytebinClient.java b/spark-common/src/main/java/me/lucko/spark/common/util/BytebinClient.java index e69b94e..8f11edc 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/util/BytebinClient.java +++ b/spark-common/src/main/java/me/lucko/spark/common/util/BytebinClient.java @@ -32,6 +32,8 @@ import java.util.zip.GZIPOutputStream; /** * Utility for posting content to bytebin. + * + * @see <a href="https://github.com/lucko/bytebin">https://github.com/lucko/bytebin</a> */ public class BytebinClient { @@ -45,7 +47,7 @@ public class BytebinClient { this.userAgent = userAgent; } - private Content postContent(String contentType, Consumer<OutputStream> consumer) throws IOException { + private Content postContent(String contentType, Consumer<OutputStream> consumer, String userAgent) throws IOException { URL url = new URL(this.url + "post"); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); try { @@ -55,7 +57,7 @@ public class BytebinClient { connection.setDoOutput(true); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", contentType); - connection.setRequestProperty("User-Agent", this.userAgent); + connection.setRequestProperty("User-Agent", userAgent); connection.setRequestProperty("Content-Encoding", "gzip"); connection.connect(); @@ -74,14 +76,18 @@ public class BytebinClient { } } - public Content postContent(AbstractMessageLite<?, ?> proto, String contentType) throws IOException { + public Content postContent(AbstractMessageLite<?, ?> proto, String contentType, String userAgentExtra) throws IOException { return postContent(contentType, outputStream -> { try (OutputStream out = new GZIPOutputStream(outputStream)) { proto.writeTo(out); } catch (IOException e) { throw new RuntimeException(e); } - }); + }, this.userAgent + "/" + userAgentExtra); + } + + public Content postContent(AbstractMessageLite<?, ?> proto, String contentType) throws IOException { + return postContent(proto, contentType, this.userAgent); } public static final class Content { diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/Configuration.java b/spark-common/src/main/java/me/lucko/spark/common/util/Configuration.java index 32f3bc6..d19ba64 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/util/Configuration.java +++ b/spark-common/src/main/java/me/lucko/spark/common/util/Configuration.java @@ -22,6 +22,7 @@ package me.lucko.spark.common.util; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; @@ -32,6 +33,9 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; public final class Configuration { private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); @@ -103,6 +107,21 @@ public final class Configuration { return val.isBoolean() ? val.getAsInt() : def; } + public List<String> getStringList(String path) { + JsonElement el = this.root.get(path); + if (el == null || !el.isJsonArray()) { + return Collections.emptyList(); + } + + List<String> list = new ArrayList<>(); + for (JsonElement child : el.getAsJsonArray()) { + if (child.isJsonPrimitive()) { + list.add(child.getAsJsonPrimitive().getAsString()); + } + } + return list; + } + public void setString(String path, String value) { this.root.add(path, new JsonPrimitive(value)); } @@ -115,6 +134,14 @@ public final class Configuration { this.root.add(path, new JsonPrimitive(value)); } + public void setStringList(String path, List<String> value) { + JsonArray array = new JsonArray(); + for (String str : value) { + array.add(str); + } + this.root.add(path, array); + } + public boolean contains(String path) { return this.root.has(path); } diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/MediaTypes.java b/spark-common/src/main/java/me/lucko/spark/common/util/MediaTypes.java new file mode 100644 index 0000000..2c49540 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/util/MediaTypes.java @@ -0,0 +1,29 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.util; + +public enum MediaTypes { + ; + + public static final String SPARK_SAMPLER_MEDIA_TYPE = "application/x-spark-sampler"; + public static final String SPARK_HEAP_MEDIA_TYPE = "application/x-spark-heap"; + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClient.java b/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClient.java new file mode 100644 index 0000000..1db7a67 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClient.java @@ -0,0 +1,118 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.util.ws; + +import java.util.concurrent.CompletableFuture; + +/** + * A client that can interact with bytesocks. + * + * @see <a href="https://github.com/lucko/bytesocks">https://github.com/lucko/bytesocks</a> + */ +public interface BytesocksClient { + + /** + * Creates a new {@link BytesocksClient}. + * + * <p>Returns {@code null} on Java versions before 11.</p> + * + * @param host the host + * @param userAgent the user agent + * @return the client + */ + static BytesocksClient create(String host, String userAgent) { + try { + return new BytesocksClientImpl(host, userAgent); + } catch (UnsupportedOperationException e) { + return null; + } + } + + /** + * Creates a new bytesocks channel and returns a socket connected to it. + * + * @param listener the listener + * @return the socket + * @throws Exception if something goes wrong + */ + Socket createAndConnect(Listener listener) throws Exception; + + /** + * Connects to an existing bytesocks channel. + * + * @param channelId the channel id + * @param listener the listener + * @return the socket + * @throws Exception if something goes wrong + */ + Socket connect(String channelId, Listener listener) throws Exception; + + /** + * A socket connected to a bytesocks channel. + */ + interface Socket { + + /** + * Gets the id of the connected channel. + * + * @return the id of the channel + */ + String getChannelId(); + + /** + * Gets if the socket is open. + * + * @return true if the socket is open + */ + boolean isOpen(); + + /** + * Sends a message to the channel using the socket. + * + * @param msg the message to send + * @return a future to encapsulate the progress of sending the message + */ + CompletableFuture<?> send(CharSequence msg); + + /** + * Closes the socket. + * + * @param statusCode the status code + * @param reason the reason + */ + void close(int statusCode, String reason); + } + + /** + * Socket listener + */ + interface Listener { + + default void onOpen() {} + + default void onError(Throwable error) {} + + default void onText(CharSequence data) {} + + default void onClose(int statusCode, String reason) {} + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClientImpl.java b/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClientImpl.java new file mode 100644 index 0000000..cf1489c --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/util/ws/BytesocksClientImpl.java @@ -0,0 +1,40 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.util.ws; + +// Overridden by java 11 source set + +class BytesocksClientImpl implements BytesocksClient { + + BytesocksClientImpl(String host, String userAgent) { + throw new UnsupportedOperationException(); + } + + @Override + public Socket createAndConnect(Listener listener) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public Socket connect(String channelId, Listener listener) throws Exception { + throw new UnsupportedOperationException(); + } +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/ws/CryptoAlgorithm.java b/spark-common/src/main/java/me/lucko/spark/common/ws/CryptoAlgorithm.java new file mode 100644 index 0000000..f6cf1db --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/ws/CryptoAlgorithm.java @@ -0,0 +1,90 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.ws; + +import com.google.protobuf.ByteString; + +import java.security.KeyFactory; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.PublicKey; +import java.security.Signature; +import java.security.spec.X509EncodedKeySpec; + +/** + * An algorithm for keypair/signature cryptography. + */ +public enum CryptoAlgorithm { + + Ed25519("Ed25519", 255, "Ed25519"), + RSA2048("RSA", 2048, "SHA256withRSA"); + + private final String keyAlgorithm; + private final int keySize; + private final String signatureAlgorithm; + + CryptoAlgorithm(String keyAlgorithm, int keySize, String signatureAlgorithm) { + this.keyAlgorithm = keyAlgorithm; + this.keySize = keySize; + this.signatureAlgorithm = signatureAlgorithm; + } + + public KeyPairGenerator createKeyPairGenerator() throws NoSuchAlgorithmException { + return KeyPairGenerator.getInstance(this.keyAlgorithm); + } + + public KeyFactory createKeyFactory() throws NoSuchAlgorithmException { + return KeyFactory.getInstance(this.keyAlgorithm); + } + + public Signature createSignature() throws NoSuchAlgorithmException { + return Signature.getInstance(this.signatureAlgorithm); + } + + public KeyPair generateKeyPair() { + try { + KeyPairGenerator generator = createKeyPairGenerator(); + generator.initialize(this.keySize); + return generator.generateKeyPair(); + } catch (Exception e) { + throw new RuntimeException("Exception generating keypair", e); + } + } + + public PublicKey decodePublicKey(byte[] bytes) throws IllegalArgumentException { + try { + X509EncodedKeySpec spec = new X509EncodedKeySpec(bytes); + KeyFactory factory = createKeyFactory(); + return factory.generatePublic(spec); + } catch (Exception e) { + throw new IllegalArgumentException("Exception parsing public key", e); + } + } + + public PublicKey decodePublicKey(ByteString bytes) throws IllegalArgumentException { + if (bytes == null) { + return null; + } + return decodePublicKey(bytes.toByteArray()); + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/ws/TrustedKeyStore.java b/spark-common/src/main/java/me/lucko/spark/common/ws/TrustedKeyStore.java new file mode 100644 index 0000000..1605a38 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/ws/TrustedKeyStore.java @@ -0,0 +1,139 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.ws; + +import me.lucko.spark.common.util.Configuration; + +import java.security.KeyPair; +import java.security.PrivateKey; +import java.security.PublicKey; +import java.util.Base64; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * A store of trusted public keys. + */ +public class TrustedKeyStore { + private static final String TRUSTED_KEYS_OPTION = "trustedKeys"; + + /** The spark configuration */ + private final Configuration configuration; + /** Gets the local public/private key */ + private final CompletableFuture<KeyPair> localKeyPair; + /** A set of remote public keys to trust */ + private final Set<PublicKey> remoteTrustedKeys; + /** A mpa of pending remote public keys */ + private final Map<String, PublicKey> remotePendingKeys = new HashMap<>(); + + public TrustedKeyStore(Configuration configuration) { + this.configuration = configuration; + this.localKeyPair = CompletableFuture.supplyAsync(ViewerSocketConnection.CRYPTO::generateKeyPair); + this.remoteTrustedKeys = new HashSet<>(); + readTrustedKeys(); + } + + /** + * Gets the local public key. + * + * @return the local public key + */ + public PublicKey getLocalPublicKey() { + return this.localKeyPair.join().getPublic(); + } + + /** + * Gets the local private key. + * + * @return the local private key + */ + public PrivateKey getLocalPrivateKey() { + return this.localKeyPair.join().getPrivate(); + } + + /** + * Checks if a remote public key is trusted + * + * @param publicKey the public key + * @return if the key is trusted + */ + public boolean isKeyTrusted(PublicKey publicKey) { + return publicKey != null && this.remoteTrustedKeys.contains(publicKey); + } + + /** + * Adds a pending public key to be trusted in the future. + * + * @param clientId the client id submitting the key + * @param publicKey the public key + */ + public void addPendingKey(String clientId, PublicKey publicKey) { + this.remotePendingKeys.put(clientId, publicKey); + } + + /** + * Trusts a previously submitted remote public key + * + * @param clientId the id of the client that submitted the key + * @return true if the key was found and trusted + */ + public boolean trustPendingKey(String clientId) { + PublicKey key = this.remotePendingKeys.remove(clientId); + if (key == null) { + return false; + } + + this.remoteTrustedKeys.add(key); + writeTrustedKeys(); + return true; + } + + /** + * Reads trusted keys from the configuration + */ + private void readTrustedKeys() { + for (String encodedKey : this.configuration.getStringList(TRUSTED_KEYS_OPTION)) { + try { + PublicKey publicKey = ViewerSocketConnection.CRYPTO.decodePublicKey(Base64.getDecoder().decode(encodedKey)); + this.remoteTrustedKeys.add(publicKey); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + /** + * Writes trusted keys to the configuration + */ + private void writeTrustedKeys() { + List<String> encodedKeys = this.remoteTrustedKeys.stream() + .map(key -> Base64.getEncoder().encodeToString(key.getEncoded())) + .collect(Collectors.toList()); + + this.configuration.setStringList(TRUSTED_KEYS_OPTION, encodedKeys); + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocket.java b/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocket.java new file mode 100644 index 0000000..5c7e08c --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocket.java @@ -0,0 +1,255 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.ws; + +import com.google.protobuf.ByteString; + +import me.lucko.spark.common.SparkPlatform; +import me.lucko.spark.common.sampler.AbstractSampler; +import me.lucko.spark.common.sampler.Sampler; +import me.lucko.spark.common.sampler.window.ProfilingWindowUtils; +import me.lucko.spark.common.util.MediaTypes; +import me.lucko.spark.common.util.ws.BytesocksClient; +import me.lucko.spark.proto.SparkProtos; +import me.lucko.spark.proto.SparkSamplerProtos; +import me.lucko.spark.proto.SparkWebSocketProtos.ClientConnect; +import me.lucko.spark.proto.SparkWebSocketProtos.ClientPing; +import me.lucko.spark.proto.SparkWebSocketProtos.PacketWrapper; +import me.lucko.spark.proto.SparkWebSocketProtos.ServerConnectResponse; +import me.lucko.spark.proto.SparkWebSocketProtos.ServerPong; +import me.lucko.spark.proto.SparkWebSocketProtos.ServerUpdateSamplerData; +import me.lucko.spark.proto.SparkWebSocketProtos.ServerUpdateStatistics; + +import java.security.PublicKey; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +/** + * Represents a connection with the spark viewer. + */ +public class ViewerSocket implements ViewerSocketConnection.Listener, AutoCloseable { + + /** Allow 60 seconds for the first client to connect */ + private static final long SOCKET_INITIAL_TIMEOUT = TimeUnit.SECONDS.toMillis(60); + + /** Once established, expect a ping at least once every 30 seconds */ + private static final long SOCKET_ESTABLISHED_TIMEOUT = TimeUnit.SECONDS.toMillis(30); + + /** The spark platform */ + private final SparkPlatform platform; + /** The export props to use when exporting the sampler data */ + private final Sampler.ExportProps exportProps; + /** The underlying connection */ + private final ViewerSocketConnection socket; + + private boolean closed = false; + private final long socketOpenTime = System.currentTimeMillis(); + private long lastPing = 0; + private String lastPayloadId = null; + + public ViewerSocket(SparkPlatform platform, BytesocksClient client, Sampler.ExportProps exportProps) throws Exception { + this.platform = platform; + this.exportProps = exportProps; + this.socket = new ViewerSocketConnection(platform, client, this); + } + + private void log(String message) { + this.platform.getPlugin().log(Level.INFO, "[Viewer - " + this.socket.getChannelId() + "] " + message); + } + + /** + * Gets the initial payload to send to the viewer. + * + * @return the payload + */ + public SparkSamplerProtos.SocketChannelInfo getPayload() { + return SparkSamplerProtos.SocketChannelInfo.newBuilder() + .setChannelId(this.socket.getChannelId()) + .setPublicKey(ByteString.copyFrom(this.platform.getTrustedKeyStore().getLocalPublicKey().getEncoded())) + .build(); + } + + public boolean isOpen() { + return !this.closed && this.socket.isOpen(); + } + + /** + * Called each time the sampler rotates to a new window. + * + * @param sampler the sampler + */ + public void processWindowRotate(AbstractSampler sampler) { + if (this.closed) { + return; + } + + long time = System.currentTimeMillis(); + if ((time - this.socketOpenTime) > SOCKET_INITIAL_TIMEOUT && (time - this.lastPing) > SOCKET_ESTABLISHED_TIMEOUT) { + log("No clients have pinged for 30s, closing socket"); + close(); + return; + } + + // no clients connected yet! + if (this.lastPing == 0) { + return; + } + + try { + SparkSamplerProtos.SamplerData samplerData = sampler.toProto(this.platform, this.exportProps); + String key = this.platform.getBytebinClient().postContent(samplerData, MediaTypes.SPARK_SAMPLER_MEDIA_TYPE, "live").key(); + sendUpdatedSamplerData(key); + } catch (Exception e) { + this.platform.getPlugin().log(Level.WARNING, "Error whilst sending updated sampler data to the socket"); + e.printStackTrace(); + } + } + + /** + * Called when the sampler stops. + * + * @param sampler the sampler + */ + public void processSamplerStopped(AbstractSampler sampler) { + if (this.closed) { + return; + } + + close(); + } + + @Override + public void close() { + this.socket.sendPacket(builder -> builder.setServerPong(ServerPong.newBuilder() + .setOk(false) + .build() + )); + this.socket.close(); + this.closed = true; + } + + @Override + public boolean isKeyTrusted(PublicKey publicKey) { + return this.platform.getTrustedKeyStore().isKeyTrusted(publicKey); + } + + /** + * Sends a message to the socket to say that the given client is now trusted. + * + * @param clientId the client id + */ + public void sendClientTrustedMessage(String clientId) { + this.socket.sendPacket(builder -> builder.setServerConnectResponse(ServerConnectResponse.newBuilder() + .setClientId(clientId) + .setState(ServerConnectResponse.State.ACCEPTED) + .build() + )); + } + + /** + * Sends a message to the socket to indicate that updated sampler data is available + * + * @param payloadId the payload id of the updated data + */ + public void sendUpdatedSamplerData(String payloadId) { + this.socket.sendPacket(builder -> builder.setServerUpdateSampler(ServerUpdateSamplerData.newBuilder() + .setPayloadId(payloadId) + .build() + )); + this.lastPayloadId = payloadId; + } + + /** + * Sends a message to the socket with updated statistics + * + * @param platform the platform statistics + * @param system the system statistics + */ + public void sendUpdatedStatistics(SparkProtos.PlatformStatistics platform, SparkProtos.SystemStatistics system) { + this.socket.sendPacket(builder -> builder.setServerUpdateStatistics(ServerUpdateStatistics.newBuilder() + .setPlatform(platform) + .setSystem(system) + .build() + )); + } + + @Override + public void onPacket(PacketWrapper packet, boolean verified, PublicKey publicKey) throws Exception { + switch (packet.getPacketCase()) { + case CLIENT_PING: + onClientPing(packet.getClientPing(), publicKey); + break; + case CLIENT_CONNECT: + onClientConnect(packet.getClientConnect(), verified, publicKey); + break; + default: + throw new IllegalArgumentException("Unexpected packet: " + packet.getPacketCase()); + } + } + + private void onClientPing(ClientPing packet, PublicKey publicKey) { + this.lastPing = System.currentTimeMillis(); + this.socket.sendPacket(builder -> builder.setServerPong(ServerPong.newBuilder() + .setOk(!this.closed) + .setData(packet.getData()) + .build() + )); + } + + private void onClientConnect(ClientConnect packet, boolean verified, PublicKey publicKey) { + if (publicKey == null) { + throw new IllegalStateException("Missing public key"); + } + + this.lastPing = System.currentTimeMillis(); + + String clientId = packet.getClientId(); + log("Client connected: clientId=" + clientId + ", keyhash=" + hashPublicKey(publicKey) + ", desc=" + packet.getDescription()); + + ServerConnectResponse.Builder resp = ServerConnectResponse.newBuilder() + .setClientId(clientId) + .setSettings(ServerConnectResponse.Settings.newBuilder() + .setSamplerInterval(ProfilingWindowUtils.WINDOW_SIZE_SECONDS) + .setStatisticsInterval(10) + .build() + ); + + if (this.lastPayloadId != null) { + resp.setLastPayloadId(this.lastPayloadId); + } + + if (this.closed) { + resp.setState(ServerConnectResponse.State.REJECTED); + } else if (verified) { + resp.setState(ServerConnectResponse.State.ACCEPTED); + } else { + resp.setState(ServerConnectResponse.State.UNTRUSTED); + this.platform.getTrustedKeyStore().addPendingKey(clientId, publicKey); + } + + this.socket.sendPacket(builder -> builder.setServerConnectResponse(resp.build())); + } + + private static String hashPublicKey(PublicKey publicKey) { + return publicKey == null ? "null" : Integer.toHexString(publicKey.hashCode()); + } + +} diff --git a/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocketConnection.java b/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocketConnection.java new file mode 100644 index 0000000..f870cb7 --- /dev/null +++ b/spark-common/src/main/java/me/lucko/spark/common/ws/ViewerSocketConnection.java @@ -0,0 +1,218 @@ +/* + * This file is part of spark. + * + * Copyright (c) lucko (Luck) <luck@lucko.me> + * Copyright (c) contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package me.lucko.spark.common.ws; + +import com.google.protobuf.ByteString; + +import me.lucko.spark.common.SparkPlatform; +import me.lucko.spark.common.util.ws.BytesocksClient; +import me.lucko.spark.proto.SparkWebSocketProtos.PacketWrapper; +import me.lucko.spark.proto.SparkWebSocketProtos.RawPacket; + +import java.io.IOException; +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.Signature; +import java.util.Base64; +import java.util.function.Consumer; +import java.util.logging.Level; + +/** + * Controls a websocket connection between a spark server (the plugin/mod) and a spark client (the web viewer). + */ +public class ViewerSocketConnection implements BytesocksClient.Listener, AutoCloseable { + + /** The protocol version */ + public static final int VERSION_1 = 1; + /** The crypto algorithm used to sign/verify messages sent between the server and client */ + public static final CryptoAlgorithm CRYPTO = CryptoAlgorithm.RSA2048; + + /** The platform */ + private final SparkPlatform platform; + /** The underlying listener */ + private final Listener listener; + /** The private key used to sign messages sent from this connection */ + private final PrivateKey privateKey; + /** The bytesocks socket */ + private final BytesocksClient.Socket socket; + + public ViewerSocketConnection(SparkPlatform platform, BytesocksClient client, Listener listener) throws Exception { + this.platform = platform; + this.listener = listener; + this.privateKey = platform.getTrustedKeyStore().getLocalPrivateKey(); + this.socket = client.createAndConnect(this); + } + + public interface Listener { + + /** + * Checks if the given public key is trusted + * + * @param publicKey the public key + * @return true if trusted + */ + boolean isKeyTrusted(PublicKey publicKey); + + /** + * Handles a packet sent to the socket + * + * @param packet the packet that was sent + * @param verified if the packet was signed by a trusted key + * @param publicKey the public key the packet was signed with + */ + void onPacket(PacketWrapper packet, boolean verified, PublicKey publicKey) throws Exception; + } + + /** + * Gets the bytesocks channel id + * + * @return the channel id + */ + public String getChannelId() { + return this.socket.getChannelId(); + } + + /** + * Gets if the underlying socket is open + * + * @return true if the socket is open + */ + public boolean isOpen() { + return this.socket.isOpen(); + } + + @Override + public void onText(CharSequence data) { + try { + RawPacket packet = decodeRawPacket(data); + handleRawPacket(packet); + } catch (Exception e) { + this.platform.getPlugin().log(Level.WARNING, "Exception occurred while reading data from the socket"); + e.printStackTrace(); + } + } + + @Override + public void onError(Throwable error) { + this.platform.getPlugin().log(Level.INFO, "Socket error: " + error.getClass().getName() + " " + error.getMessage()); + error.printStackTrace(); + } + + @Override + public void onClose(int statusCode, String reason) { + //this.platform.getPlugin().log(Level.INFO, "Socket closed with status " + statusCode + " and reason " + reason); + } + + /** + * Sends a packet to the socket. + * + * @param packetBuilder the builder to construct the wrapper packet + */ + public void sendPacket(Consumer<PacketWrapper.Builder> packetBuilder) { + PacketWrapper.Builder builder = PacketWrapper.newBuilder(); + packetBuilder.accept(builder); + PacketWrapper wrapper = builder.build(); + + try { + sendPacket(wrapper); + } catch (Exception e) { + this.platform.getPlugin().log(Level.WARNING, "Exception occurred while sending data to the socket"); + e.printStackTrace(); + } + } + + /** + * Sends a packet to the socket. + * + * @param packet the packet to send + */ + private void sendPacket(PacketWrapper packet) throws Exception { + ByteString msg = packet.toByteString(); + + // sign the message using the server private key + Signature sign = CRYPTO.createSignature(); + sign.initSign(this.privateKey); + sign.update(msg.asReadOnlyByteBuffer()); + byte[] signature = sign.sign(); + + sendRawPacket(RawPacket.newBuilder() + .setVersion(VERSION_1) + .setSignature(ByteString.copyFrom(signature)) + .setMessage(msg) + .build() + ); + } + + /** + * Sends a raw packet to the socket. + * + * @param packet the packet to send + */ + private void sendRawPacket(RawPacket packet) throws IOException { + byte[] buf = packet.toByteArray(); + String encoded = Base64.getEncoder().encodeToString(buf); + this.socket.send(encoded); + } + + /** + * Decodes a raw packet sent to the socket. + * + * @param data the encoded data + * @return the decoded packet + */ + private RawPacket decodeRawPacket(CharSequence data) throws IOException { + byte[] buf = Base64.getDecoder().decode(data.toString()); + return RawPacket.parseFrom(buf); + } + + /** + * Handles a raw packet sent to the socket + * + * @param packet the packet + */ + private void handleRawPacket(RawPacket packet) throws Exception { + int version = packet.getVersion(); + if (version != VERSION_1) { + throw new IllegalArgumentException("Unsupported packet version " + version); + } + + ByteString message = packet.getMessage(); + PublicKey publicKey = CRYPTO.decodePublicKey(packet.getPublicKey()); + ByteString signature = packet.getSignature(); + + boolean verified = false; + if (signature != null && publicKey != null && this.listener.isKeyTrusted(publicKey)) { + Signature sign = CRYPTO.createSignature(); + sign.initVerify(publicKey); + sign.update(message.asReadOnlyByteBuffer()); + + verified = sign.verify(signature.toByteArray()); + } + + PacketWrapper wrapper = PacketWrapper.parseFrom(message); + this.listener.onPacket(wrapper, verified, publicKey); + } + + @Override + public void close() { + this.socket.close(1001 /* going away */, "spark plugin disconnected"); + } +} |