# client.nim
## Main Entry point to the ChatSDK.
## Clients are the primary manager of sending and receiving
## messages, and managing conversations.
import # Foreign
chronicles,
chronos,
libchat,
std/options,
strformat,
types
import #local
delivery/waku_client,
errors,
types,
utils
# Tag log statements emitted from this module with the "chat client" topic.
logScope:
  topics = "chat client"
#################################################
# Definitions
#################################################
# Type used to return message data via callback.
# Instances are built in `parseMessage` and handed to every registered
# `MessageCallback`.
type ReceivedMessage* = ref object of RootObj
  sender*: PublicKey ## NOTE(review): left at its default by `parseMessage` (see the TODO there).
  timestamp*: int64 ## Value of `getCurrentTimestamp()` when the payload was parsed; unit not visible here.
  content*: seq[byte] ## Content bytes taken from the decoded payload.
# Kinds of conversation a `Conversation` handle can describe.
# Only one kind is declared at the moment.
type ConvoType = enum
  PrivateV1
# Lightweight handle to one conversation. It bundles what `sendMessage`
# needs: the LibChat context, the conversation id and the delivery client.
type Conversation* = object
  ctx: LibChat ## LibChat context used to turn content into payloads.
  convoId: string ## Conversation identifier; exposed through `id`.
  ds: WakuClient ## Delivery client the payloads are sent through.
  convo_type: ConvoType ## Kind of conversation.
proc id*(self: Conversation): string =
  ## Identifier of the conversation this handle refers to.
  self.convoId
type
  # Called by `notifyNewMessage` with the conversation and the received message.
  MessageCallback* = proc(conversation: Conversation, msg: ReceivedMessage): Future[void] {.async.}
  # Called by `notifyNewConversation` with the new conversation handle.
  NewConvoCallback* = proc(conversation: Conversation): Future[void] {.async.}
  # Called by `notifyDeliveryAck` with the conversation and the message id.
  DeliveryAckCallback* = proc(conversation: Conversation,
      msgId: MessageId): Future[void] {.async.}
# Client state: the LibChat context, the delivery client, the inbound
# payload queue and the registered callback lists.
type ChatClient* = ref object
  libchatCtx: LibChat ## LibChat context; destroyed by `stop`.
  ds*: WakuClient ## Delivery client used for sending and receiving payloads.
  inboundQueue: QueueRef ## Queue registered with `ds` in `start`; drained by the consumer loop.
  isRunning: bool ## Loop condition of the consumer; set in `start`, cleared in `stop`.
  newMessageCallbacks: seq[MessageCallback] ## Subscribers added via `onNewMessage`.
  newConvoCallbacks: seq[NewConvoCallback] ## Subscribers added via `onNewConversation`.
  deliveryAckCallbacks: seq[DeliveryAckCallback] ## Subscribers added via `onDeliveryAck`.
#################################################
# Constructors
#################################################
proc newClient*(ds: WakuClient, ephemeral: bool = true, installation_name: string = "default"): Result[ChatClient, ErrorType] =
  ## Creates a new `ChatClient` that sends and receives through the given
  ## `WakuClient`.
  ##
  ## `installation_name` is passed to `newConversationsContext` when the
  ## LibChat context is created. Only ephemeral clients are supported:
  ## `ephemeral = false` returns an error without creating anything.
  ##
  ## Returns `ok(client)` on success, or `err(msg)` when persistence is
  ## requested or when construction raises a catchable error.
  if not ephemeral:
    return err("persistence is not currently supported")

  try:
    # Bounded inbound queue (capacity 10) shared with the delivery client.
    let inbound = QueueRef(queue: newAsyncQueue[ChatPayload](10))
    let client = ChatClient(
      libchatCtx: newConversationsContext(installation_name),
      ds: ds,
      inboundQueue: inbound,
      isRunning: false,
      newMessageCallbacks: @[],
      newConvoCallbacks: @[],
      deliveryAckCallbacks: @[])

    notice "Client started"
    result = ok(client)
  except CatchableError as e:
    # Only recoverable errors are turned into a Result; Defects (programmer
    # bugs) are left to propagate instead of being reported as an `err`.
    error "newCLient", err = e.msg
    result = err(e.msg)
#################################################
# Parameter Access
#################################################
proc getId*(client: ChatClient): string =
  ## Installation name reported by the client's LibChat context.
  client.libchatCtx.getInstallationName()
proc listConversations*(client: ChatClient): seq[Conversation] =
  ## Placeholder: currently always yields an empty sequence.
  # TODO: (P1) Implement list conversations
  newSeq[Conversation]()
#################################################
# Callback Handling
#################################################
proc onNewMessage*(client: ChatClient, callback: MessageCallback) =
  ## Registers `callback` to be invoked by `notifyNewMessage`.
  add(client.newMessageCallbacks, callback)
proc notifyNewMessage*(client: ChatClient, convo: Conversation, msg: ReceivedMessage) =
  ## Invokes every registered message callback with `convo` and `msg`,
  ## in registration order.
  for handler in client.newMessageCallbacks:
    # The future returned by the callback is dropped, not awaited.
    discard handler(convo, msg)
proc onNewConversation*(client: ChatClient, callback: NewConvoCallback) =
  ## Registers `callback` to be invoked by `notifyNewConversation`.
  add(client.newConvoCallbacks, callback)
proc notifyNewConversation(client: ChatClient, convo: Conversation) =
  ## Invokes every registered new-conversation callback with `convo`,
  ## logging once per callback before it is called.
  for handler in client.newConvoCallbacks:
    debug "calling OnConvo CB", client=client.getId(), len = client.newConvoCallbacks.len()
    # The future returned by the callback is dropped, not awaited.
    discard handler(convo)
proc onDeliveryAck*(client: ChatClient, callback: DeliveryAckCallback) =
  ## Registers `callback` to be invoked by `notifyDeliveryAck`.
  add(client.deliveryAckCallbacks, callback)
proc notifyDeliveryAck(client: ChatClient, convo: Conversation,
    messageId: MessageId) =
  ## Invokes every registered delivery-ack callback with `convo` and
  ## `messageId`, in registration order.
  for handler in client.deliveryAckCallbacks:
    # The future returned by the callback is dropped, not awaited.
    discard handler(convo, messageId)
#################################################
# Functional
#################################################
proc createIntroBundle*(self: ChatClient): seq[byte] =
  ## Generates an IntroBundle for the client, which includes
  ## the required information to send a message.
  ##
  ## If LibChat fails to create the bundle the error is logged and an
  ## empty sequence is returned.
  let introBundle = self.libchatCtx.createIntroductionBundle().valueOr:
    error "could not create bundle",error=error, client = self.getId()
    return @[]

  notice "IntroBundleCreated", client = self.getId(),
      bundle = introBundle
  introBundle
proc sendPayloads(ds: WakuClient, payloads: seq[PayloadResult]) =
  ## Hands each payload's `data` to `ds.sendBytes`, addressed to the
  ## payload's `address`. The result of every send is discarded.
  for item in payloads:
    # TODO: (P2) surface errors
    discard ds.sendBytes(item.address, item.data)
#################################################
# Conversation Initiation
#################################################
proc getConversation*(client: ChatClient, convoId: string): Conversation =
  ## Builds a `PrivateV1` conversation handle for `convoId` that shares the
  ## client's LibChat context and delivery client. No lookup is performed.
  Conversation(
    ctx: client.libchatCtx,
    convoId: convoId,
    ds: client.ds,
    convo_type: PrivateV1)
proc newPrivateConversation*(client: ChatClient,
    introBundle: seq[byte], content: Content): Future[Option[ChatError]] {.async.} =
  ## Starts a private conversation from `introBundle`, with `content` as
  ## the initial content.
  ##
  ## The payloads produced by LibChat are sent through the delivery client
  ## and the new-conversation callbacks are notified. Returns
  ## `none(ChatError)` on success, or a `ChatError` with code `errLibChat`
  ## when LibChat could not create the conversation.
  let created = client.libchatCtx.createNewPrivateConvo(introBundle, content)
  let (convoId, payloads) = created.valueOr:
    error "could not create bundle",error=error, client = client.getId()
    return some(ChatError(code: errLibChat, context:fmt"got: {error}" ))

  sendPayloads(client.ds, payloads)

  let convo = Conversation(
    ctx: client.libchatCtx,
    convoId: convoId,
    ds: client.ds,
    convo_type: ConvoType.PrivateV1)
  client.notifyNewConversation(convo)

  notice "CREATED", client=client.getId(), convoId=convoId
  return none(ChatError)
#################################################
# Payload Handling
# Receives an incoming payload, decodes it, and processes it.
#################################################
proc parseMessage(client: ChatClient, msg: ChatPayload) {.raises: [ValueError].} =
  ## Decodes one inbound payload with LibChat and notifies subscribers.
  ##
  ## When the payload yields content, the new-conversation callbacks fire
  ## first (only if LibChat flags the conversation as new), followed by
  ## the new-message callbacks. Decode failures and any exception raised
  ## along the way are logged and swallowed.
  try:
    let decoded = client.libchatCtx.handlePayload(msg.bytes).valueOr:
      error "handlePayload" , error=error, client=client.getId()
      return

    if decoded.isNone():
      debug "Parsed message generated no content", client=client.getId()
      return

    let content = decoded.get()
    let convo = client.getConversation(content.conversationId)
    if content.isNewConvo:
      client.notifyNewConversation(convo)

    # TODO: (P1) Add sender information from LibChat.
    let received = ReceivedMessage(timestamp:getCurrentTimestamp(),content: content.data )
    client.notifyNewMessage(convo, received)
  except Exception as e:
    error "HandleFrame Failed", error = e.msg
proc sendMessage*(convo: Conversation, content: Content) : Future[MessageId] {.async, gcsafe.} =
  ## Encodes `content` for this conversation with LibChat and sends the
  ## resulting payloads through the conversation's delivery client.
  ##
  ## If LibChat fails to encode the content, the error is logged and the
  ## literal id "error" is returned.
  ##
  ## NOTE(review): the success path never assigns a message id, so the
  ## future completes with the default `MessageId` value. Confirm where
  ## the id should come from.
  let outbound = convo.ctx.sendContent(convo.convoId, content).valueOr:
    error "SendMessage", e=error
    return "error"

  sendPayloads(convo.ds, outbound)
#################################################
# Async Tasks
#################################################
proc messageQueueConsumer(client: ChatClient) {.async.} =
  ## Main message processing loop.
  ##
  ## Takes payloads from the client's inbound queue one at a time and
  ## passes them to `parseMessage` for as long as `client.isRunning` is set.
  info "Message listener started"
  while client.isRunning:
    let message = await client.inboundQueue.queue.get()

    # The client may have been stopped while this loop was suspended on the
    # queue. `stop` destroys the LibChat context, so a payload that arrives
    # afterwards must not be parsed.
    if not client.isRunning:
      break

    debug "Got WakuMessage", client = client.getId() , topic= message.content_topic, len=message.bytes.len()
    client.parseMessage(message)
#################################################
# Control Functions
#################################################
proc start*(client: ChatClient) {.async.} =
  ## Start `ChatClient` and listens for incoming messages.
  ##
  ## Registers the inbound queue with the delivery client, starts the
  ## delivery client and spawns the message consumer loop. Both are spawned
  ## in the background; this proc does not wait for them.
  ##
  ## Calling `start` on a client that is already running is a no-op, so the
  ## queue is not registered twice and no second consumer loop is spawned.
  if client.isRunning:
    warn "Client already running", client = client.getId()
    return

  client.ds.addDispatchQueue(client.inboundQueue)
  asyncSpawn client.ds.start()

  # The flag must be set before the consumer is spawned: it is the
  # consumer's loop condition.
  client.isRunning = true
  asyncSpawn client.messageQueueConsumer()

  notice "Client start complete", client = client.getId()
proc stop*(client: ChatClient) {.async.} =
  ## Stop the client.
  ##
  ## Clears the running flag, stops the delivery client and then destroys
  ## the LibChat context.
  # `getId` reads from the LibChat context, so the id is captured before
  # the context is destroyed instead of being queried afterwards.
  let clientId = client.getId()

  # Clear the flag first so the consumer loop stops picking up work while
  # the delivery client and the context are being torn down.
  client.isRunning = false

  await client.ds.stop()
  client.libchatCtx.destroy()

  notice "Client stopped", client = clientId