Skip to content

Commit df385cd

Browse files
hanspageltommoor
andauthored
Improve Message Handling, see #170 (#171)
* server: refactor incoming message handling * server: handle unknown messages more graceful * playground: disable onAuthenticate demo * provider: remove useless second parameter * server: move message handling logic to the message receiver * refactor the whole message handling (wip) * refactoring * refactoring * mute exceptions in the provider message receiver (wip) * reenable readOnly connections * clean up tests * docs: update all links from the overview to the single hook, fix #169 * fix: Potential onCreateDocument race condition (#167) * fix: Potential onCreateDocument race condition on highly contested server fix: 'onChange' callback triggered upon hydration of document from persistence * add test * refactor * Use onAuthenticate hooks from custom extensions, see #170 (#172) * use onAuthenticate hooks from custom extensions * simplify the check for the onAuthenticate hook * server: refactor incoming message handling * server: handle unknown messages more graceful * playground: disable onAuthenticate demo * provider: remove useless second parameter * server: move message handling logic to the message receiver * refactor the whole message handling (wip) * refactoring * refactoring * mute exceptions in the provider message receiver (wip) * reenable readOnly connections * clean up tests Co-authored-by: Tom Moor <tom.moor@gmail.com>
1 parent 60d9fab commit df385cd

File tree

15 files changed

+216
-173
lines changed

15 files changed

+216
-173
lines changed

demos/backend/src/minimal.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ const server = Server.configure({
77
new Logger(),
88
],
99

10-
async onAuthenticate(data) {
11-
if (data.token !== 'my-access-token') {
12-
throw new Error('Incorrect access token')
13-
}
14-
},
10+
// async onAuthenticate(data) {
11+
// if (data.token !== 'my-access-token') {
12+
// throw new Error('Incorrect access token')
13+
// }
14+
// },
1515

1616
// Test error handling
1717
// async onConnect(data) {

packages/provider/src/HocuspocusProvider.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,9 +365,10 @@ export class HocuspocusProvider extends EventEmitter {
365365

366366
this.emit('message', { event, message })
367367

368-
const encoder = new MessageReceiver(message, this).apply(this)
368+
new MessageReceiver(message).apply(this)
369369

370370
// TODO: What’s that doing?
371+
// Move to the MessageReceiver
371372
// if (encoding.length(encoder) > 1) {
372373
// this.send(encoding.toUint8Array(encoder))
373374
// }
Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,44 @@
1-
import * as decoding from 'lib0/decoding'
2-
import * as encoding from 'lib0/encoding'
1+
import {
2+
createDecoder,
3+
readVarUint,
4+
readVarUint8Array,
5+
Decoder,
6+
} from 'lib0/decoding'
7+
import {
8+
Encoder,
9+
createEncoder,
10+
writeVarUint,
11+
writeVarUint8Array,
12+
} from 'lib0/encoding'
313
import { MessageType } from './types'
414

515
export class IncomingMessage {
616

717
data: any
818

9-
encoder: encoding.Encoder
19+
encoder: Encoder
1020

11-
decoder: decoding.Decoder
12-
13-
type: MessageType
21+
decoder: Decoder
1422

1523
constructor(data: any) {
1624
this.data = data
17-
this.encoder = encoding.createEncoder()
18-
this.decoder = decoding.createDecoder(new Uint8Array(this.data))
19-
this.type = decoding.readVarUint(this.decoder)
25+
this.encoder = createEncoder()
26+
this.decoder = createDecoder(new Uint8Array(this.data))
27+
}
28+
29+
readVarUint(): MessageType {
30+
return readVarUint(this.decoder)
31+
}
32+
33+
readVarUint8Array() {
34+
return readVarUint8Array(this.decoder)
35+
}
36+
37+
writeVarUint(type: MessageType) {
38+
return writeVarUint(this.encoder, type)
2039
}
2140

22-
get name() {
23-
return MessageType[this.type]
41+
writeVarUint8Array(data: Uint8Array) {
42+
return writeVarUint8Array(this.encoder, data)
2443
}
2544
}

packages/provider/src/MessageReceiver.ts

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
import * as decoding from 'lib0/decoding'
2-
import * as encoding from 'lib0/encoding'
31
import * as awarenessProtocol from 'y-protocols/awareness'
4-
import * as syncProtocol from 'y-protocols/sync'
2+
import { readSyncMessage, messageYjsSyncStep2 } from 'y-protocols/sync'
53
import { MessageType } from './types'
64
import { HocuspocusProvider } from './HocuspocusProvider'
75
import { IncomingMessage } from './IncomingMessage'
@@ -16,7 +14,9 @@ export class MessageReceiver {
1614
}
1715

1816
public apply(provider: HocuspocusProvider, emitSynced = true) {
19-
switch (this.message.type) {
17+
const type = this.message.readVarUint()
18+
19+
switch (type) {
2020
case MessageType.Sync:
2121
this.applySyncMessage(provider, emitSynced)
2222
break
@@ -34,47 +34,56 @@ export class MessageReceiver {
3434
break
3535

3636
default:
37-
throw new Error(`Can’t apply unknown type of message: ${this.message.type}`)
37+
throw new Error(`Can’t apply message of unknown type: ${type}`)
3838
}
39-
40-
return this.message.encoder
4139
}
4240

4341
private applySyncMessage(provider: HocuspocusProvider, emitSynced: boolean) {
44-
encoding.writeVarUint(this.message.encoder, MessageType.Sync)
42+
const { message } = this
4543

46-
const syncMessageType = syncProtocol.readSyncMessage(
47-
this.message.decoder,
48-
this.message.encoder,
49-
provider.document,
50-
provider,
51-
)
44+
message.writeVarUint(MessageType.Sync)
5245

53-
if (emitSynced && syncMessageType === syncProtocol.messageYjsSyncStep2) {
54-
provider.synced = true
46+
try {
47+
const syncMessageType = readSyncMessage(
48+
message.decoder,
49+
message.encoder,
50+
provider.document,
51+
provider,
52+
)
53+
54+
if (emitSynced && syncMessageType === messageYjsSyncStep2) {
55+
provider.synced = true
56+
}
57+
} catch (e) {
58+
// TODO: That shouldn’t happen … but it does. Remove the try/catch and run the tests.
5559
}
5660
}
5761

5862
private applyAwarenessMessage(provider: HocuspocusProvider) {
63+
const { message } = this
64+
5965
awarenessProtocol.applyAwarenessUpdate(
6066
provider.awareness,
61-
decoding.readVarUint8Array(this.message.decoder),
67+
message.readVarUint8Array(),
6268
provider,
6369
)
6470
}
6571

6672
private applyAuthMessage(provider: HocuspocusProvider) {
73+
const { message } = this
74+
6775
readAuthMessage(
68-
this.message.decoder,
76+
message.decoder,
6977
provider.permissionDeniedHandler.bind(provider),
7078
provider.authenticatedHandler.bind(provider),
7179
)
7280
}
7381

7482
private applyQueryAwarenessMessage(provider: HocuspocusProvider) {
75-
encoding.writeVarUint(this.message.encoder, MessageType.Awareness)
76-
encoding.writeVarUint8Array(
77-
this.message.encoder,
83+
const { message } = this
84+
85+
message.writeVarUint(MessageType.Awareness)
86+
message.writeVarUint8Array(
7887
awarenessProtocol.encodeAwarenessUpdate(
7988
provider.awareness,
8089
Array.from(provider.awareness.getStates().keys()),

packages/provider/src/MessageSender.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import * as encoding from 'lib0/encoding'
1+
import { Encoder, toUint8Array } from 'lib0/encoding'
22
import * as bc from 'lib0/broadcastchannel'
33
import { AuthenticationMessage } from './OutgoingMessages/AuthenticationMessage'
44
import { AwarenessMessage } from './OutgoingMessages/AwarenessMessage'
@@ -10,7 +10,7 @@ import { Constructable } from './types'
1010

1111
export class MessageSender {
1212

13-
encoder: encoding.Encoder
13+
encoder: Encoder
1414

1515
message: any
1616

@@ -27,7 +27,7 @@ export class MessageSender {
2727
}
2828

2929
create() {
30-
return encoding.toUint8Array(this.encoder)
30+
return toUint8Array(this.encoder)
3131
}
3232

3333
send(webSocket: any) {
Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,16 @@
1-
import * as encoding from 'lib0/encoding'
1+
import { Encoder, createEncoder, toUint8Array } from 'lib0/encoding'
22
import { MessageType, OutgoingMessageInterface } from './types'
33

44
export class OutgoingMessage implements OutgoingMessageInterface {
5-
encoder: encoding.Encoder
5+
encoder: Encoder
66

77
type?: MessageType
88

99
constructor() {
10-
this.encoder = encoding.createEncoder()
10+
this.encoder = createEncoder()
1111
}
1212

13-
get name() {
14-
if (typeof this.type === 'number') {
15-
return MessageType[this.type]
16-
}
17-
18-
throw new Error('Type for outgoing message not set.')
13+
toUint8Array() {
14+
return toUint8Array(this.encoder)
1915
}
2016
}

packages/provider/src/OutgoingMessages/AuthenticationMessage.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import * as encoding from 'lib0/encoding'
1+
import { writeVarUint } from 'lib0/encoding'
22
import { writeAuthentication } from '../../../../shared/protocols/auth'
33
import { MessageType, OutgoingMessageArguments } from '../types'
44
import { OutgoingMessage } from '../OutgoingMessage'
@@ -10,10 +10,10 @@ export class AuthenticationMessage extends OutgoingMessage {
1010

1111
get(args: Partial<OutgoingMessageArguments>) {
1212
if (typeof args.token === 'undefined') {
13-
throw new Error('The authentication message requires token as an argument')
13+
throw new Error('The authentication message requires `token` as an argument.')
1414
}
1515

16-
encoding.writeVarUint(this.encoder, this.type)
16+
writeVarUint(this.encoder, this.type)
1717
writeAuthentication(this.encoder, args.token)
1818

1919
return this.encoder

packages/provider/src/OutgoingMessages/UpdateMessage.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import * as encoding from 'lib0/encoding'
2-
import * as syncProtocol from 'y-protocols/sync'
1+
import { writeVarUint } from 'lib0/encoding'
2+
import { writeUpdate } from 'y-protocols/sync'
33
import { MessageType, OutgoingMessageArguments } from '../types'
44
import { OutgoingMessage } from '../OutgoingMessage'
55

@@ -9,8 +9,8 @@ export class UpdateMessage extends OutgoingMessage {
99
description = 'A document update'
1010

1111
get(args: Partial<OutgoingMessageArguments>) {
12-
encoding.writeVarUint(this.encoder, this.type)
13-
syncProtocol.writeUpdate(this.encoder, args.update)
12+
writeVarUint(this.encoder, this.type)
13+
writeUpdate(this.encoder, args.update)
1414

1515
return this.encoder
1616
}

packages/server/src/Connection.ts

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import { IncomingMessage as HTTPIncomingMessage } from 'http'
44

55
import Document from './Document'
66
import { IncomingMessage } from './IncomingMessage'
7-
import { MessageType, WsReadyStates } from './types'
7+
import { WsReadyStates } from './types'
88
import { OutgoingMessage } from './OutgoingMessage'
9+
import { MessageReceiver } from './MessageReceiver'
910

1011
class Connection {
1112

@@ -166,24 +167,10 @@ class Connection {
166167
* Handle an incoming message
167168
* @private
168169
*/
169-
private handleMessage(input: Iterable<number>): void {
170-
const message = new IncomingMessage(input)
171-
172-
if (message.type === MessageType.Awareness) {
173-
this.document.applyAwarenessUpdate(this, message.readUint8Array())
174-
return
175-
}
176-
177-
message.readSyncMessageAndApplyItTo(this.document, this)
178-
179-
if (message.length <= 1) {
180-
return
181-
}
182-
183-
return this.send(
184-
message.toUint8Array(),
185-
)
186-
170+
private handleMessage(data: Iterable<number>): void {
171+
new MessageReceiver(
172+
new IncomingMessage(data),
173+
).apply(this)
187174
}
188175

189176
/**

0 commit comments

Comments
 (0)