aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--spark-common/src/main/java/me/lucko/spark/common/legacy/LegacyBytesocksClient.java50
1 files changed, 45 insertions, 5 deletions
diff --git a/spark-common/src/main/java/me/lucko/spark/common/legacy/LegacyBytesocksClient.java b/spark-common/src/main/java/me/lucko/spark/common/legacy/LegacyBytesocksClient.java
index e552b0e..b3e774e 100644
--- a/spark-common/src/main/java/me/lucko/spark/common/legacy/LegacyBytesocksClient.java
+++ b/spark-common/src/main/java/me/lucko/spark/common/legacy/LegacyBytesocksClient.java
@@ -1,11 +1,16 @@
package me.lucko.spark.common.legacy;
+import com.google.common.collect.ImmutableList;
import com.neovisionaries.ws.client.*;
import me.lucko.bytesocks.client.BytesocksClient;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
@@ -15,6 +20,7 @@ import java.util.concurrent.CompletableFuture;
* Implementation of BytesocksClient that works on Java 8.
*/
public class LegacyBytesocksClient implements BytesocksClient {
+
/* The bytesocks urls */
private final String httpUrl;
private final String wsUrl;
@@ -74,6 +80,19 @@ public class LegacyBytesocksClient implements BytesocksClient {
private final WebSocket ws;
private final WeakHashMap<WebSocketFrame, CompletableFuture<?>> frameFutures = new WeakHashMap<>();
+ /* ugly hacks to track sending of websocket */
+ private static final MethodHandle SPLIT_METHOD;
+
+ static {
+ try {
+ Method m = WebSocket.class.getDeclaredMethod("splitIfNecessary", WebSocketFrame.class);
+ m.setAccessible(true);
+ SPLIT_METHOD = MethodHandles.lookup().unreflect(m);
+ } catch(ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private SocketImpl(String id, WebSocket ws) {
this.id = id;
this.ws = ws;
@@ -84,6 +103,9 @@ public class LegacyBytesocksClient implements BytesocksClient {
CompletableFuture<?> future = frameFutures.remove(frame);
if(future != null)
future.complete(null);
+ else {
+ System.err.println("Sent frame without associated CompletableFuture");
+ }
}
}
@@ -93,6 +115,8 @@ public class LegacyBytesocksClient implements BytesocksClient {
CompletableFuture<?> future = frameFutures.remove(frame);
if(future != null)
future.completeExceptionally(new Exception("Failed to send frame"));
+ else
+ System.err.println("Received error without associated CompletableFuture");
}
}
});
@@ -111,12 +135,28 @@ public class LegacyBytesocksClient implements BytesocksClient {
@Override
public CompletableFuture<?> send(CharSequence msg) {
WebSocketFrame targetFrame = WebSocketFrame.createTextFrame(msg.toString());
- CompletableFuture<?> future = new CompletableFuture<>();
- synchronized (frameFutures) {
- frameFutures.put(targetFrame, future);
+ // split ourselves so we know what the last frame was
+ List<WebSocketFrame> splitFrames;
+ try {
+ splitFrames = (List<WebSocketFrame>)SPLIT_METHOD.invokeExact(this.ws, targetFrame);
+ } catch(Throwable e) {
+ throw new RuntimeException(e);
+ }
+ if(splitFrames == null)
+ splitFrames = ImmutableList.of(targetFrame);
+ // FIXME this code is not really that efficient (allocating a whole new CompletableFuture for every frame), but
+ // it's the simplest solution for now and seems to be good enough. We have to track all frames to correctly
+ // report errors/success
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+ for(WebSocketFrame frame : splitFrames) {
+ CompletableFuture<?> future = new CompletableFuture<>();
+ synchronized (frameFutures) {
+ frameFutures.put(frame, future);
+ }
+ futures.add(future);
+ this.ws.sendFrame(frame);
}
- this.ws.sendText(msg.toString());
- return future;
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
@Override