Skip to content

Commit

Permalink
[release/6.0] [wasm][debugger] Fixing the race condition while modify…
Browse files Browse the repository at this point in the history
…ing pending_ops (dotnet#80190)

* applying diff.

* fix merge
  • Loading branch information
thaystg authored Jan 6, 2023
1 parent f47df0c commit 2e45bc7
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 27 deletions.
76 changes: 55 additions & 21 deletions src/mono/wasm/debugger/BrowserDebugProxy/DevToolsProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
Expand All @@ -24,14 +25,19 @@ internal class DevToolsProxy
private ClientWebSocket browser;
private WebSocket ide;
private int next_cmd_id;
private List<Task> pending_ops = new List<Task>();
private readonly ChannelWriter<Task> _channelWriter;
private readonly ChannelReader<Task> _channelReader;
private List<DevToolsQueue> queues = new List<DevToolsQueue>();

protected readonly ILogger logger;

public DevToolsProxy(ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<DevToolsProxy>();

var channel = Channel.CreateUnbounded<Task>(new UnboundedChannelOptions { SingleReader = true });
_channelWriter = channel.Writer;
_channelReader = channel.Reader;
}

protected virtual Task<bool> AcceptEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
Expand Down Expand Up @@ -82,7 +88,7 @@ private DevToolsQueue GetQueueForTask(Task task)
return queues.FirstOrDefault(q => q.CurrentSend == task);
}

private void Send(WebSocket to, JObject o, CancellationToken token)
private async Task Send(WebSocket to, JObject o, CancellationToken token)
{
string sender = browser == to ? "Send-browser" : "Send-ide";

Expand All @@ -95,7 +101,7 @@ private void Send(WebSocket to, JObject o, CancellationToken token)

Task task = queue.Send(bytes, token);
if (task != null)
pending_ops.Add(task);
await _channelWriter.WriteAsync(task, token);
}

private async Task OnEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
Expand All @@ -105,7 +111,7 @@ private async Task OnEvent(SessionId sessionId, string method, JObject args, Can
if (!await AcceptEvent(sessionId, method, args, token))
{
//logger.LogDebug ("proxy browser: {0}::{1}",method, args);
SendEventInternal(sessionId, method, args, token);
await SendEventInternal(sessionId, method, args, token);
}
}
catch (Exception e)
Expand All @@ -121,7 +127,7 @@ private async Task OnCommand(MessageId id, string method, JObject args, Cancella
if (!await AcceptCommand(id, method, args, token))
{
Result res = await SendCommandInternal(id, method, args, token);
SendResponseInternal(id, res, token);
await SendResponseInternal(id, res, token);
}
}
catch (Exception e)
Expand All @@ -142,7 +148,7 @@ private void OnResponse(MessageId id, Result result)
logger.LogError("Cannot respond to command: {id} with result: {result} - command is not pending", id, result);
}

private void ProcessBrowserMessage(string msg, CancellationToken token)
private Task ProcessBrowserMessage(string msg, CancellationToken token)
{
var res = JObject.Parse(msg);

Expand All @@ -151,23 +157,30 @@ private void ProcessBrowserMessage(string msg, CancellationToken token)
Log("protocol", $"browser: {msg}");

if (res["id"] == null)
pending_ops.Add(OnEvent(res.ToObject<SessionId>(), res["method"].Value<string>(), res["params"] as JObject, token));
{
return OnEvent(res.ToObject<SessionId>(), res["method"].Value<string>(), res["params"] as JObject, token);
}
else
{
OnResponse(res.ToObject<MessageId>(), Result.FromJson(res));
return null;
}
}

private void ProcessIdeMessage(string msg, CancellationToken token)
private Task ProcessIdeMessage(string msg, CancellationToken token)
{
Log("protocol", $"ide: {msg}");
if (!string.IsNullOrEmpty(msg))
{
var res = JObject.Parse(msg);
var id = res.ToObject<MessageId>();
pending_ops.Add(OnCommand(
return OnCommand(
id,
res["method"].Value<string>(),
res["params"] as JObject, token));
res["params"] as JObject, token);
}

return null;
}

internal async Task<Result> SendCommand(SessionId id, string method, JObject args, CancellationToken token)
Expand All @@ -176,7 +189,7 @@ internal async Task<Result> SendCommand(SessionId id, string method, JObject arg
return await SendCommandInternal(id, method, args, token);
}

private Task<Result> SendCommandInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
private async Task<Result> SendCommandInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
{
int id = Interlocked.Increment(ref next_cmd_id);

Expand All @@ -194,17 +207,17 @@ private Task<Result> SendCommandInternal(SessionId sessionId, string method, JOb
//Log ("verbose", $"add cmd id {sessionId}-{id}");
pending_cmds[msgId] = tcs;

Send(this.browser, o, token);
return tcs.Task;
await Send(browser, o, token);
return await tcs.Task;
}

public void SendEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
public Task SendEvent(SessionId sessionId, string method, JObject args, CancellationToken token)
{
//Log ("verbose", $"sending event {method}: {args}");
SendEventInternal(sessionId, method, args, token);
return SendEventInternal(sessionId, method, args, token);
}

private void SendEventInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
private Task SendEventInternal(SessionId sessionId, string method, JObject args, CancellationToken token)
{
var o = JObject.FromObject(new
{
Expand All @@ -214,21 +227,21 @@ private void SendEventInternal(SessionId sessionId, string method, JObject args,
if (sessionId.sessionId != null)
o["sessionId"] = sessionId.sessionId;

Send(this.ide, o, token);
return Send(ide, o, token);
}

internal void SendResponse(MessageId id, Result result, CancellationToken token)
{
SendResponseInternal(id, result, token);
}

private void SendResponseInternal(MessageId id, Result result, CancellationToken token)
private Task SendResponseInternal(MessageId id, Result result, CancellationToken token)
{
JObject o = result.ToJObject(id);
if (result.IsErr)
logger.LogError($"sending error response for id: {id} -> {result}");

Send(this.ide, o, token);
return Send(this.ide, o, token);
}

// , HttpContext context)
Expand All @@ -248,10 +261,14 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
Log("verbose", $"DevToolsProxy: Client connected on {browserUri}");
var x = new CancellationTokenSource();

List<Task> pending_ops = new();

pending_ops.Add(ReadOne(browser, x.Token));
pending_ops.Add(ReadOne(ide, x.Token));
pending_ops.Add(side_exception.Task);
pending_ops.Add(client_initiated_close.Task);
Task<bool> readerTask = _channelReader.WaitToReadAsync(x.Token).AsTask();
pending_ops.Add(readerTask);

try
{
Expand All @@ -268,14 +285,26 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
break;
}

if (readerTask.IsCompleted)
{
while (_channelReader.TryRead(out Task newTask))
{
pending_ops.Add(newTask);
}

pending_ops[4] = _channelReader.WaitToReadAsync(x.Token).AsTask();
}

//logger.LogTrace ("pump {0} {1}", task, pending_ops.IndexOf (task));
if (task == pending_ops[0])
{
string msg = ((Task<string>)task).Result;
if (msg != null)
{
pending_ops[0] = ReadOne(browser, x.Token); //queue next read
ProcessBrowserMessage(msg, x.Token);
Task newTask = ProcessBrowserMessage(msg, x.Token);
if (newTask != null)
pending_ops.Add(newTask);
}
}
else if (task == pending_ops[1])
Expand All @@ -284,7 +313,9 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
if (msg != null)
{
pending_ops[1] = ReadOne(ide, x.Token); //queue next read
ProcessIdeMessage(msg, x.Token);
Task newTask = ProcessIdeMessage(msg, x.Token);
if (newTask != null)
pending_ops.Add(newTask);
}
}
else if (task == pending_ops[2])
Expand All @@ -304,10 +335,13 @@ public async Task Run(Uri browserUri, WebSocket ideSocket)
}
}
}

_channelWriter.Complete();
}
catch (Exception e)
{
Log("error", $"DevToolsProxy::Run: Exception {e}");
_channelWriter.Complete(e);
//throw;
}
finally
Expand Down
12 changes: 6 additions & 6 deletions src/mono/wasm/debugger/BrowserDebugProxy/MonoProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected override async Task<bool> AcceptEvent(SessionId sessionId, string meth

case "Runtime.executionContextCreated":
{
SendEvent(sessionId, method, args, token);
await SendEvent(sessionId, method, args, token);
JToken ctx = args?["context"];
var aux_data = ctx?["auxData"] as JObject;
int id = ctx["id"].Value<int>();
Expand Down Expand Up @@ -831,7 +831,7 @@ private async Task<bool> SendCallStack(SessionId sessionId, ExecutionContext con
await SendCommand(sessionId, "Debugger.resume", new JObject(), token);
return true;
}
SendEvent(sessionId, "Debugger.paused", o, token);
await SendEvent(sessionId, "Debugger.paused", o, token);

return true;
}
Expand Down Expand Up @@ -940,7 +940,7 @@ internal async Task<MethodInfo> LoadSymbolsOnDemand(AssemblyInfo asm, int method
foreach (SourceFile source in asm.Sources)
{
var scriptSource = JObject.FromObject(source.ToScriptSource(context.Id, context.AuxData));
SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);
await SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);
}
return asm.GetMethodByToken(method_token);
}
Expand Down Expand Up @@ -1171,7 +1171,7 @@ private async Task OnSourceFileAdded(SessionId sessionId, SourceFile source, Exe
{
JObject scriptSource = JObject.FromObject(source.ToScriptSource(context.Id, context.AuxData));
Log("debug", $"sending {source.Url} {context.Id} {sessionId.sessionId}");
SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);
await SendEvent(sessionId, "Debugger.scriptParsed", scriptSource, token);

foreach (var req in context.BreakpointRequests.Values)
{
Expand Down Expand Up @@ -1253,7 +1253,7 @@ private async Task<DebugStore> RuntimeReady(SessionId sessionId, CancellationTok

DebugStore store = await LoadStore(sessionId, token);
context.ready.SetResult(store);
SendEvent(sessionId, "Mono.runtimeReady", new JObject(), token);
await SendEvent(sessionId, "Mono.runtimeReady", new JObject(), token);
SdbHelper.ResetStore(store);
return store;
}
Expand Down Expand Up @@ -1340,7 +1340,7 @@ private async Task SetBreakpoint(SessionId sessionId, DebugStore store, Breakpoi
};

if (sendResolvedEvent)
SendEvent(sessionId, "Debugger.breakpointResolved", JObject.FromObject(resolvedLocation), token);
await SendEvent(sessionId, "Debugger.breakpointResolved", JObject.FromObject(resolvedLocation), token);
}

req.Locations.AddRange(breakpoints);
Expand Down

0 comments on commit 2e45bc7

Please sign in to comment.