Skip to content

Commit fdc0d16

Browse files
authored
[wasm][debugger] Make DevToolsProxy's pending_ops thread-safe
Currently, `pending_ops` can get written by different threads at the same time, and also read in parallel. This causes tests to randomly fail with: ``` DevToolsProxy::Run: Exception System.ArgumentException: The tasks argument included a null value. (Parameter 'tasks') at System.Threading.Tasks.Task.WhenAny(Task[] tasks) at Microsoft.WebAssembly.Diagnostics.DevToolsProxy.Run(Uri browserUri, WebSocket ideSocket) in /_/src/mono/wasm/debugger/BrowserDebugProxy/DevToolsProxy.cs:line 269 ``` Instead, we use `Channel<T>` to add the ops, and then read those in the main loop, and add to the *local* `pending_ops` list.
1 parent 0c33964 commit fdc0d16

2 files changed

Lines changed: 61 additions & 27 deletions

File tree

src/mono/wasm/debugger/BrowserDebugProxy/DevToolsProxy.cs

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System.Net.WebSockets;
99
using System.Text;
1010
using System.Threading;
11+
using System.Threading.Channels;
1112
using System.Threading.Tasks;
1213
using Microsoft.Extensions.Logging;
1314
using Newtonsoft.Json;
@@ -24,14 +25,19 @@ internal class DevToolsProxy
2425
private ClientWebSocket browser;
2526
private WebSocket ide;
2627
private int next_cmd_id;
27-
private List<Task> pending_ops = new List<Task>();
28+
private readonly ChannelWriter<Task> _channelWriter;
29+
private readonly ChannelReader<Task> _channelReader;
2830
private List<DevToolsQueue> queues = new List<DevToolsQueue>();
2931

3032
protected readonly ILogger logger;
3133

3234
public DevToolsProxy(ILoggerFactory loggerFactory)
3335
{
3436
logger = loggerFactory.CreateLogger<DevToolsProxy>();
37+
38+
var channel = Channel.CreateUnbounded<Task>(new UnboundedChannelOptions { SingleReader = true });
39+
_channelWriter = channel.Writer;
40+
_channelReader = channel.Reader;
3541
}
3642

3743
protected virtual Task<bool> AcceptEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
@@ -93,7 +99,7 @@ private DevToolsQueue GetQueueForTask(Task task)
9399
return queues.FirstOrDefault(q => q.CurrentSend == task);
94100
}
95101

96-
private void Send(WebSocket to, JObject o, CancellationToken token)
102+
private async Task Send(WebSocket to, JObject o, CancellationToken token)
97103
{
98104
string sender = browser == to ? "Send-browser" : "Send-ide";
99105

@@ -105,7 +111,7 @@ private void Send(WebSocket to, JObject o, CancellationToken token)
105111

106112
Task task = queue.Send(bytes, token);
107113
if (task != null)
108-
pending_ops.Add(task);
114+
await _channelWriter.WriteAsync(task, token);
109115
}
110116

111117
private async Task OnEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
@@ -115,7 +121,7 @@ private async Task OnEvent(SessionId sessionId, string method, JObject args, Can
115121
if (!await AcceptEvent(sessionId, method, args, token))
116122
{
117123
//logger.LogDebug ("proxy browser: {0}::{1}",method, args);
118-
SendEventInternal(sessionId, method, args, token);
124+
await SendEventInternal(sessionId, method, args, token);
119125
}
120126
}
121127
catch (Exception e)
@@ -131,7 +137,7 @@ private async Task OnCommand(MessageId id, string method, JObject args, Cancella
131137
if (!await AcceptCommand(id, method, args, token))
132138
{
133139
Result res = await SendCommandInternal(id, method, args, token);
134-
SendResponseInternal(id, res, token);
140+
await SendResponseInternal(id, res, token);
135141
}
136142
}
137143
catch (Exception e)
@@ -152,31 +158,38 @@ private void OnResponse(MessageId id, Result result)
152158
logger.LogError("Cannot respond to command: {id} with result: {result} - command is not pending", id, result);
153159
}
154160

155-
private void ProcessBrowserMessage(string msg, CancellationToken token)
161+
private Task ProcessBrowserMessage(string msg, CancellationToken token)
156162
{
157163
var res = JObject.Parse(msg);
158164

159165
//if (method != "Debugger.scriptParsed" && method != "Runtime.consoleAPICalled")
160166
Log("protocol", $"browser: {msg}");
161167

162168
if (res["id"] == null)
163-
pending_ops.Add(OnEvent(res.ToObject<SessionId>(), res["method"].Value<string>(), res["params"] as JObject, token));
169+
{
170+
return OnEvent(res.ToObject<SessionId>(), res["method"].Value<string>(), res["params"] as JObject, token);
171+
}
164172
else
173+
{
165174
OnResponse(res.ToObject<MessageId>(), Result.FromJson(res));
175+
return null;
176+
}
166177
}
167178

168-
private void ProcessIdeMessage(string msg, CancellationToken token)
179+
private Task ProcessIdeMessage(string msg, CancellationToken token)
169180
{
170181
Log("protocol", $"ide: {msg}");
171182
if (!string.IsNullOrEmpty(msg))
172183
{
173184
var res = JObject.Parse(msg);
174185
var id = res.ToObject<MessageId>();
175-
pending_ops.Add(OnCommand(
186+
return OnCommand(
176187
id,
177188
res["method"].Value<string>(),
178-
res["params"] as JObject, token));
189+
res["params"] as JObject, token);
179190
}
191+
192+
return null;
180193
}
181194

182195
internal async Task<Result> SendCommand(SessionId id, string method, JObject args, CancellationToken token)
@@ -185,7 +198,7 @@ internal async Task<Result> SendCommand(SessionId id, string method, JObject arg
185198
return await SendCommandInternal(id, method, args, token);
186199
}
187200

188-
private Task<Result> SendCommandInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
201+
private async Task<Result> SendCommandInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
189202
{
190203
int id = Interlocked.Increment(ref next_cmd_id);
191204

@@ -203,17 +216,17 @@ private Task<Result> SendCommandInternal(SessionId sessionId, string method, JOb
203216
//Log ("verbose", $"add cmd id {sessionId}-{id}");
204217
pending_cmds[msgId] = tcs;
205218

206-
Send(this.browser, o, token);
207-
return tcs.Task;
219+
await Send(browser, o, token);
220+
return await tcs.Task;
208221
}
209222

210-
public void SendEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
223+
public Task SendEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
211224
{
212225
//Log ("verbose", $"sending event {method}: {args}");
213-
SendEventInternal(sessionId, method, args, token);
226+
return SendEventInternal(sessionId, method, args, token);
214227
}
215228

216-
private void SendEventInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
229+
private Task SendEventInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
217230
{
218231
var o = JObject.FromObject(new
219232
{
@@ -223,21 +236,21 @@ private void SendEventInternal(SessionId sessionId, string method, JObject args,
223236
if (sessionId.sessionId != null)
224237
o["sessionId"] = sessionId.sessionId;
225238

226-
Send(this.ide, o, token);
239+
return Send(ide, o, token);
227240
}
228241

229242
internal void SendResponse(MessageId id, Result result, CancellationToken token)
230243
{
231244
SendResponseInternal(id, result, token);
232245
}
233246

234-
private void SendResponseInternal(MessageId id, Result result, CancellationToken token)
247+
private Task SendResponseInternal(MessageId id, Result result, CancellationToken token)
235248
{
236249
JObject o = result.ToJObject(id);
237250
if (!result.IsOk)
238251
logger.LogError($"sending error response for id: {id} -> {result}");
239252

240-
Send(this.ide, o, token);
253+
return Send(this.ide, o, token);
241254
}
242255

243256
// , HttpContext context)
@@ -257,10 +270,14 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
257270
Log("verbose", $"DevToolsProxy: Client connected on {browserUri}");
258271
var x = new CancellationTokenSource();
259272

273+
List<Task> pending_ops = new();
274+
260275
pending_ops.Add(ReadOne(browser, x.Token));
261276
pending_ops.Add(ReadOne(ide, x.Token));
262277
pending_ops.Add(side_exception.Task);
263278
pending_ops.Add(client_initiated_close.Task);
279+
Task<bool> readerTask = _channelReader.WaitToReadAsync(x.Token).AsTask();
280+
pending_ops.Add(readerTask);
264281

265282
try
266283
{
@@ -277,14 +294,26 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
277294
break;
278295
}
279296

297+
if (readerTask.IsCompleted)
298+
{
299+
while (_channelReader.TryRead(out Task newTask))
300+
{
301+
pending_ops.Add(newTask);
302+
}
303+
304+
pending_ops[4] = _channelReader.WaitToReadAsync(x.Token).AsTask();
305+
}
306+
280307
//logger.LogTrace ("pump {0} {1}", task, pending_ops.IndexOf (task));
281308
if (completedTask == pending_ops[0])
282309
{
283310
string msg = ((Task<string>)completedTask).Result;
284311
if (msg != null)
285312
{
286313
pending_ops[0] = ReadOne(browser, x.Token); //queue next read
287-
ProcessBrowserMessage(msg, x.Token);
314+
Task newTask = ProcessBrowserMessage(msg, x.Token);
315+
if (newTask != null)
316+
pending_ops.Add(newTask);
288317
}
289318
}
290319
else if (completedTask == pending_ops[1])
@@ -293,7 +322,9 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
293322
if (msg != null)
294323
{
295324
pending_ops[1] = ReadOne(ide, x.Token); //queue next read
296-
ProcessIdeMessage(msg, x.Token);
325+
Task newTask = ProcessIdeMessage(msg, x.Token);
326+
if (newTask != null)
327+
pending_ops.Add(newTask);
297328
}
298329
}
299330
else if (completedTask == pending_ops[2])
@@ -313,10 +344,13 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
313344
}
314345
}
315346
}
347+
348+
_channelWriter.Complete();
316349
}
317350
catch (Exception e)
318351
{
319352
Log("error", $"DevToolsProxy::Run: Exception {e}");
353+
_channelWriter.Complete(e);
320354
//throw;
321355
}
322356
finally

src/mono/wasm/debugger/BrowserDebugProxy/MonoProxy.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ protected override async Task<bool> AcceptEvent(SessionId sessionId, string meth
139139

140140
case "Runtime.executionContextCreated":
141141
{
142-
SendEvent(sessionId, method, args, token);
142+
await SendEvent(sessionId, method, args, token);
143143
JToken ctx = args?["context"];
144144
var aux_data = ctx?["auxData"] as JObject;
145145
int id = ctx["id"].Value<int>();
@@ -986,7 +986,7 @@ private async Task<bool> SendCallStack(SessionId sessionId, ExecutionContext con
986986
await SendCommand(sessionId, "Debugger.resume", new JObject(), token);
987987
return true;
988988
}
989-
SendEvent(sessionId, "Debugger.paused", o, token);
989+
await SendEvent(sessionId, "Debugger.paused", o, token);
990990

991991
return true;
992992

@@ -1103,7 +1103,7 @@ internal async Task<MethodInfo> LoadSymbolsOnDemand(AssemblyInfo asm, int method
11031103
foreach (SourceFile source in asm.Sources)
11041104
{
11051105
var scriptSource = JObject.FromObject(source.ToScriptSource(context.Id, context.AuxData));
1106-
SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);
1106+
await SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);
11071107
}
11081108
return asm.GetMethodByToken(method_token);
11091109
}
@@ -1333,7 +1333,7 @@ private async Task OnSourceFileAdded(SessionId sessionId, SourceFile source, Exe
13331333
{
13341334
JObject scriptSource = JObject.FromObject(source.ToScriptSource(context.Id, context.AuxData));
13351335
Log("debug", $"sending {source.Url} {context.Id} {sessionId.sessionId}");
1336-
SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);
1336+
await SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);
13371337

13381338
foreach (var req in context.BreakpointRequests.Values)
13391339
{
@@ -1412,7 +1412,7 @@ private async Task<DebugStore> RuntimeReady(SessionId sessionId, CancellationTok
14121412

14131413
DebugStore store = await LoadStore(sessionId, token);
14141414
context.ready.SetResult(store);
1415-
SendEvent(sessionId, "Mono.runtimeReady", new JObject(), token);
1415+
await SendEvent(sessionId, "Mono.runtimeReady", new JObject(), token);
14161416
context.SdbAgent.ResetStore(store);
14171417
return store;
14181418
}
@@ -1502,7 +1502,7 @@ private async Task SetBreakpoint(SessionId sessionId, DebugStore store, Breakpoi
15021502
};
15031503

15041504
if (sendResolvedEvent)
1505-
SendEvent(sessionId, "Debugger.breakpointResolved", JObject.FromObject(resolvedLocation), token);
1505+
await SendEvent(sessionId, "Debugger.breakpointResolved", JObject.FromObject(resolvedLocation), token);
15061506
}
15071507

15081508
req.Locations.AddRange(breakpoints);

0 commit comments

Comments
 (0)