diff options
-rw-r--r-- | spark-common/src/main/java/me/lucko/spark/common/legacy/LegacyBytesocksClient.java | 50 |
1 files changed, 45 insertions, 5 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/legacy/LegacyBytesocksClient.java b/spark-common/src/main/java/me/lucko/spark/common/legacy/LegacyBytesocksClient.java index e552b0e..b3e774e 100644 --- a/spark-common/src/main/java/me/lucko/spark/common/legacy/LegacyBytesocksClient.java +++ b/spark-common/src/main/java/me/lucko/spark/common/legacy/LegacyBytesocksClient.java @@ -1,11 +1,16 @@ package me.lucko.spark.common.legacy; +import com.google.common.collect.ImmutableList; import com.neovisionaries.ws.client.*; import me.lucko.bytesocks.client.BytesocksClient; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Method; import java.net.HttpURLConnection; import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.WeakHashMap; @@ -15,6 +20,7 @@ import java.util.concurrent.CompletableFuture; * Implementation of BytesocksClient that works on Java 8. */ public class LegacyBytesocksClient implements BytesocksClient { + /* The bytesocks urls */ private final String httpUrl; private final String wsUrl; @@ -74,6 +80,19 @@ public class LegacyBytesocksClient implements BytesocksClient { private final WebSocket ws; private final WeakHashMap<WebSocketFrame, CompletableFuture<?>> frameFutures = new WeakHashMap<>(); + /* ugly hacks to track sending of websocket */ + private static final MethodHandle SPLIT_METHOD; + + static { + try { + Method m = WebSocket.class.getDeclaredMethod("splitIfNecessary", WebSocketFrame.class); + m.setAccessible(true); + SPLIT_METHOD = MethodHandles.lookup().unreflect(m); + } catch(ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + private SocketImpl(String id, WebSocket ws) { this.id = id; this.ws = ws; @@ -84,6 +103,9 @@ public class LegacyBytesocksClient implements BytesocksClient { CompletableFuture<?> future = frameFutures.remove(frame); if(future != null) future.complete(null); + else { + System.err.println("Sent frame without associated CompletableFuture"); + } } } @@ -93,6 +115,8 @@ public class LegacyBytesocksClient implements BytesocksClient { CompletableFuture<?> future = frameFutures.remove(frame); if(future != null) future.completeExceptionally(new Exception("Failed to send frame")); + else + System.err.println("Received error without associated CompletableFuture"); } } }); @@ -111,12 +135,28 @@ public class LegacyBytesocksClient implements BytesocksClient { @Override public CompletableFuture<?> send(CharSequence msg) { WebSocketFrame targetFrame = WebSocketFrame.createTextFrame(msg.toString()); - CompletableFuture<?> future = new CompletableFuture<>(); - synchronized (frameFutures) { - frameFutures.put(targetFrame, future); + // split ourselves so we know what the last frame was + List<WebSocketFrame> splitFrames; + try { + splitFrames = (List<WebSocketFrame>)SPLIT_METHOD.invokeExact(this.ws, targetFrame); + } catch(Throwable e) { + throw new RuntimeException(e); + } + if(splitFrames == null) + splitFrames = ImmutableList.of(targetFrame); + // FIXME this code is not really that efficient (allocating a whole new CompletableFuture for every frame), but + // it's the simplest solution for now and seems to be good enough. We have to track all frames to correctly + // report errors/success + List<CompletableFuture<?>> futures = new ArrayList<>(); + for(WebSocketFrame frame : splitFrames) { + CompletableFuture<?> future = new CompletableFuture<>(); + synchronized (frameFutures) { + frameFutures.put(frame, future); + } + futures.add(future); + this.ws.sendFrame(frame); } - this.ws.sendText(msg.toString()); - return future; + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } @Override |