4343import java .util .concurrent .Executors ;
4444import java .util .concurrent .TimeoutException ;
4545import java .util .concurrent .atomic .AtomicLong ;
46+ import java .util .concurrent .locks .Lock ;
47+ import java .util .concurrent .locks .ReadWriteLock ;
48+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
4649import java .util .function .Consumer ;
4750import java .util .logging .Level ;
4851import java .util .logging .Logger ;
@@ -64,6 +67,7 @@ public class Connection implements Closeable {
6467 private static final AtomicLong NEXT_ID = new AtomicLong (1L );
6568 private final WebSocket socket ;
6669 private final Map <Long , Consumer <Either <Throwable , JsonInput >>> methodCallbacks = new LinkedHashMap <>();
70+ private final ReadWriteLock callbacksLock = new ReentrantReadWriteLock (true );
6771 private final Multimap <Event <?>, Consumer <?>> eventCallbacks = HashMultimap .create ();
6872
6973 public Connection (HttpClient client , String url ) {
@@ -162,14 +166,22 @@ public <X> void addListener(Event<X> event, Consumer<X> handler) {
162166 Require .nonNull ("Event to listen for" , event );
163167 Require .nonNull ("Handler to call" , handler );
164168
165- synchronized (eventCallbacks ) {
169+ Lock lock = callbacksLock .writeLock ();
170+ lock .lock ();
171+ try {
166172 eventCallbacks .put (event , handler );
173+ } finally {
174+ lock .unlock ();
167175 }
168176 }
169177
170178 public void clearListeners () {
171- synchronized (eventCallbacks ) {
179+ Lock lock = callbacksLock .writeLock ();
180+ lock .lock ();
181+ try {
172182 eventCallbacks .clear ();
183+ } finally {
184+ lock .unlock ();
173185 }
174186 }
175187
@@ -233,7 +245,9 @@ private void handle(CharSequence data) {
233245 LOG .log (
234246 getDebugLogLevel (),
235247 String .format ("Method %s called with %d callbacks available" , raw .get ("method" ), eventCallbacks .keySet ().size ()));
236- synchronized (eventCallbacks ) {
248+ Lock lock = callbacksLock .readLock ();
249+ lock .lock ();
250+ try {
237251 // TODO: Also only decode once.
238252 eventCallbacks .keySet ().stream ()
239253 .peek (event -> LOG .log (
@@ -275,6 +289,8 @@ private void handle(CharSequence data) {
275289 }
276290 }
277291 });
292+ } finally {
293+ lock .unlock ();
278294 }
279295 } else {
280296 LOG .warning ("Unhandled type: " + data );
0 commit comments