From a579072c7e20c77a68dada36724d23e929800921 Mon Sep 17 00:00:00 2001 From: Greg Lincoln Date: Sun, 24 Oct 2021 15:03:48 -0400 Subject: [PATCH 01/11] WIP: Initial websocket change watcher implementation. --- .../java/com/arcadedb/server/ChangeEvent.java | 38 ++++++++ .../com/arcadedb/server/http/HttpServer.java | 32 +++---- .../server/http/WebSocketEventBus.java | 96 +++++++++++++++++++ .../server/http/WebSocketEventListener.java | 31 ++++++ .../handler/WebSocketConnectionHandler.java | 29 ++++++ .../handler/WebSocketReceiveListener.java | 60 ++++++++++++ 6 files changed, 265 insertions(+), 21 deletions(-) create mode 100644 server/src/main/java/com/arcadedb/server/ChangeEvent.java create mode 100644 server/src/main/java/com/arcadedb/server/http/WebSocketEventBus.java create mode 100644 server/src/main/java/com/arcadedb/server/http/WebSocketEventListener.java create mode 100644 server/src/main/java/com/arcadedb/server/http/handler/WebSocketConnectionHandler.java create mode 100644 server/src/main/java/com/arcadedb/server/http/handler/WebSocketReceiveListener.java diff --git a/server/src/main/java/com/arcadedb/server/ChangeEvent.java b/server/src/main/java/com/arcadedb/server/ChangeEvent.java new file mode 100644 index 0000000000..188e5aee18 --- /dev/null +++ b/server/src/main/java/com/arcadedb/server/ChangeEvent.java @@ -0,0 +1,38 @@ +package com.arcadedb.server; + +import com.arcadedb.database.Record; +import org.json.JSONObject; + +public class ChangeEvent { + private final TYPE type; + private final Record record; + private final String database; + + public enum TYPE {CREATE, UPDATE, DELETE} + + public ChangeEvent(TYPE type, Record record, String database) { + this.type = type; + this.record = record; + this.database = database; + } + + public String getDatabase() { + return database; + } + + public Record getRecord() { + return record; + } + + public TYPE getType() { + return type; + } + + public String toJSON() { + var jsonObject = new JSONObject(); + jsonObject.put("changeType", this.type.toString().toLowerCase()); + jsonObject.put("record", this.record.toJSON().toString()); + jsonObject.put("database", this.database); + return jsonObject.toString(); + } +} diff --git a/server/src/main/java/com/arcadedb/server/http/HttpServer.java b/server/src/main/java/com/arcadedb/server/http/HttpServer.java index 5d57816d87..11042d27e8 100644 --- a/server/src/main/java/com/arcadedb/server/http/HttpServer.java +++ b/server/src/main/java/com/arcadedb/server/http/HttpServer.java @@ -21,38 +21,25 @@ import com.arcadedb.server.ArcadeDBServer; import com.arcadedb.server.ServerException; import com.arcadedb.server.ServerPlugin; -import com.arcadedb.server.http.handler.GetDatabasesHandler; -import com.arcadedb.server.http.handler.GetDocumentHandler; -import com.arcadedb.server.http.handler.GetDynamicContentHandler; -import com.arcadedb.server.http.handler.GetExistsDatabaseHandler; -import com.arcadedb.server.http.handler.GetQueryHandler; -import com.arcadedb.server.http.handler.PostBeginHandler; -import com.arcadedb.server.http.handler.PostCommandHandler; -import com.arcadedb.server.http.handler.PostCommitHandler; -import com.arcadedb.server.http.handler.PostCreateDatabaseHandler; -import com.arcadedb.server.http.handler.PostCreateDocumentHandler; -import com.arcadedb.server.http.handler.PostDropDatabaseHandler; -import com.arcadedb.server.http.handler.PostQueryHandler; -import com.arcadedb.server.http.handler.PostRollbackHandler; -import com.arcadedb.server.http.handler.PostServersHandler; +import com.arcadedb.server.http.handler.*; import io.undertow.Handlers; import io.undertow.Undertow; import io.undertow.server.RoutingHandler; import io.undertow.server.handlers.PathHandler; -import java.net.*; -import java.util.logging.*; +import java.net.BindException; +import java.util.logging.Level; import static io.undertow.UndertowOptions.SHUTDOWN_TIMEOUT; public class HttpServer implements ServerPlugin { private final ArcadeDBServer server; private final HttpSessionManager transactionManager; - private final JsonSerializer jsonSerializer = new JsonSerializer(); - private Undertow undertow; - private String listeningAddress; - private String host; - private int port; + private final JsonSerializer jsonSerializer = new JsonSerializer(); + private Undertow undertow; + private String listeningAddress; + private String host; + private int port; public HttpServer(final ArcadeDBServer server) { this.server = server; @@ -87,6 +74,9 @@ public void startService() { final PathHandler routes = new PathHandler(); final RoutingHandler basicRoutes = Handlers.routing(); + + routes.addPrefixPath("/ws", new WebSocketConnectionHandler(this)); + routes.addPrefixPath("/api/v1",// basicRoutes// .post("/begin/{database}", new PostBeginHandler(this))// diff --git a/server/src/main/java/com/arcadedb/server/http/WebSocketEventBus.java b/server/src/main/java/com/arcadedb/server/http/WebSocketEventBus.java new file mode 100644 index 0000000000..08a40cdcb0 --- /dev/null +++ b/server/src/main/java/com/arcadedb/server/http/WebSocketEventBus.java @@ -0,0 +1,96 @@ +package com.arcadedb.server.http; + +import com.arcadedb.event.AfterRecordCreateListener; +import com.arcadedb.event.AfterRecordDeleteListener; +import com.arcadedb.event.AfterRecordUpdateListener; +import com.arcadedb.server.ArcadeDBServer; +import com.arcadedb.server.ChangeEvent; +import com.arcadedb.utility.Pair; +import io.undertow.websockets.core.WebSocketChannel; +import io.undertow.websockets.core.WebSockets; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.*; + +public class WebSocketEventBus { + private final ConcurrentLinkedQueue events = new ConcurrentLinkedQueue<>(); + private final ConcurrentHashMap>>> + subscribers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> + databaseWatchers = new ConcurrentHashMap<>(); + private final ArcadeDBServer arcadeServer; + + public static final String CHANNEL_ID = "ID"; + + public WebSocketEventBus(final ArcadeDBServer server) { + this.arcadeServer = server; + } + + public void push(ChangeEvent event) { + this.events.add(event); + } + + public void subscribe(String database, String type, WebSocketChannel channel) { + this.getSubscriberSet(database, type).add(new Pair<>((UUID) channel.getAttribute(CHANNEL_ID), channel)); + if (!this.databaseWatchers.containsKey(database)) this.startDatabaseWatcher(database); + } + + public void unsubscribe(String database, String type, UUID id) { + this.getSubscriberSet(database, type).removeIf(pair -> pair.getFirst() == id); + } + + private void startDatabaseWatcher(String database) { + WebSocketEventListener listener = new WebSocketEventListener(this, database); + this.arcadeServer.getDatabase(database).getEvents() + .registerListener((AfterRecordCreateListener) listener) + .registerListener((AfterRecordUpdateListener) listener) + .registerListener((AfterRecordDeleteListener) listener); + + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(() -> { + ChangeEvent event; + while ((event = events.poll()) != null) { + var databaseWatchers = this.getSubscriberSet(database, "*"); + var typeWatchers = this.getSubscriberSet(database, event.getRecord().asDocument().getTypeName()); + HashSet> matchingSubscribers = new HashSet<>() {{ + addAll(databaseWatchers); + addAll(typeWatchers); + }}; + + var json = event.toJSON(); + if (subscribers.isEmpty()) { + this.stopDatabaseWatcher(database); + } else { + matchingSubscribers.forEach(pair -> WebSockets.sendText(json, pair.getSecond(), null)); + } + } + }, 0, 500, TimeUnit.MILLISECONDS); + + this.databaseWatchers.put(database, new Pair<>(listener, executor)); + } + + private void stopDatabaseWatcher(String database) { + var pair = this.databaseWatchers.get(database); + this.arcadeServer.getDatabase(database).getEvents() + .unregisterListener((AfterRecordCreateListener) pair.getFirst()) + .unregisterListener((AfterRecordUpdateListener) pair.getFirst()) + .unregisterListener((AfterRecordDeleteListener) pair.getFirst()); + + pair.getSecond().shutdown(); + this.databaseWatchers.remove(database); + } + + private Set> getSubscriberSet(String database, String type) { + if (type == null) type = "*"; + if (!this.subscribers.containsKey(database)) this.subscribers.put(database, new ConcurrentHashMap<>()); + if (!this.subscribers.get(database).containsKey(type)) this.subscribers.get(database).put(type, new HashSet<>()); + return this.subscribers.get(database).get(type); + } + + public void unsubscribeAll(UUID id) { + this.subscribers.values().forEach(types -> types.values().forEach(sets -> sets.removeIf(pair -> pair.getFirst() == id))); + } +} + diff --git a/server/src/main/java/com/arcadedb/server/http/WebSocketEventListener.java b/server/src/main/java/com/arcadedb/server/http/WebSocketEventListener.java new file mode 100644 index 0000000000..494d3f940c --- /dev/null +++ b/server/src/main/java/com/arcadedb/server/http/WebSocketEventListener.java @@ -0,0 +1,31 @@ +package com.arcadedb.server.http; + +import com.arcadedb.database.Record; +import com.arcadedb.event.*; +import com.arcadedb.server.ChangeEvent; + +public class WebSocketEventListener implements AfterRecordCreateListener, AfterRecordUpdateListener, AfterRecordDeleteListener { + + private final WebSocketEventBus eventBus; + private final String database; + + public WebSocketEventListener(final WebSocketEventBus eventBus, final String database) { + this.eventBus = eventBus; + this.database = database; + } + + @Override + public void onAfterCreate(Record record) { + this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.CREATE, record, database)); + } + + @Override + public void onAfterUpdate(Record record) { + this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.UPDATE, record, database)); + } + + @Override + public void onAfterDelete(Record record) { + this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.DELETE, record, database)); + } +} diff --git a/server/src/main/java/com/arcadedb/server/http/handler/WebSocketConnectionHandler.java b/server/src/main/java/com/arcadedb/server/http/handler/WebSocketConnectionHandler.java new file mode 100644 index 0000000000..f894c52ba5 --- /dev/null +++ b/server/src/main/java/com/arcadedb/server/http/handler/WebSocketConnectionHandler.java @@ -0,0 +1,29 @@ +package com.arcadedb.server.http.handler; + +import com.arcadedb.server.http.HttpServer; +import com.arcadedb.server.http.WebSocketEventBus; +import com.arcadedb.server.security.ServerSecurityUser; +import io.undertow.server.HttpServerExchange; +import io.undertow.websockets.WebSocketConnectionCallback; +import io.undertow.websockets.WebSocketProtocolHandshakeHandler; + +import java.util.UUID; + +public class WebSocketConnectionHandler extends AbstractHandler { + private final WebSocketEventBus webSocketEventBus; + + public WebSocketConnectionHandler(final HttpServer httpServer) { + super(httpServer); + this.webSocketEventBus = new WebSocketEventBus(httpServer.getServer()); + } + + @Override + protected void execute(HttpServerExchange exchange, ServerSecurityUser user) throws Exception { + var handler = new WebSocketProtocolHandshakeHandler((WebSocketConnectionCallback) (webSocketHttpExchange, channel) -> { + channel.getReceiveSetter().set(new WebSocketReceiveListener(webSocketEventBus)); + channel.setAttribute(WebSocketEventBus.CHANNEL_ID, UUID.randomUUID()); + channel.resumeReceives(); + }); + handler.handleRequest(exchange); + } +} diff --git a/server/src/main/java/com/arcadedb/server/http/handler/WebSocketReceiveListener.java b/server/src/main/java/com/arcadedb/server/http/handler/WebSocketReceiveListener.java new file mode 100644 index 0000000000..8b9bcb81b3 --- /dev/null +++ b/server/src/main/java/com/arcadedb/server/http/handler/WebSocketReceiveListener.java @@ -0,0 +1,60 @@ +package com.arcadedb.server.http.handler; + +import com.arcadedb.server.http.WebSocketEventBus; +import io.undertow.websockets.core.*; +import org.json.JSONObject; + +import java.io.IOException; +import java.util.UUID; + +public class WebSocketReceiveListener extends AbstractReceiveListener { + private final WebSocketEventBus webSocketEventBus; + + public enum ACTION {UNKNOWN, SUBSCRIBE, UNSUBSCRIBE} + + public WebSocketReceiveListener(WebSocketEventBus webSocketEventBus) { + this.webSocketEventBus = webSocketEventBus; + } + + @Override + protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage textMessage) throws IOException { + JSONObject message = new JSONObject(textMessage.getData()); + String rawAction = message.getString("action"); + ACTION action = ACTION.UNKNOWN; + try { + action = ACTION.valueOf(rawAction.toUpperCase()); + } catch (IllegalArgumentException ignored) { + } + + switch (action) { + case SUBSCRIBE: + this.webSocketEventBus.subscribe(message.getString("database"), message.optString("type", null), channel); + this.sendAck(action, channel); + break; + case UNSUBSCRIBE: + this.webSocketEventBus.unsubscribe(message.getString("database"), message.optString("type", null), + (UUID) channel.getAttribute(WebSocketEventBus.CHANNEL_ID)); + this.sendAck(action, channel); + break; + default: + sendError(String.format("Unknown action: %s", rawAction), channel); + break; + } + } + + @Override + protected void onClose(WebSocketChannel channel, StreamSourceFrameChannel frameChannel) throws IOException { + // TODO: Make sure this gets called for zombie connections + this.webSocketEventBus.unsubscribeAll((UUID) channel.getAttribute(WebSocketEventBus.CHANNEL_ID)); + } + + private void sendAck(ACTION action, WebSocketChannel channel) { + String message = String.format("{\"action\": \"%s\", \"result\": \"ok\"}", action.toString().toLowerCase()); + WebSockets.sendText(message, channel, null); + } + + private void sendError(String detail, WebSocketChannel channel) { + String message = String.format("{\"result\": \"error\", \"detail\":\"%s\"}", detail); + WebSockets.sendText(message, channel, null); + } +} From 4e1558320edfe25c3529e20ad21ce196d3101c33 Mon Sep 17 00:00:00 2001 From: Greg Lincoln Date: Sun, 24 Oct 2021 15:47:56 -0400 Subject: [PATCH 02/11] Hopefully address Codacy issues. --- server/src/main/java/com/arcadedb/server/ChangeEvent.java | 2 +- .../main/java/com/arcadedb/server/http/WebSocketEventBus.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/com/arcadedb/server/ChangeEvent.java b/server/src/main/java/com/arcadedb/server/ChangeEvent.java index 188e5aee18..5e767604d5 100644 --- a/server/src/main/java/com/arcadedb/server/ChangeEvent.java +++ b/server/src/main/java/com/arcadedb/server/ChangeEvent.java @@ -31,7 +31,7 @@ public TYPE getType() { public String toJSON() { var jsonObject = new JSONObject(); jsonObject.put("changeType", this.type.toString().toLowerCase()); - jsonObject.put("record", this.record.toJSON().toString()); + jsonObject.put("record", this.record.toJSON()); jsonObject.put("database", this.database); return jsonObject.toString(); } diff --git a/server/src/main/java/com/arcadedb/server/http/WebSocketEventBus.java b/server/src/main/java/com/arcadedb/server/http/WebSocketEventBus.java index 08a40cdcb0..c77ebc58c9 100644 --- a/server/src/main/java/com/arcadedb/server/http/WebSocketEventBus.java +++ b/server/src/main/java/com/arcadedb/server/http/WebSocketEventBus.java @@ -82,8 +82,8 @@ private void stopDatabaseWatcher(String database) { this.databaseWatchers.remove(database); } - private Set> getSubscriberSet(String database, String type) { - if (type == null) type = "*"; + private Set> getSubscriberSet(String database, String typeFilter) { + var type = typeFilter == null || typeFilter.trim().isEmpty() ? "*" : typeFilter; if (!this.subscribers.containsKey(database)) this.subscribers.put(database, new ConcurrentHashMap<>()); if (!this.subscribers.get(database).containsKey(type)) this.subscribers.get(database).put(type, new HashSet<>()); return this.subscribers.get(database).get(type); From 868043542c84590f41bcbcea70649b6f078894e0 Mon Sep 17 00:00:00 2001 From: Greg Lincoln Date: Mon, 25 Oct 2021 14:44:40 -0400 Subject: [PATCH 03/11] Move classes to ws package. Remove database property. Use matchingSubscribers. Cleanups. --- .../main/java/com/arcadedb/server/ChangeEvent.java | 10 ++-------- .../java/com/arcadedb/server/http/HttpServer.java | 1 + .../handler => ws}/WebSocketConnectionHandler.java | 4 ++-- .../server/{http => ws}/WebSocketEventBus.java | 9 +++++---- .../server/{http => ws}/WebSocketEventListener.java | 12 +++++------- .../handler => ws}/WebSocketReceiveListener.java | 3 +-- 6 files changed, 16 insertions(+), 23 deletions(-) rename server/src/main/java/com/arcadedb/server/{http/handler => ws}/WebSocketConnectionHandler.java (91%) rename server/src/main/java/com/arcadedb/server/{http => ws}/WebSocketEventBus.java (95%) rename server/src/main/java/com/arcadedb/server/{http => ws}/WebSocketEventListener.java (79%) rename server/src/main/java/com/arcadedb/server/{http/handler => ws}/WebSocketReceiveListener.java (95%) diff --git a/server/src/main/java/com/arcadedb/server/ChangeEvent.java b/server/src/main/java/com/arcadedb/server/ChangeEvent.java index 5e767604d5..4a514ba35d 100644 --- a/server/src/main/java/com/arcadedb/server/ChangeEvent.java +++ b/server/src/main/java/com/arcadedb/server/ChangeEvent.java @@ -6,18 +6,12 @@ public class ChangeEvent { private final TYPE type; private final Record record; - private final String database; public enum TYPE {CREATE, UPDATE, DELETE} - public ChangeEvent(TYPE type, Record record, String database) { + public ChangeEvent(TYPE type, Record record) { this.type = type; this.record = record; - this.database = database; - } - - public String getDatabase() { - return database; } public Record getRecord() { @@ -32,7 +26,7 @@ public String toJSON() { var jsonObject = new JSONObject(); jsonObject.put("changeType", this.type.toString().toLowerCase()); jsonObject.put("record", this.record.toJSON()); - jsonObject.put("database", this.database); + jsonObject.put("database", this.record.getDatabase().getName()); return jsonObject.toString(); } } diff --git a/server/src/main/java/com/arcadedb/server/http/HttpServer.java b/server/src/main/java/com/arcadedb/server/http/HttpServer.java index 11042d27e8..652cc7ca51 100644 --- a/server/src/main/java/com/arcadedb/server/http/HttpServer.java +++ b/server/src/main/java/com/arcadedb/server/http/HttpServer.java @@ -22,6 +22,7 @@ import com.arcadedb.server.ServerException; import com.arcadedb.server.ServerPlugin; import com.arcadedb.server.http.handler.*; +import com.arcadedb.server.ws.WebSocketConnectionHandler; import io.undertow.Handlers; import io.undertow.Undertow; import io.undertow.server.RoutingHandler; diff --git a/server/src/main/java/com/arcadedb/server/http/handler/WebSocketConnectionHandler.java b/server/src/main/java/com/arcadedb/server/ws/WebSocketConnectionHandler.java similarity index 91% rename from server/src/main/java/com/arcadedb/server/http/handler/WebSocketConnectionHandler.java rename to server/src/main/java/com/arcadedb/server/ws/WebSocketConnectionHandler.java index f894c52ba5..f43b0aa358 100644 --- a/server/src/main/java/com/arcadedb/server/http/handler/WebSocketConnectionHandler.java +++ b/server/src/main/java/com/arcadedb/server/ws/WebSocketConnectionHandler.java @@ -1,7 +1,7 @@ -package com.arcadedb.server.http.handler; +package com.arcadedb.server.ws; import com.arcadedb.server.http.HttpServer; -import com.arcadedb.server.http.WebSocketEventBus; +import com.arcadedb.server.http.handler.AbstractHandler; import com.arcadedb.server.security.ServerSecurityUser; import io.undertow.server.HttpServerExchange; import io.undertow.websockets.WebSocketConnectionCallback; diff --git a/server/src/main/java/com/arcadedb/server/http/WebSocketEventBus.java b/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java similarity index 95% rename from server/src/main/java/com/arcadedb/server/http/WebSocketEventBus.java rename to server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java index c77ebc58c9..343de5b984 100644 --- a/server/src/main/java/com/arcadedb/server/http/WebSocketEventBus.java +++ b/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java @@ -1,4 +1,4 @@ -package com.arcadedb.server.http; +package com.arcadedb.server.ws; import com.arcadedb.event.AfterRecordCreateListener; import com.arcadedb.event.AfterRecordDeleteListener; @@ -9,6 +9,7 @@ import io.undertow.websockets.core.WebSocketChannel; import io.undertow.websockets.core.WebSockets; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.UUID; @@ -42,7 +43,7 @@ public void unsubscribe(String database, String type, UUID id) { } private void startDatabaseWatcher(String database) { - WebSocketEventListener listener = new WebSocketEventListener(this, database); + WebSocketEventListener listener = new WebSocketEventListener(this); this.arcadeServer.getDatabase(database).getEvents() .registerListener((AfterRecordCreateListener) listener) .registerListener((AfterRecordUpdateListener) listener) @@ -60,7 +61,7 @@ private void startDatabaseWatcher(String database) { }}; var json = event.toJSON(); - if (subscribers.isEmpty()) { + if (matchingSubscribers.isEmpty()) { this.stopDatabaseWatcher(database); } else { matchingSubscribers.forEach(pair -> WebSockets.sendText(json, pair.getSecond(), null)); @@ -85,7 +86,7 @@ private void stopDatabaseWatcher(String database) { private Set> getSubscriberSet(String database, String typeFilter) { var type = typeFilter == null || typeFilter.trim().isEmpty() ? "*" : typeFilter; if (!this.subscribers.containsKey(database)) this.subscribers.put(database, new ConcurrentHashMap<>()); - if (!this.subscribers.get(database).containsKey(type)) this.subscribers.get(database).put(type, new HashSet<>()); + if (!this.subscribers.get(database).containsKey(type)) this.subscribers.get(database).put(type, Collections.emptySet()); return this.subscribers.get(database).get(type); } diff --git a/server/src/main/java/com/arcadedb/server/http/WebSocketEventListener.java b/server/src/main/java/com/arcadedb/server/ws/WebSocketEventListener.java similarity index 79% rename from server/src/main/java/com/arcadedb/server/http/WebSocketEventListener.java rename to server/src/main/java/com/arcadedb/server/ws/WebSocketEventListener.java index 494d3f940c..2b1b9a5021 100644 --- a/server/src/main/java/com/arcadedb/server/http/WebSocketEventListener.java +++ b/server/src/main/java/com/arcadedb/server/ws/WebSocketEventListener.java @@ -1,4 +1,4 @@ -package com.arcadedb.server.http; +package com.arcadedb.server.ws; import com.arcadedb.database.Record; import com.arcadedb.event.*; @@ -7,25 +7,23 @@ public class WebSocketEventListener implements AfterRecordCreateListener, AfterRecordUpdateListener, AfterRecordDeleteListener { private final WebSocketEventBus eventBus; - private final String database; - public WebSocketEventListener(final WebSocketEventBus eventBus, final String database) { + public WebSocketEventListener(final WebSocketEventBus eventBus) { this.eventBus = eventBus; - this.database = database; } @Override public void onAfterCreate(Record record) { - this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.CREATE, record, database)); + this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.CREATE, record)); } @Override public void onAfterUpdate(Record record) { - this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.UPDATE, record, database)); + this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.UPDATE, record)); } @Override public void onAfterDelete(Record record) { - this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.DELETE, record, database)); + this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.DELETE, record)); } } diff --git a/server/src/main/java/com/arcadedb/server/http/handler/WebSocketReceiveListener.java b/server/src/main/java/com/arcadedb/server/ws/WebSocketReceiveListener.java similarity index 95% rename from server/src/main/java/com/arcadedb/server/http/handler/WebSocketReceiveListener.java rename to server/src/main/java/com/arcadedb/server/ws/WebSocketReceiveListener.java index 8b9bcb81b3..f25bae3ab6 100644 --- a/server/src/main/java/com/arcadedb/server/http/handler/WebSocketReceiveListener.java +++ b/server/src/main/java/com/arcadedb/server/ws/WebSocketReceiveListener.java @@ -1,6 +1,5 @@ -package com.arcadedb.server.http.handler; +package com.arcadedb.server.ws; -import com.arcadedb.server.http.WebSocketEventBus; import io.undertow.websockets.core.*; import org.json.JSONObject; From b59623f4ec3cfa380060461db1046b8e38fd9cb3 Mon Sep 17 00:00:00 2001 From: Greg Lincoln Date: Mon, 25 Oct 2021 17:21:44 -0400 Subject: [PATCH 04/11] Switch to thread and configurable ArrayBlockingQueue for eventQueue. Cleanup getSubscriberSet. --- .../com/arcadedb/GlobalConfiguration.java | 4 + .../arcadedb/server/ws/WebSocketEventBus.java | 75 +++++++++++-------- 2 files changed, 48 insertions(+), 31 deletions(-) diff --git a/engine/src/main/java/com/arcadedb/GlobalConfiguration.java b/engine/src/main/java/com/arcadedb/GlobalConfiguration.java index daf63af6a6..1f5292cdb7 100644 --- a/engine/src/main/java/com/arcadedb/GlobalConfiguration.java +++ b/engine/src/main/java/com/arcadedb/GlobalConfiguration.java @@ -223,6 +223,10 @@ public Object call(final Object value) { SERVER_HTTP_TX_EXPIRE_TIMEOUT("arcadedb.server.httpTxExpireTimeout", "Timeout in seconds for a HTTP transaction to expire. This timeout is computed from the latest command against the transaction", Long.class, 30), + // SERVER WS + SERVER_WS_EVENT_BUS_QUEUE_SIZE("arcadedb.server.eventBusQueueSize", + "Size of the queue used as a buffer for unserviced database change events.", Integer.class, 1000), + // SERVER SECURITY SERVER_SECURITY_ALGORITHM("arcadedb.server.securityAlgorithm", "Default encryption algorithm used for passwords hashing", String.class, "PBKDF2WithHmacSHA256"), diff --git a/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java b/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java index 343de5b984..1f44b1e0fd 100644 --- a/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java +++ b/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java @@ -1,5 +1,6 @@ package com.arcadedb.server.ws; +import com.arcadedb.GlobalConfiguration; import com.arcadedb.event.AfterRecordCreateListener; import com.arcadedb.event.AfterRecordDeleteListener; import com.arcadedb.event.AfterRecordUpdateListener; @@ -9,28 +10,34 @@ import io.undertow.websockets.core.WebSocketChannel; import io.undertow.websockets.core.WebSockets; -import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.UUID; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; public class WebSocketEventBus { - private final ConcurrentLinkedQueue events = new ConcurrentLinkedQueue<>(); - private final ConcurrentHashMap>>> - subscribers = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> - databaseWatchers = new ConcurrentHashMap<>(); - private final ArcadeDBServer arcadeServer; + private final ArrayBlockingQueue eventQueue; + private final ConcurrentHashMap>>> + subscribers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> + databaseWatchers = new ConcurrentHashMap<>(); + private final ArcadeDBServer arcadeServer; public static final String CHANNEL_ID = "ID"; public WebSocketEventBus(final ArcadeDBServer server) { this.arcadeServer = server; + this.eventQueue = new ArrayBlockingQueue<>(server.getConfiguration().getValueAsInteger(GlobalConfiguration.SERVER_WS_EVENT_BUS_QUEUE_SIZE), true); } public void push(ChangeEvent event) { - this.events.add(event); + try { + this.eventQueue.add(event); + } catch (IllegalStateException ex) { + this.arcadeServer.log(this, Level.WARNING, "Skipping event as eventQueue is full. Consider increasing eventBusQueueSize."); + } } public void subscribe(String database, String type, WebSocketChannel channel) { @@ -49,27 +56,33 @@ private void startDatabaseWatcher(String database) { .registerListener((AfterRecordUpdateListener) listener) .registerListener((AfterRecordDeleteListener) listener); - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - executor.scheduleAtFixedRate(() -> { - ChangeEvent event; - while ((event = events.poll()) != null) { - var databaseWatchers = this.getSubscriberSet(database, "*"); - var typeWatchers = this.getSubscriberSet(database, event.getRecord().asDocument().getTypeName()); - HashSet> matchingSubscribers = new HashSet<>() {{ - addAll(databaseWatchers); - addAll(typeWatchers); - }}; - - var json = event.toJSON(); - if (matchingSubscribers.isEmpty()) { - this.stopDatabaseWatcher(database); - } else { - matchingSubscribers.forEach(pair -> WebSockets.sendText(json, pair.getSecond(), null)); + var watcherThread = new Thread(() -> { + try { + this.arcadeServer.log(this, Level.INFO, "Starting up watcher thread for %s.", database); + + while (true) { + var event = eventQueue.take(); + var databaseWatchers = this.getSubscriberSet(database, "*"); + var typeWatchers = this.getSubscriberSet(database, event.getRecord().asDocument().getTypeName()); + var matchingSubscribers = new HashSet>() {{ + addAll(databaseWatchers); + addAll(typeWatchers); + }}; + + var json = event.toJSON(); + if (matchingSubscribers.isEmpty()) { + this.stopDatabaseWatcher(database); + } else { + matchingSubscribers.forEach(pair -> WebSockets.sendText(json, pair.getSecond(), null)); + } } + } catch (InterruptedException ignored) { + this.arcadeServer.log(this, Level.INFO, "Shutting down watcher thread for %s.", database); } - }, 0, 500, TimeUnit.MILLISECONDS); + }); + watcherThread.start(); - this.databaseWatchers.put(database, new Pair<>(listener, executor)); + this.databaseWatchers.put(database, new Pair<>(listener, watcherThread)); } private void stopDatabaseWatcher(String database) { @@ -79,15 +92,15 @@ private void stopDatabaseWatcher(String database) { .unregisterListener((AfterRecordUpdateListener) pair.getFirst()) .unregisterListener((AfterRecordDeleteListener) pair.getFirst()); - pair.getSecond().shutdown(); + pair.getSecond().interrupt(); this.databaseWatchers.remove(database); } private Set> getSubscriberSet(String database, String typeFilter) { var type = typeFilter == null || typeFilter.trim().isEmpty() ? "*" : typeFilter; - if (!this.subscribers.containsKey(database)) this.subscribers.put(database, new ConcurrentHashMap<>()); - if (!this.subscribers.get(database).containsKey(type)) this.subscribers.get(database).put(type, Collections.emptySet()); - return this.subscribers.get(database).get(type); + return this.subscribers + .computeIfAbsent(database, key -> new ConcurrentHashMap<>()) + .computeIfAbsent(type, key -> new HashSet<>()); } public void unsubscribeAll(UUID id) { From d18e239ca82daf38022eabbf9e5c7be487a1d6e2 Mon Sep 17 00:00:00 2001 From: Greg Lincoln Date: Mon, 25 Oct 2021 20:18:06 -0400 Subject: [PATCH 05/11] Create only one WebSocketEventBus per server. Add two basic tests. Cleanups. --- .../com/arcadedb/server/http/HttpServer.java | 7 +- .../server/ws/WebSocketConnectionHandler.java | 6 +- .../arcadedb/server/ws/WebSocketEventBus.java | 2 +- .../server/ws/WebSocketReceiveListener.java | 67 ++++++++----- .../server/ws/WebSocketClientHelper.java | 96 +++++++++++++++++++ .../server/ws/WebSocketEventBusIT.java | 63 ++++++++++++ 6 files changed, 213 insertions(+), 28 deletions(-) create mode 100644 server/src/test/java/com/arcadedb/server/ws/WebSocketClientHelper.java create mode 100644 server/src/test/java/com/arcadedb/server/ws/WebSocketEventBusIT.java diff --git a/server/src/main/java/com/arcadedb/server/http/HttpServer.java b/server/src/main/java/com/arcadedb/server/http/HttpServer.java index 652cc7ca51..9b9c053953 100644 --- a/server/src/main/java/com/arcadedb/server/http/HttpServer.java +++ b/server/src/main/java/com/arcadedb/server/http/HttpServer.java @@ -23,6 +23,7 @@ import com.arcadedb.server.ServerPlugin; import com.arcadedb.server.http.handler.*; import com.arcadedb.server.ws.WebSocketConnectionHandler; +import com.arcadedb.server.ws.WebSocketEventBus; import io.undertow.Handlers; import io.undertow.Undertow; import io.undertow.server.RoutingHandler; @@ -36,7 +37,8 @@ public class HttpServer implements ServerPlugin { private final ArcadeDBServer server; private final HttpSessionManager transactionManager; - private final JsonSerializer jsonSerializer = new JsonSerializer(); + private final JsonSerializer jsonSerializer = new JsonSerializer(); + private final WebSocketEventBus webSocketEventBus; private Undertow undertow; private String listeningAddress; private String host; @@ -45,6 +47,7 @@ public class HttpServer implements ServerPlugin { public HttpServer(final ArcadeDBServer server) { this.server = server; this.transactionManager = new HttpSessionManager(server.getConfiguration().getValueAsInteger(GlobalConfiguration.SERVER_HTTP_TX_EXPIRE_TIMEOUT) * 1000); + this.webSocketEventBus = new WebSocketEventBus(this.server); } @Override @@ -76,7 +79,7 @@ public void startService() { final RoutingHandler basicRoutes = Handlers.routing(); - routes.addPrefixPath("/ws", new WebSocketConnectionHandler(this)); + routes.addPrefixPath("/ws", new WebSocketConnectionHandler(this, webSocketEventBus)); routes.addPrefixPath("/api/v1",// basicRoutes// diff --git a/server/src/main/java/com/arcadedb/server/ws/WebSocketConnectionHandler.java b/server/src/main/java/com/arcadedb/server/ws/WebSocketConnectionHandler.java index f43b0aa358..8cf096b256 100644 --- a/server/src/main/java/com/arcadedb/server/ws/WebSocketConnectionHandler.java +++ b/server/src/main/java/com/arcadedb/server/ws/WebSocketConnectionHandler.java @@ -12,15 +12,15 @@ public class WebSocketConnectionHandler extends AbstractHandler { private final WebSocketEventBus webSocketEventBus; - public WebSocketConnectionHandler(final HttpServer httpServer) { + public WebSocketConnectionHandler(final HttpServer httpServer, final WebSocketEventBus webSocketEventBus) { super(httpServer); - this.webSocketEventBus = new WebSocketEventBus(httpServer.getServer()); + this.webSocketEventBus = webSocketEventBus; } @Override protected void execute(HttpServerExchange exchange, ServerSecurityUser user) throws Exception { var handler = new WebSocketProtocolHandshakeHandler((WebSocketConnectionCallback) (webSocketHttpExchange, channel) -> { - channel.getReceiveSetter().set(new WebSocketReceiveListener(webSocketEventBus)); + channel.getReceiveSetter().set(new WebSocketReceiveListener(this.httpServer, webSocketEventBus)); channel.setAttribute(WebSocketEventBus.CHANNEL_ID, UUID.randomUUID()); channel.resumeReceives(); }); diff --git a/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java b/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java index 1f44b1e0fd..25550fca65 100644 --- a/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java +++ b/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java @@ -50,7 +50,7 @@ public void unsubscribe(String database, String type, UUID id) { } private void startDatabaseWatcher(String database) { - WebSocketEventListener listener = new WebSocketEventListener(this); + var listener = new WebSocketEventListener(this); this.arcadeServer.getDatabase(database).getEvents() .registerListener((AfterRecordCreateListener) listener) .registerListener((AfterRecordUpdateListener) listener) diff --git a/server/src/main/java/com/arcadedb/server/ws/WebSocketReceiveListener.java b/server/src/main/java/com/arcadedb/server/ws/WebSocketReceiveListener.java index f25bae3ab6..6c7b00950c 100644 --- a/server/src/main/java/com/arcadedb/server/ws/WebSocketReceiveListener.java +++ b/server/src/main/java/com/arcadedb/server/ws/WebSocketReceiveListener.java @@ -1,17 +1,23 @@ package com.arcadedb.server.ws; +import com.arcadedb.GlobalConfiguration; +import com.arcadedb.log.LogManager; +import com.arcadedb.server.http.HttpServer; import io.undertow.websockets.core.*; import org.json.JSONObject; import java.io.IOException; import java.util.UUID; +import java.util.logging.Level; public class WebSocketReceiveListener extends AbstractReceiveListener { + private final HttpServer httpServer; private final WebSocketEventBus webSocketEventBus; public enum ACTION {UNKNOWN, SUBSCRIBE, UNSUBSCRIBE} - public WebSocketReceiveListener(WebSocketEventBus webSocketEventBus) { + public WebSocketReceiveListener(final HttpServer httpServer, final WebSocketEventBus webSocketEventBus) { + this.httpServer = httpServer; this.webSocketEventBus = webSocketEventBus; } @@ -25,35 +31,52 @@ protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage t } catch (IllegalArgumentException ignored) { } - switch (action) { - case SUBSCRIBE: - this.webSocketEventBus.subscribe(message.getString("database"), message.optString("type", null), channel); - this.sendAck(action, channel); - break; - case UNSUBSCRIBE: - this.webSocketEventBus.unsubscribe(message.getString("database"), message.optString("type", null), - (UUID) channel.getAttribute(WebSocketEventBus.CHANNEL_ID)); - this.sendAck(action, channel); - break; - default: - sendError(String.format("Unknown action: %s", rawAction), channel); - break; + try { + switch (action) { + case SUBSCRIBE: + this.webSocketEventBus.subscribe(message.getString("database"), message.optString("type", null), channel); + this.sendAck(channel, action); + break; + case UNSUBSCRIBE: + this.webSocketEventBus.unsubscribe(message.getString("database"), message.optString("type", null), + (UUID) channel.getAttribute(WebSocketEventBus.CHANNEL_ID)); + this.sendAck(channel, action); + break; + default: + sendError(channel, "Unknown action", String.format("%s is not a valid action.", rawAction), null); + break; + } + } catch (Exception e) { + LogManager.instance().log(this, getErrorLogLevel(), "Error on command execution (%s)", e, getClass().getSimpleName()); + sendError(channel, "Internal error", e.getMessage(), e); } } @Override - protected void onClose(WebSocketChannel channel, StreamSourceFrameChannel frameChannel) throws IOException { - // TODO: Make sure this gets called for zombie connections + protected void onClose(final WebSocketChannel channel, final StreamSourceFrameChannel frameChannel) throws IOException { + // TODO: Make sure unsubscribeAll gets called for zombie connections this.webSocketEventBus.unsubscribeAll((UUID) channel.getAttribute(WebSocketEventBus.CHANNEL_ID)); } - private void sendAck(ACTION action, WebSocketChannel channel) { - String message = String.format("{\"action\": \"%s\", \"result\": \"ok\"}", action.toString().toLowerCase()); - WebSockets.sendText(message, channel, null); + private void sendAck(final WebSocketChannel channel, final ACTION action) { + final var json = new JSONObject("{\"result\": \"ok\"}"); + json.put("action", action.toString().toLowerCase()); + WebSockets.sendText(json.toString(), channel, null); + } + + private void sendError(final WebSocketChannel channel, final String error, final String detail, final Throwable exception) { + final var json = new JSONObject("{\"result\": \"error\"}"); + json.put("error", error); + if (detail != null) json.put("detail", encodeError(detail)); + if (exception != null) json.put("exception", exception.getClass().getName()); + WebSockets.sendText(json.toString(), channel, null); + } + + private String encodeError(final String message) { + return message.replaceAll("\\\\", " ").replaceAll("\n", " "); } - private void sendError(String detail, WebSocketChannel channel) { - String message = String.format("{\"result\": \"error\", \"detail\":\"%s\"}", detail); - WebSockets.sendText(message, channel, null); + private Level getErrorLogLevel() { + return "development".equals(httpServer.getServer().getConfiguration().getValueAsString(GlobalConfiguration.SERVER_MODE)) ? Level.INFO : Level.FINE; } } diff --git a/server/src/test/java/com/arcadedb/server/ws/WebSocketClientHelper.java b/server/src/test/java/com/arcadedb/server/ws/WebSocketClientHelper.java new file mode 100644 index 0000000000..32b341a590 --- /dev/null +++ b/server/src/test/java/com/arcadedb/server/ws/WebSocketClientHelper.java @@ -0,0 +1,96 @@ +package com.arcadedb.server.ws; + +import com.arcadedb.server.BaseGraphServerTest; +import io.undertow.connector.ByteBufferPool; +import io.undertow.server.DefaultByteBufferPool; +import io.undertow.util.StringWriteChannelListener; +import io.undertow.websockets.client.WebSocketClient; +import io.undertow.websockets.client.WebSocketClientNegotiation; +import io.undertow.websockets.core.AbstractReceiveListener; +import io.undertow.websockets.core.BufferedTextMessage; +import io.undertow.websockets.core.WebSocketChannel; +import io.undertow.websockets.core.WebSocketFrameType; +import org.xnio.OptionMap; +import org.xnio.Options; +import org.xnio.Xnio; +import org.xnio.XnioWorker; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + +import static org.apache.lucene.store.BufferedIndexInput.BUFFER_SIZE; + +public class WebSocketClientHelper { + private static XnioWorker worker; + private WebSocketChannel channel; + private static final ByteBufferPool pool = new DefaultByteBufferPool(true, BUFFER_SIZE, 1000, 10, 100); + + static { + Xnio xnio = Xnio.getInstance(BaseGraphServerTest.class.getClassLoader()); + try { + worker = xnio.createWorker(OptionMap.builder() + .set(Options.WORKER_IO_THREADS, 2) + .set(Options.CONNECTION_HIGH_WATER, 1000000) + .set(Options.CONNECTION_LOW_WATER, 1000000) + .set(Options.WORKER_TASK_CORE_THREADS, 30) + .set(Options.WORKER_TASK_MAX_THREADS, 30) + .set(Options.TCP_NODELAY, true) + .set(Options.CORK, true) + .getMap()); + } catch (IOException e) { + } + } + + public WebSocketClientHelper(String uri, String user, String pass) throws URISyntaxException, IOException { + var builder = WebSocketClient.connectionBuilder(worker, pool, new URI(uri)); + if (user != null) { + builder.setClientNegotiation(new WebSocketClientNegotiation(new ArrayList<>(), new ArrayList<>()) { + @Override + public void beforeRequest(Map> headers) { + headers.put("Authorization", + Collections.singletonList("Basic " + Base64.getEncoder().encodeToString((user + ":" + pass).getBytes()))); + } + }); + } + this.channel = builder.connect().get(); + } + + public Future send(String payload) throws URISyntaxException, IOException { + var future = this.get(); + + var sendChannel = channel.send(WebSocketFrameType.TEXT); + new StringWriteChannelListener(payload).setup(sendChannel); + + return future; + } + + public Future get() throws URISyntaxException, IOException { + var future = new CompletableFuture(); + channel.suspendReceives(); + channel.getReceiveSetter().set(new AbstractReceiveListener() { + @Override + protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) throws IOException { + String data = message.getData(); + future.complete(data); + } + + @Override + protected void onError(WebSocketChannel channel, Throwable error) { + super.onError(channel, error); + error.printStackTrace(); + future.complete(null); + } + }); + channel.resumeReceives(); + + return future; + } + + public void close() throws IOException { + this.channel.sendClose(); + } +} diff --git a/server/src/test/java/com/arcadedb/server/ws/WebSocketEventBusIT.java b/server/src/test/java/com/arcadedb/server/ws/WebSocketEventBusIT.java new file mode 100644 index 0000000000..f702180f16 --- /dev/null +++ b/server/src/test/java/com/arcadedb/server/ws/WebSocketEventBusIT.java @@ -0,0 +1,63 @@ +package com.arcadedb.server.ws; + +import com.arcadedb.server.BaseGraphServerTest; +import org.json.JSONObject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class WebSocketEventBusIT extends BaseGraphServerTest { + + @Test + public void subscribeDatabaseWorks() throws Exception { + var client = new WebSocketClientHelper("ws://localhost:2480/ws", "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); + + var result = client.send(buildActionMessage("subscribe", "graph", null)); + var json = new JSONObject(result.get(1, TimeUnit.SECONDS)); + Assertions.assertEquals("ok", json.get("result")); + + result = client.get(); + var db = this.getServerDatabase(0, "graph"); + db.execute("sql", "INSERT INTO V1 content {name: 'Test'};"); + json = new JSONObject(result.get(1, TimeUnit.SECONDS)); + Assertions.assertEquals("create", json.get("changeType")); + + client.close(); + } + + @Test + public void unsubscribeDatabaseWorks() throws Exception { + var client = new WebSocketClientHelper("ws://localhost:2480/ws", "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); + + var result = client.send(buildActionMessage("subscribe", "graph", null)); + var json = new JSONObject(result.get(1, TimeUnit.SECONDS)); + Assertions.assertEquals("ok", json.get("result")); + + result = client.send(buildActionMessage("unsubscribe", "graph", null)); + json = new JSONObject(result.get(1, TimeUnit.SECONDS)); + Assertions.assertEquals("ok", json.get("result")); + + result = client.get(); + var db = this.getServerDatabase(0, "graph"); + db.execute("sql", "INSERT INTO V1 content {name: 'Test'};"); + + try { + result.get(100, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignored) { + } + + Assertions.assertFalse(result.isDone()); + + client.close(); + } + + private static String buildActionMessage(String action, String database, String type) { + var obj = new JSONObject(); + obj.put("action", action); + if (database != null) obj.put("database", database); + if (type != null) obj.put("type", type); + return obj.toString(); + } +} From 6b733a4a0cebd8e14e71ff7b89866db011814a0a Mon Sep 17 00:00:00 2001 From: Greg Lincoln Date: Tue, 26 Oct 2021 18:31:34 -0400 Subject: [PATCH 06/11] Move ws package under http. --- server/src/main/java/com/arcadedb/server/http/HttpServer.java | 4 ++-- .../server/{ => http}/ws/WebSocketConnectionHandler.java | 2 +- .../com/arcadedb/server/{ => http}/ws/WebSocketEventBus.java | 2 +- .../arcadedb/server/{ => http}/ws/WebSocketEventListener.java | 2 +- .../server/{ => http}/ws/WebSocketReceiveListener.java | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) rename server/src/main/java/com/arcadedb/server/{ => http}/ws/WebSocketConnectionHandler.java (96%) rename server/src/main/java/com/arcadedb/server/{ => http}/ws/WebSocketEventBus.java (99%) rename server/src/main/java/com/arcadedb/server/{ => http}/ws/WebSocketEventListener.java (95%) rename server/src/main/java/com/arcadedb/server/{ => http}/ws/WebSocketReceiveListener.java (98%) diff --git a/server/src/main/java/com/arcadedb/server/http/HttpServer.java b/server/src/main/java/com/arcadedb/server/http/HttpServer.java index 9b9c053953..13b236066c 100644 --- a/server/src/main/java/com/arcadedb/server/http/HttpServer.java +++ b/server/src/main/java/com/arcadedb/server/http/HttpServer.java @@ -22,8 +22,8 @@ import com.arcadedb.server.ServerException; import com.arcadedb.server.ServerPlugin; import com.arcadedb.server.http.handler.*; -import com.arcadedb.server.ws.WebSocketConnectionHandler; -import com.arcadedb.server.ws.WebSocketEventBus; +import com.arcadedb.server.http.ws.WebSocketConnectionHandler; +import com.arcadedb.server.http.ws.WebSocketEventBus; import io.undertow.Handlers; import io.undertow.Undertow; import io.undertow.server.RoutingHandler; diff --git a/server/src/main/java/com/arcadedb/server/ws/WebSocketConnectionHandler.java b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketConnectionHandler.java similarity index 96% rename from server/src/main/java/com/arcadedb/server/ws/WebSocketConnectionHandler.java rename to server/src/main/java/com/arcadedb/server/http/ws/WebSocketConnectionHandler.java index 8cf096b256..e6db074a25 100644 --- a/server/src/main/java/com/arcadedb/server/ws/WebSocketConnectionHandler.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketConnectionHandler.java @@ -1,4 +1,4 @@ -package com.arcadedb.server.ws; +package com.arcadedb.server.http.ws; import com.arcadedb.server.http.HttpServer; import com.arcadedb.server.http.handler.AbstractHandler; diff --git a/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventBus.java similarity index 99% rename from server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java rename to server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventBus.java index 25550fca65..a05f0f2d51 100644 --- a/server/src/main/java/com/arcadedb/server/ws/WebSocketEventBus.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventBus.java @@ -1,4 +1,4 @@ -package com.arcadedb.server.ws; +package com.arcadedb.server.http.ws; import com.arcadedb.GlobalConfiguration; import com.arcadedb.event.AfterRecordCreateListener; diff --git a/server/src/main/java/com/arcadedb/server/ws/WebSocketEventListener.java b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventListener.java similarity index 95% rename from server/src/main/java/com/arcadedb/server/ws/WebSocketEventListener.java rename to server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventListener.java index 2b1b9a5021..36e0442191 100644 --- a/server/src/main/java/com/arcadedb/server/ws/WebSocketEventListener.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventListener.java @@ -1,4 +1,4 @@ -package com.arcadedb.server.ws; +package com.arcadedb.server.http.ws; import com.arcadedb.database.Record; import com.arcadedb.event.*; diff --git a/server/src/main/java/com/arcadedb/server/ws/WebSocketReceiveListener.java b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketReceiveListener.java similarity index 98% rename from server/src/main/java/com/arcadedb/server/ws/WebSocketReceiveListener.java rename to server/src/main/java/com/arcadedb/server/http/ws/WebSocketReceiveListener.java index 6c7b00950c..a65ed213dc 100644 --- a/server/src/main/java/com/arcadedb/server/ws/WebSocketReceiveListener.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketReceiveListener.java @@ -1,4 +1,4 @@ -package com.arcadedb.server.ws; +package com.arcadedb.server.http.ws; import com.arcadedb.GlobalConfiguration; import com.arcadedb.log.LogManager; From 126b088b9aa0a867e29e5094db183095a52755e8 Mon Sep 17 00:00:00 2001 From: Greg Lincoln Date: Tue, 26 Oct 2021 19:49:41 -0400 Subject: [PATCH 07/11] Move the watcher thread logic into a Thread subclass. Properly shutdown the thread. Don't leak listeners on exceptions. --- .../http/ws/DatabaseEventWatcherThread.java | 70 +++++++++++++++ .../server/http/ws/WebSocketEventBus.java | 85 ++++++------------- .../http/ws/WebSocketEventListener.java | 12 +-- 3 files changed, 103 insertions(+), 64 deletions(-) create mode 100644 server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java diff --git a/server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java b/server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java new file mode 100644 index 0000000000..81e61cae34 --- /dev/null +++ b/server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java @@ -0,0 +1,70 @@ +package com.arcadedb.server.http.ws; + +import com.arcadedb.database.Database; +import com.arcadedb.event.AfterRecordCreateListener; +import com.arcadedb.event.AfterRecordDeleteListener; +import com.arcadedb.event.AfterRecordUpdateListener; +import com.arcadedb.log.LogManager; +import com.arcadedb.server.ChangeEvent; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +final public class DatabaseEventWatcherThread extends Thread { + private final WebSocketEventBus eventBus; + private final ArrayBlockingQueue eventQueue; + private final Database database; + + volatile boolean running = true; + private WebSocketEventListener listener; + + public boolean isRunning() { + return running; + } + + public void shutdown() { + this.running = false; + } + + public DatabaseEventWatcherThread(final WebSocketEventBus eventBus, final Database database, final int queueSize) { + this.eventBus = eventBus; + this.eventQueue = new ArrayBlockingQueue<>(queueSize, true); + this.database = database; + } + + public void push(ChangeEvent event) { + if (!this.eventQueue.offer(event)) { + LogManager.instance().log(this, Level.WARNING, "Skipping event for database %s as eventQueue is full. Consider increasing eventBusQueueSize.", + null, this.database.getName()); + } + } + + @Override + public void run() { + try { + LogManager.instance().log(this, Level.INFO, "Starting up watcher thread for %s.", null, database); + + this.listener = new WebSocketEventListener(this); + this.database.getEvents() + .registerListener((AfterRecordCreateListener) this.listener) + .registerListener((AfterRecordUpdateListener) this.listener) + .registerListener((AfterRecordDeleteListener) this.listener); + + while (this.running) { + var event = this.eventQueue.poll(500, TimeUnit.MILLISECONDS); + if (event == null) continue; + this.eventBus.publish(event); + } + + } catch (InterruptedException ignored) { + } finally { + this.database.getEvents() + .unregisterListener((AfterRecordCreateListener) this.listener) + .unregisterListener((AfterRecordUpdateListener) this.listener) + .unregisterListener((AfterRecordDeleteListener) this.listener); + + LogManager.instance().log(this, Level.INFO, "Shutting down watcher thread for %s.", null, database); + } + } +} diff --git a/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventBus.java b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventBus.java index a05f0f2d51..029f8d97c0 100644 --- a/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventBus.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventBus.java @@ -1,9 +1,6 @@ package com.arcadedb.server.http.ws; import com.arcadedb.GlobalConfiguration; -import com.arcadedb.event.AfterRecordCreateListener; -import com.arcadedb.event.AfterRecordDeleteListener; -import com.arcadedb.event.AfterRecordUpdateListener; import com.arcadedb.server.ArcadeDBServer; import com.arcadedb.server.ChangeEvent; import com.arcadedb.utility.Pair; @@ -13,31 +10,19 @@ import java.util.HashSet; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.logging.Level; public class WebSocketEventBus { - private final ArrayBlockingQueue eventQueue; private final ConcurrentHashMap>>> - subscribers = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> - databaseWatchers = new ConcurrentHashMap<>(); - private final ArcadeDBServer arcadeServer; + subscribers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap + databaseWatchers = new ConcurrentHashMap<>(); + private final ArcadeDBServer arcadeServer; public static final String CHANNEL_ID = "ID"; public WebSocketEventBus(final ArcadeDBServer server) { this.arcadeServer = server; - this.eventQueue = new ArrayBlockingQueue<>(server.getConfiguration().getValueAsInteger(GlobalConfiguration.SERVER_WS_EVENT_BUS_QUEUE_SIZE), true); - } - - public void push(ChangeEvent event) { - try { - this.eventQueue.add(event); - } catch (IllegalStateException ex) { - this.arcadeServer.log(this, Level.WARNING, "Skipping event as eventQueue is full. Consider increasing eventBusQueueSize."); - } } public void subscribe(String database, String type, WebSocketChannel channel) { @@ -49,50 +34,34 @@ public void unsubscribe(String database, String type, UUID id) { this.getSubscriberSet(database, type).removeIf(pair -> pair.getFirst() == id); } - private void startDatabaseWatcher(String database) { - var listener = new WebSocketEventListener(this); - this.arcadeServer.getDatabase(database).getEvents() - .registerListener((AfterRecordCreateListener) listener) - .registerListener((AfterRecordUpdateListener) listener) - .registerListener((AfterRecordDeleteListener) listener); - - var watcherThread = new Thread(() -> { - try { - this.arcadeServer.log(this, Level.INFO, "Starting up watcher thread for %s.", database); - - while (true) { - var event = eventQueue.take(); - var databaseWatchers = this.getSubscriberSet(database, "*"); - var typeWatchers = this.getSubscriberSet(database, event.getRecord().asDocument().getTypeName()); - var matchingSubscribers = new HashSet>() {{ - addAll(databaseWatchers); - addAll(typeWatchers); - }}; + public void publish(ChangeEvent event) { + var databaseName = event.getRecord().getDatabase().getName(); + var typeName = event.getRecord().asDocument().getTypeName(); + + var databaseSubscribers = this.getSubscriberSet(databaseName, "*"); + var typeSubscribers = this.getSubscriberSet(databaseName, typeName); + var matchingSubscribers = new HashSet>() {{ + addAll(databaseSubscribers); + addAll(typeSubscribers); + }}; + + if (matchingSubscribers.isEmpty()) { + // If we no longer have any subscribers for this database, stop the watcher. + this.stopDatabaseWatcher(databaseName); + } else { + matchingSubscribers.forEach(pair -> WebSockets.sendText(event.toJSON(), pair.getSecond(), null)); + } + } - var json = event.toJSON(); - if (matchingSubscribers.isEmpty()) { - this.stopDatabaseWatcher(database); - } else { - matchingSubscribers.forEach(pair -> WebSockets.sendText(json, pair.getSecond(), null)); - } - } - } catch (InterruptedException ignored) { - this.arcadeServer.log(this, Level.INFO, "Shutting down watcher thread for %s.", database); - } - }); + private void startDatabaseWatcher(String database) { + var queueSize = this.arcadeServer.getConfiguration().getValueAsInteger(GlobalConfiguration.SERVER_WS_EVENT_BUS_QUEUE_SIZE); + var watcherThread = new DatabaseEventWatcherThread(this, this.arcadeServer.getDatabase(database), queueSize); watcherThread.start(); - - this.databaseWatchers.put(database, new Pair<>(listener, watcherThread)); + this.databaseWatchers.put(database, watcherThread); } private void stopDatabaseWatcher(String database) { - var pair = this.databaseWatchers.get(database); - this.arcadeServer.getDatabase(database).getEvents() - .unregisterListener((AfterRecordCreateListener) pair.getFirst()) - .unregisterListener((AfterRecordUpdateListener) pair.getFirst()) - .unregisterListener((AfterRecordDeleteListener) pair.getFirst()); - - pair.getSecond().interrupt(); + this.databaseWatchers.get(database).shutdown(); this.databaseWatchers.remove(database); } diff --git a/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventListener.java b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventListener.java index 36e0442191..0529e59fb9 100644 --- a/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventListener.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventListener.java @@ -6,24 +6,24 @@ public class WebSocketEventListener implements AfterRecordCreateListener, AfterRecordUpdateListener, AfterRecordDeleteListener { - private final WebSocketEventBus eventBus; + private final DatabaseEventWatcherThread watcherThread; - public WebSocketEventListener(final WebSocketEventBus eventBus) { - this.eventBus = eventBus; + public WebSocketEventListener(final DatabaseEventWatcherThread watcherThread) { + this.watcherThread = watcherThread; } @Override public void onAfterCreate(Record record) { - this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.CREATE, record)); + this.watcherThread.push(new ChangeEvent(ChangeEvent.TYPE.CREATE, record)); } @Override public void onAfterUpdate(Record record) { - this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.UPDATE, record)); + this.watcherThread.push(new ChangeEvent(ChangeEvent.TYPE.UPDATE, record)); } @Override public void onAfterDelete(Record record) { - this.eventBus.push(new ChangeEvent(ChangeEvent.TYPE.DELETE, record)); + this.watcherThread.push(new ChangeEvent(ChangeEvent.TYPE.DELETE, record)); } } From b75c769dd9610ac485a9ff084eb63dbb8a23c109 Mon Sep 17 00:00:00 2001 From: Greg Lincoln Date: Tue, 26 Oct 2021 20:13:52 -0400 Subject: [PATCH 08/11] Switch field for local. (Codacy) --- .../http/ws/DatabaseEventWatcherThread.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java b/server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java index 81e61cae34..76edf1d8e1 100644 --- a/server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java @@ -16,8 +16,7 @@ final public class DatabaseEventWatcherThread extends Thread { private final ArrayBlockingQueue eventQueue; private final Database database; - volatile boolean running = true; - private WebSocketEventListener listener; + volatile boolean running = true; public boolean isRunning() { return running; @@ -42,14 +41,15 @@ public void push(ChangeEvent event) { @Override public void run() { + var listener = new WebSocketEventListener(this); + try { LogManager.instance().log(this, Level.INFO, "Starting up watcher thread for %s.", null, database); - this.listener = new WebSocketEventListener(this); this.database.getEvents() - .registerListener((AfterRecordCreateListener) this.listener) - .registerListener((AfterRecordUpdateListener) this.listener) - .registerListener((AfterRecordDeleteListener) this.listener); + .registerListener((AfterRecordCreateListener) listener) + .registerListener((AfterRecordUpdateListener) listener) + .registerListener((AfterRecordDeleteListener) listener); while (this.running) { var event = this.eventQueue.poll(500, TimeUnit.MILLISECONDS); @@ -60,9 +60,9 @@ public void run() { } catch (InterruptedException ignored) { } finally { this.database.getEvents() - .unregisterListener((AfterRecordCreateListener) this.listener) - .unregisterListener((AfterRecordUpdateListener) this.listener) - .unregisterListener((AfterRecordDeleteListener) this.listener); + .unregisterListener((AfterRecordCreateListener) listener) + .unregisterListener((AfterRecordUpdateListener) listener) + .unregisterListener((AfterRecordDeleteListener) listener); LogManager.instance().log(this, Level.INFO, "Shutting down watcher thread for %s.", null, database); } From c59e4eed8bb5fca380a8c7d77b2d79bda0b31a5d Mon Sep 17 00:00:00 2001 From: Greg Lincoln Date: Tue, 26 Oct 2021 21:22:18 -0400 Subject: [PATCH 09/11] Implement changeType filter. More cleanups. More tests. --- .../server/{ => http/ws}/ChangeEvent.java | 2 +- .../http/ws/DatabaseEventWatcherThread.java | 1 - .../http/ws/EventWatcherSubscription.java | 47 +++++++ .../server/http/ws/WebSocketEventBus.java | 54 ++++---- .../http/ws/WebSocketEventListener.java | 1 - .../http/ws/WebSocketReceiveListener.java | 7 +- .../server/ws/WebSocketEventBusIT.java | 119 ++++++++++++++++-- 7 files changed, 189 insertions(+), 42 deletions(-) rename server/src/main/java/com/arcadedb/server/{ => http/ws}/ChangeEvent.java (94%) create mode 100644 server/src/main/java/com/arcadedb/server/http/ws/EventWatcherSubscription.java diff --git a/server/src/main/java/com/arcadedb/server/ChangeEvent.java b/server/src/main/java/com/arcadedb/server/http/ws/ChangeEvent.java similarity index 94% rename from server/src/main/java/com/arcadedb/server/ChangeEvent.java rename to server/src/main/java/com/arcadedb/server/http/ws/ChangeEvent.java index 4a514ba35d..a63487cb5c 100644 --- a/server/src/main/java/com/arcadedb/server/ChangeEvent.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/ChangeEvent.java @@ -1,4 +1,4 @@ -package com.arcadedb.server; +package com.arcadedb.server.http.ws; import com.arcadedb.database.Record; import org.json.JSONObject; diff --git a/server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java b/server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java index 76edf1d8e1..e7b8c26025 100644 --- a/server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/DatabaseEventWatcherThread.java @@ -5,7 +5,6 @@ import com.arcadedb.event.AfterRecordDeleteListener; import com.arcadedb.event.AfterRecordUpdateListener; import com.arcadedb.log.LogManager; -import com.arcadedb.server.ChangeEvent; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; diff --git a/server/src/main/java/com/arcadedb/server/http/ws/EventWatcherSubscription.java b/server/src/main/java/com/arcadedb/server/http/ws/EventWatcherSubscription.java new file mode 100644 index 0000000000..505e55c6bb --- /dev/null +++ b/server/src/main/java/com/arcadedb/server/http/ws/EventWatcherSubscription.java @@ -0,0 +1,47 @@ +package com.arcadedb.server.http.ws; + +import io.undertow.websockets.core.WebSocketChannel; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class EventWatcherSubscription { + private final String database; + private final String type; + private final Set changeTypes; + private final WebSocketChannel channel; + + public EventWatcherSubscription(String database, String type, List changeTypes, WebSocketChannel channel) { + this.database = database; + this.type = type; + if (changeTypes != null) { + this.changeTypes = new HashSet<>(); + changeTypes.forEach(changeType -> this.changeTypes.add(ChangeEvent.TYPE.valueOf(changeType.toString().toUpperCase()))); + } else { + this.changeTypes = null; + } + this.channel = channel; + } + + public String getDatabase() { + return database; + } + + public String getType() { + return type; + } + + public Set getChangeTypes() { + return changeTypes; + } + + public WebSocketChannel getChannel() { + return channel; + } + + public boolean isMatch(ChangeEvent event) { + return (this.changeTypes == null || this.changeTypes.contains(event.getType())) && + (this.type == null || this.type.equals(event.getRecord().asDocument().getTypeName())); + } +} diff --git a/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventBus.java b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventBus.java index 029f8d97c0..077bdf6f06 100644 --- a/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventBus.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventBus.java @@ -2,22 +2,18 @@ import com.arcadedb.GlobalConfiguration; import com.arcadedb.server.ArcadeDBServer; -import com.arcadedb.server.ChangeEvent; import com.arcadedb.utility.Pair; -import io.undertow.websockets.core.WebSocketChannel; import io.undertow.websockets.core.WebSockets; -import java.util.HashSet; -import java.util.Set; +import java.util.LinkedList; +import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; public class WebSocketEventBus { - private final ConcurrentHashMap>>> - subscribers = new ConcurrentHashMap<>(); - private final ConcurrentHashMap - databaseWatchers = new ConcurrentHashMap<>(); - private final ArcadeDBServer arcadeServer; + private final ConcurrentHashMap>> subscribers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap databaseWatchers = new ConcurrentHashMap<>(); + private final ArcadeDBServer arcadeServer; public static final String CHANNEL_ID = "ID"; @@ -25,31 +21,34 @@ public WebSocketEventBus(final ArcadeDBServer server) { this.arcadeServer = server; } - public void subscribe(String database, String type, WebSocketChannel channel) { - this.getSubscriberSet(database, type).add(new Pair<>((UUID) channel.getAttribute(CHANNEL_ID), channel)); - if (!this.databaseWatchers.containsKey(database)) this.startDatabaseWatcher(database); + public void subscribe(EventWatcherSubscription subscription) { + var channelId = (UUID) subscription.getChannel().getAttribute(CHANNEL_ID); + this.getSubscriberSet(subscription.getDatabase()).add(new Pair<>(channelId, subscription)); + if (!this.databaseWatchers.containsKey(subscription.getDatabase())) { + this.startDatabaseWatcher(subscription.getDatabase()); + } } - public void unsubscribe(String database, String type, UUID id) { - this.getSubscriberSet(database, type).removeIf(pair -> pair.getFirst() == id); + public void unsubscribe(String database, UUID id) { + this.getSubscriberSet(database).removeIf(pair -> pair.getFirst() == id); } public void publish(ChangeEvent event) { var databaseName = event.getRecord().getDatabase().getName(); - var typeName = event.getRecord().asDocument().getTypeName(); - var databaseSubscribers = this.getSubscriberSet(databaseName, "*"); - var typeSubscribers = this.getSubscriberSet(databaseName, typeName); - var matchingSubscribers = new HashSet>() {{ - addAll(databaseSubscribers); - addAll(typeSubscribers); - }}; + var subscribers = this.subscribers.get(databaseName); + if (subscribers == null) return; - if (matchingSubscribers.isEmpty()) { + if (subscribers.isEmpty()) { // If we no longer have any subscribers for this database, stop the watcher. this.stopDatabaseWatcher(databaseName); } else { - matchingSubscribers.forEach(pair -> WebSockets.sendText(event.toJSON(), pair.getSecond(), null)); + subscribers.forEach(pair -> { + var subscription = pair.getSecond(); + if (subscription.isMatch(event)) { + WebSockets.sendText(event.toJSON(), subscription.getChannel(), null); + } + }); } } @@ -65,15 +64,12 @@ private void stopDatabaseWatcher(String database) { this.databaseWatchers.remove(database); } - private Set> getSubscriberSet(String database, String typeFilter) { - var type = typeFilter == null || typeFilter.trim().isEmpty() ? "*" : typeFilter; - return this.subscribers - .computeIfAbsent(database, key -> new ConcurrentHashMap<>()) - .computeIfAbsent(type, key -> new HashSet<>()); + private List> getSubscriberSet(String database) { + return this.subscribers.computeIfAbsent(database, key -> new LinkedList<>()); } public void unsubscribeAll(UUID id) { - this.subscribers.values().forEach(types -> types.values().forEach(sets -> sets.removeIf(pair -> pair.getFirst() == id))); + this.subscribers.values().forEach(sets -> sets.removeIf(pair -> pair.getFirst() == id)); } } diff --git a/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventListener.java b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventListener.java index 0529e59fb9..17b813a45c 100644 --- a/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventListener.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketEventListener.java @@ -2,7 +2,6 @@ import com.arcadedb.database.Record; import com.arcadedb.event.*; -import com.arcadedb.server.ChangeEvent; public class WebSocketEventListener implements AfterRecordCreateListener, AfterRecordUpdateListener, AfterRecordDeleteListener { diff --git a/server/src/main/java/com/arcadedb/server/http/ws/WebSocketReceiveListener.java b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketReceiveListener.java index a65ed213dc..71d2902411 100644 --- a/server/src/main/java/com/arcadedb/server/http/ws/WebSocketReceiveListener.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/WebSocketReceiveListener.java @@ -34,12 +34,13 @@ protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage t try { switch (action) { case SUBSCRIBE: - this.webSocketEventBus.subscribe(message.getString("database"), message.optString("type", null), channel); + var changeTypes = message.optJSONArray("changeTypes"); + this.webSocketEventBus.subscribe(new EventWatcherSubscription(message.getString("database"), + message.optString("type", null), changeTypes == null ? null : changeTypes.toList(), channel)); this.sendAck(channel, action); break; case UNSUBSCRIBE: - this.webSocketEventBus.unsubscribe(message.getString("database"), message.optString("type", null), - (UUID) channel.getAttribute(WebSocketEventBus.CHANNEL_ID)); + this.webSocketEventBus.unsubscribe(message.getString("database"), (UUID) channel.getAttribute(WebSocketEventBus.CHANNEL_ID)); this.sendAck(channel, action); break; default: diff --git a/server/src/test/java/com/arcadedb/server/ws/WebSocketEventBusIT.java b/server/src/test/java/com/arcadedb/server/ws/WebSocketEventBusIT.java index f702180f16..8ed800cccd 100644 --- a/server/src/test/java/com/arcadedb/server/ws/WebSocketEventBusIT.java +++ b/server/src/test/java/com/arcadedb/server/ws/WebSocketEventBusIT.java @@ -14,15 +14,110 @@ public class WebSocketEventBusIT extends BaseGraphServerTest { public void subscribeDatabaseWorks() throws Exception { var client = new WebSocketClientHelper("ws://localhost:2480/ws", "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); - var result = client.send(buildActionMessage("subscribe", "graph", null)); + var result = client.send(buildActionMessage("subscribe", "graph")); var json = new JSONObject(result.get(1, TimeUnit.SECONDS)); Assertions.assertEquals("ok", json.get("result")); result = client.get(); var db = this.getServerDatabase(0, "graph"); - db.execute("sql", "INSERT INTO V1 content {name: 'Test'};"); + var v1 = db.newVertex("V1").set("name", "test"); + v1.save(); + json = new JSONObject(result.get(1, TimeUnit.SECONDS)); Assertions.assertEquals("create", json.get("changeType")); + var record = json.getJSONObject("record"); + Assertions.assertEquals("test", record.get("name")); + Assertions.assertEquals("V1", record.get("@type")); + + client.close(); + } + + @Test + public void subscribeTypeWorks() throws Exception { + var client = new WebSocketClientHelper("ws://localhost:2480/ws", "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); + + var result = client.send(buildActionMessage("subscribe", "graph", "V1")); + var json = new JSONObject(result.get(1, TimeUnit.SECONDS)); + Assertions.assertEquals("ok", json.get("result")); + + result = client.get(); + var db = this.getServerDatabase(0, "graph"); + var v1 = db.newVertex("V1").set("name", "test"); + v1.save(); + + json = new JSONObject(result.get(1, TimeUnit.SECONDS)); + Assertions.assertEquals("create", json.get("changeType")); + var record = json.getJSONObject("record"); + Assertions.assertEquals("test", record.get("name")); + Assertions.assertEquals("V1", record.get("@type")); + + client.close(); + } + + @Test + public void subscribeChangeTypeWorks() throws Exception { + var client = new WebSocketClientHelper("ws://localhost:2480/ws", "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); + + var result = client.send(buildActionMessage("subscribe", "graph", null, new String[]{"create"})); + var json = new JSONObject(result.get(1, TimeUnit.SECONDS)); + Assertions.assertEquals("ok", json.get("result")); + + result = client.get(); + var db = this.getServerDatabase(0, "graph"); + var v1 = db.newVertex("V1").set("name", "test"); + v1.save(); + + json = new JSONObject(result.get(1, TimeUnit.SECONDS)); + Assertions.assertEquals("create", json.get("changeType")); + var record = json.getJSONObject("record"); + Assertions.assertEquals("test", record.get("name")); + Assertions.assertEquals("V1", record.get("@type")); + + client.close(); + } + + @Test + public void subscribeChangeTypeDoesNotPushOtherChangeTypes() throws Exception { + var client = new WebSocketClientHelper("ws://localhost:2480/ws", "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); + + var result = client.send(buildActionMessage("subscribe", "graph", null, new String[]{"update"})); + var json = new JSONObject(result.get(1, TimeUnit.SECONDS)); + Assertions.assertEquals("ok", json.get("result")); + + result = client.get(); + var db = this.getServerDatabase(0, "graph"); + var v2 = db.newVertex("V2").set("name", "test"); + v2.save(); + + try { + result.get(100, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignored) { + } + + Assertions.assertFalse(result.isDone()); + + client.close(); + } + + @Test + public void subscribeTypeDoesNotPushOtherTypes() throws Exception { + var client = new WebSocketClientHelper("ws://localhost:2480/ws", "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); + + var result = client.send(buildActionMessage("subscribe", "graph", "V1")); + var json = new JSONObject(result.get(1, TimeUnit.SECONDS)); + Assertions.assertEquals("ok", json.get("result")); + + result = client.get(); + var db = this.getServerDatabase(0, "graph"); + var v2 = db.newVertex("V2").set("name", "test"); + v2.save(); + + try { + result.get(100, TimeUnit.MILLISECONDS); + } catch (TimeoutException ignored) { + } + + Assertions.assertFalse(result.isDone()); client.close(); } @@ -31,17 +126,18 @@ public void subscribeDatabaseWorks() throws Exception { public void unsubscribeDatabaseWorks() throws Exception { var client = new WebSocketClientHelper("ws://localhost:2480/ws", "root", BaseGraphServerTest.DEFAULT_PASSWORD_FOR_TESTS); - var result = client.send(buildActionMessage("subscribe", "graph", null)); + var result = client.send(buildActionMessage("subscribe", "graph")); var json = new JSONObject(result.get(1, TimeUnit.SECONDS)); Assertions.assertEquals("ok", json.get("result")); - result = client.send(buildActionMessage("unsubscribe", "graph", null)); + result = client.send(buildActionMessage("unsubscribe", "graph")); json = new JSONObject(result.get(1, TimeUnit.SECONDS)); Assertions.assertEquals("ok", json.get("result")); result = client.get(); var db = this.getServerDatabase(0, "graph"); - db.execute("sql", "INSERT INTO V1 content {name: 'Test'};"); + var v1 = db.newVertex("V1").set("name", "test"); + v1.save(); try { result.get(100, TimeUnit.MILLISECONDS); @@ -53,11 +149,20 @@ public void unsubscribeDatabaseWorks() throws Exception { client.close(); } + private static String buildActionMessage(String action, String database) { + return buildActionMessage(action, database, null, null); + } + private static String buildActionMessage(String action, String database, String type) { + return buildActionMessage(action, database, type, null); + } + + private static String buildActionMessage(String action, String database, String type, String[] changeTypes) { var obj = new JSONObject(); obj.put("action", action); - if (database != null) obj.put("database", database); - if (type != null) obj.put("type", type); + obj.putOpt("database", database); + obj.putOpt("type", type); + obj.putOpt("changeTypes", changeTypes); return obj.toString(); } } From 875dc39960cdaa4cb7957e00238895e6f6b1f141 Mon Sep 17 00:00:00 2001 From: lvca Date: Wed, 27 Oct 2021 16:12:45 -0400 Subject: [PATCH 10/11] Fixed race condition on mutable document on trigger --- .../arcadedb/database/MutableDocument.java | 46 +++++++++---------- .../arcadedb/server/http/ws/ChangeEvent.java | 4 +- .../http/ws/EventWatcherSubscription.java | 12 ++--- 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/engine/src/main/java/com/arcadedb/database/MutableDocument.java b/engine/src/main/java/com/arcadedb/database/MutableDocument.java index fbfc4db0d2..67f83897f1 100644 --- a/engine/src/main/java/com/arcadedb/database/MutableDocument.java +++ b/engine/src/main/java/com/arcadedb/database/MutableDocument.java @@ -42,29 +42,29 @@ protected MutableDocument(final Database database, final DocumentType type, fina buffer.position(buffer.position() + 1); // SKIP RECORD TYPE } - public void merge(final Map other) { + public synchronized void merge(final Map other) { for (Map.Entry entry : other.entrySet()) set(entry.getKey(), entry.getValue()); } - public boolean isDirty() { + public synchronized boolean isDirty() { return dirty; } @Override - public void setBuffer(final Binary buffer) { + public synchronized void setBuffer(final Binary buffer) { super.setBuffer(buffer); dirty = false; //map = null; // AVOID RESETTING HERE FOR INDEXES THAT CAN LOOKUP UP FOR FIELDS CAUSING AN UNMARSHALLING } @Override - public void unsetDirty() { + public synchronized void unsetDirty() { map = null; dirty = false; } - public void fromMap(final Map map) { + public synchronized void fromMap(final Map map) { this.map = new LinkedHashMap<>(map.size()); for (Map.Entry entry : map.entrySet()) this.map.put(entry.getKey(), convertValueToSchemaType(entry.getKey(), entry.getValue(), type)); @@ -73,17 +73,17 @@ public void fromMap(final Map map) { } @Override - public Map toMap() { + public synchronized Map toMap() { checkForLazyLoadingProperties(); return Collections.unmodifiableMap(map); } - public void fromJSON(final JSONObject json) { + public synchronized void fromJSON(final JSONObject json) { fromMap(new JSONSerializer(database).json2map(json)); } @Override - public JSONObject toJSON() { + public synchronized JSONObject toJSON() { checkForLazyLoadingProperties(); final JSONObject result = new JSONSerializer(database).map2json(map); result.put("@type", type.getName()); @@ -93,12 +93,12 @@ public JSONObject toJSON() { } @Override - public boolean has(String propertyName) { + public synchronized boolean has(String propertyName) { checkForLazyLoadingProperties(); return map.containsKey(propertyName); } - public Object get(final String propertyName) { + public synchronized Object get(final String propertyName) { checkForLazyLoadingProperties(); return map.get(propertyName); } @@ -106,7 +106,7 @@ public Object get(final String propertyName) { /** * Sets the property value in the document. If the property has been defined in the schema, the value is converted according to the property type. */ - public MutableDocument set(final String name, Object value) { + public synchronized MutableDocument set(final String name, Object value) { checkForLazyLoadingProperties(); dirty = true; value = setTransformValue(value, name); @@ -119,7 +119,7 @@ public MutableDocument set(final String name, Object value) { * * @param properties Array containing pairs of name (String) and value (Object) */ - public MutableDocument set(final Object... properties) { + public synchronized MutableDocument set(final Object... properties) { if (properties == null || properties.length == 0) throw new IllegalArgumentException("Empty list of properties"); @@ -149,7 +149,7 @@ public MutableDocument set(final Object... properties) { * * @return MutableEmbeddedDocument instance */ - public MutableEmbeddedDocument newEmbeddedDocument(final String embeddedTypeName, final String propertyName) { + public synchronized MutableEmbeddedDocument newEmbeddedDocument(final String embeddedTypeName, final String propertyName) { final Object old = get(propertyName); final MutableEmbeddedDocument emb = database.newEmbeddedDocument(new EmbeddedModifierProperty(this, propertyName), embeddedTypeName); @@ -170,7 +170,7 @@ public MutableEmbeddedDocument newEmbeddedDocument(final String embeddedTypeName * * @return MutableEmbeddedDocument instance */ - public MutableEmbeddedDocument newEmbeddedDocument(final String embeddedTypeName, final String propertyName, final Object propertyMapKey) { + public synchronized MutableEmbeddedDocument newEmbeddedDocument(final String embeddedTypeName, final String propertyName, final Object propertyMapKey) { final Object old = get(propertyName); if (old == null) @@ -193,7 +193,7 @@ public MutableEmbeddedDocument newEmbeddedDocument(final String embeddedTypeName * * @param properties {@literal Map} containing pairs of name (String) and value (Object) */ - public MutableDocument set(final Map properties) { + public synchronized MutableDocument set(final Map properties) { checkForLazyLoadingProperties(); dirty = true; @@ -206,13 +206,13 @@ public MutableDocument set(final Map properties) { return this; } - public Object remove(final String name) { + public synchronized Object remove(final String name) { checkForLazyLoadingProperties(); dirty = true; return map.remove(name); } - public MutableDocument save() { + public synchronized MutableDocument save() { dirty = true; if (rid != null) database.updateRecord(this); @@ -221,7 +221,7 @@ public MutableDocument save() { return this; } - public MutableDocument save(final String bucketName) { + public synchronized MutableDocument save(final String bucketName) { dirty = true; if (rid != null) throw new IllegalStateException("Cannot update a record in a custom bucket"); @@ -231,12 +231,12 @@ public MutableDocument save(final String bucketName) { } @Override - public void setIdentity(final RID rid) { + public synchronized void setIdentity(final RID rid) { this.rid = rid; } @Override - public String toString() { + public synchronized String toString() { final StringBuilder result = new StringBuilder(256); if (rid != null) result.append(rid); @@ -264,17 +264,17 @@ public String toString() { } @Override - public Set getPropertyNames() { + public synchronized Set getPropertyNames() { checkForLazyLoadingProperties(); return map.keySet(); } - public MutableDocument modify() { + public synchronized MutableDocument modify() { return this; } @Override - public void reload() { + public synchronized void reload() { dirty = false; map = null; buffer = null; diff --git a/server/src/main/java/com/arcadedb/server/http/ws/ChangeEvent.java b/server/src/main/java/com/arcadedb/server/http/ws/ChangeEvent.java index a63487cb5c..91856f0466 100644 --- a/server/src/main/java/com/arcadedb/server/http/ws/ChangeEvent.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/ChangeEvent.java @@ -9,7 +9,7 @@ public class ChangeEvent { public enum TYPE {CREATE, UPDATE, DELETE} - public ChangeEvent(TYPE type, Record record) { + public ChangeEvent(final TYPE type, final Record record) { this.type = type; this.record = record; } @@ -23,7 +23,7 @@ public TYPE getType() { } public String toJSON() { - var jsonObject = new JSONObject(); + final var jsonObject = new JSONObject(); jsonObject.put("changeType", this.type.toString().toLowerCase()); jsonObject.put("record", this.record.toJSON()); jsonObject.put("database", this.record.getDatabase().getName()); diff --git a/server/src/main/java/com/arcadedb/server/http/ws/EventWatcherSubscription.java b/server/src/main/java/com/arcadedb/server/http/ws/EventWatcherSubscription.java index 505e55c6bb..9f942e272e 100644 --- a/server/src/main/java/com/arcadedb/server/http/ws/EventWatcherSubscription.java +++ b/server/src/main/java/com/arcadedb/server/http/ws/EventWatcherSubscription.java @@ -2,9 +2,7 @@ import io.undertow.websockets.core.WebSocketChannel; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; public class EventWatcherSubscription { private final String database; @@ -12,7 +10,7 @@ public class EventWatcherSubscription { private final Set changeTypes; private final WebSocketChannel channel; - public EventWatcherSubscription(String database, String type, List changeTypes, WebSocketChannel channel) { + public EventWatcherSubscription(final String database, final String type, final List changeTypes, final WebSocketChannel channel) { this.database = database; this.type = type; if (changeTypes != null) { @@ -40,8 +38,8 @@ public WebSocketChannel getChannel() { return channel; } - public boolean isMatch(ChangeEvent event) { - return (this.changeTypes == null || this.changeTypes.contains(event.getType())) && - (this.type == null || this.type.equals(event.getRecord().asDocument().getTypeName())); + public boolean isMatch(final ChangeEvent event) { + return (this.changeTypes == null || this.changeTypes.contains(event.getType())) && (this.type == null || this.type.equals( + event.getRecord().asDocument().getTypeName())); } } From 1f48dd7af14a6b6bd385485e34357a148ae87a5a Mon Sep 17 00:00:00 2001 From: lvca Date: Wed, 3 Nov 2021 18:32:28 -0400 Subject: [PATCH 11/11] Enabled WS tests --- pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2991a943a3..9f2d9a8d35 100644 --- a/pom.xml +++ b/pom.xml @@ -192,7 +192,6 @@ **/*TwoServersIT.java **/*FullBackupIT.java **/*RemoteConsoleIT.java - **/*WebSocketEventBusIT.java **/ArcadeGraphProcessDebugTest.java **/ArcadeGraphStructureDebugTest.java ${exclude.tests}