Skip to content

Commit

Permalink
Fix Language Worker Channels disposing (Azure#4102)
Browse files Browse the repository at this point in the history
  • Loading branch information
pragnagopa authored Feb 19, 2019
1 parent b99edf7 commit 287a270
Show file tree
Hide file tree
Showing 17 changed files with 109 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

namespace Microsoft.Azure.WebJobs.Script.Eventing
{
internal class RpcChannelReadyEvent : RpcChannelEvent
/// <summary>
/// RpcWebHostChannelReadyEvent is published when a language worker channel is started at the
/// Webhost/Application level. LanguageWorkerChannelManager keeps track that channel created
/// </summary>
internal class RpcWebHostChannelReadyEvent : RpcChannelEvent
{
internal RpcChannelReadyEvent(string id, string language, ILanguageWorkerChannel languageWorkerChannel,
internal RpcWebHostChannelReadyEvent(string id, string language, ILanguageWorkerChannel languageWorkerChannel,
string version, IDictionary<string, string> capabilities)
: base(id)
{
Expand Down
2 changes: 0 additions & 2 deletions src/WebJobs.Script/Host/ScriptHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public class ScriptHost : JobHost, IScriptJobHost

private IList<IDisposable> _eventSubscriptions = new List<IDisposable>();
private IFunctionDispatcher _functionDispatcher;
private IProcessRegistry _processRegistry = new EmptyProcessRegistry();

// Specify the "builtin binding types". These are types that are directly accesible without needing an explicit load gesture.
// This is the set of bindings we shipped prior to binding extensibility.
Expand Down Expand Up @@ -899,7 +898,6 @@ protected override void Dispose(bool disposing)
}

_functionDispatcher?.Dispose();
(_processRegistry as IDisposable)?.Dispose();

foreach (var function in Functions)
{
Expand Down
28 changes: 19 additions & 9 deletions src/WebJobs.Script/Rpc/FunctionRegistration/FunctionDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace Microsoft.Azure.WebJobs.Script.Rpc
internal class FunctionDispatcher : IFunctionDispatcher
{
private readonly IMetricsLogger _metricsLogger;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private IScriptEventManager _eventManager;
private IEnumerable<WorkerConfig> _workerConfigs;
private CreateChannel _channelFactory;
Expand All @@ -42,8 +42,8 @@ public FunctionDispatcher(IOptions<ScriptJobHostOptions> scriptHostOptions,
_scriptOptions = scriptHostOptions.Value;
_languageWorkerChannelManager = languageWorkerChannelManager;
_eventManager = eventManager;
_loggerFactory = loggerFactory;
_workerConfigs = languageWorkerOptions.Value.WorkerConfigs;
_logger = loggerFactory.CreateLogger(ScriptConstants.LogCategoryFunctionDispatcher);

_workerErrorSubscription = _eventManager.OfType<WorkerErrorEvent>()
.Subscribe(WorkerError);
Expand All @@ -59,7 +59,7 @@ internal CreateChannel ChannelFactory
{
_channelFactory = (language, registrations, attemptCount) =>
{
var languageWorkerChannel = _languageWorkerChannelManager.CreateLanguageWorkerChannel(Guid.NewGuid().ToString(), _scriptOptions.RootScriptPath, language, registrations, _metricsLogger, attemptCount);
var languageWorkerChannel = _languageWorkerChannelManager.CreateLanguageWorkerChannel(Guid.NewGuid().ToString(), _scriptOptions.RootScriptPath, language, registrations, _metricsLogger, attemptCount, false);
languageWorkerChannel.StartWorkerProcess();
return languageWorkerChannel;
};
Expand Down Expand Up @@ -94,18 +94,19 @@ public LanguageWorkerState CreateWorkerStateWithExistingChannel(string language,
public void Initialize(string workerRuntime, IEnumerable<FunctionMetadata> functions)
{
_languageWorkerChannelManager.ShutdownStandbyChannels(functions);

workerRuntime = workerRuntime ?? Utility.GetWorkerRuntime(functions);

if (Utility.IsSupportedRuntime(workerRuntime, _workerConfigs))
{
ILanguageWorkerChannel initializedChannel = _languageWorkerChannelManager.GetChannel(workerRuntime);
if (initializedChannel != null)
{
_logger.LogDebug($"Found initialized language worker channel for runtime: {0}", workerRuntime);
CreateWorkerStateWithExistingChannel(workerRuntime, initializedChannel);
}
else
{
_logger.LogDebug($"Creating new language worker channel for runtime:{0}", workerRuntime);
CreateWorkerState(workerRuntime);
}
}
Expand All @@ -129,26 +130,31 @@ public void WorkerError(WorkerErrorEvent workerError)
{
if (_workerStates.TryGetValue(workerError.Language, out LanguageWorkerState erroredWorkerState))
{
_logger.LogDebug($"Handling WorkerErrorEvent for runtime:{workerError.Language}");
erroredWorkerState.Errors.Add(workerError.Exception);
bool isPreInitializedChannel = _languageWorkerChannelManager.ShutdownChannelIfExists(workerError.Language);
if (!isPreInitializedChannel)
{
_logger.LogDebug($"Disposing errored channel for workerId: {0}, for runtime:{1}", erroredWorkerState.Channel.Id, workerError.Language);
erroredWorkerState.Channel.Dispose();
}
_logger.LogDebug($"Restarting worker channel for runtime:{0}", workerError.Language);
RestartWorkerChannel(workerError.Language, erroredWorkerState);
}
}

private void RestartWorkerChannel(string language, LanguageWorkerState erroredWorkerState)
private void RestartWorkerChannel(string runtime, LanguageWorkerState erroredWorkerState)
{
if (erroredWorkerState.Errors.Count < 3)
{
erroredWorkerState.Channel = CreateNewChannelWithExistingWorkerState(language, erroredWorkerState);
_workerStates[language] = erroredWorkerState;
_logger.LogDebug("retrying process start");
erroredWorkerState.Channel = CreateNewChannelWithExistingWorkerState(runtime, erroredWorkerState);
_workerStates[runtime] = erroredWorkerState;
}
else
{
PublishWorkerProcessErrorEvent(language, erroredWorkerState);
_logger.LogDebug($"Exceeded language worker restart retry count for runtime:{0}", runtime);
PublishWorkerProcessErrorEvent(runtime, erroredWorkerState);
}
}

Expand Down Expand Up @@ -190,7 +196,11 @@ protected virtual void Dispose(bool disposing)
foreach (var pair in _workerStates)
{
// TODO #3296 - send WorkerTerminate message to shut down language worker process gracefully (instead of just a killing)
pair.Value.Channel.Dispose();
// WebhostLanguageWorkerChannels life time is managed by LanguageWorkerChannelManager
if (!pair.Value.Channel.IsWebhostChannel)
{
pair.Value.Channel.Dispose();
}
pair.Value.Functions.Dispose();
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/WebJobs.Script/Rpc/ILanguageWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public interface ILanguageWorkerChannel : IDisposable
{
string Id { get; }

bool IsWebhostChannel { get; }

WorkerConfig Config { get; }

void RegisterFunctions(IObservable<FunctionRegistrationContext> functionRegistrations);
Expand Down
2 changes: 1 addition & 1 deletion src/WebJobs.Script/Rpc/ILanguageWorkerChannelManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ public interface ILanguageWorkerChannelManager

void ShutdownChannels();

ILanguageWorkerChannel CreateLanguageWorkerChannel(string workerId, string scriptRootPath, string language, IObservable<FunctionRegistrationContext> functionRegistrations, IMetricsLogger metricsLogger, int attemptCount);
ILanguageWorkerChannel CreateLanguageWorkerChannel(string workerId, string scriptRootPath, string language, IObservable<FunctionRegistrationContext> functionRegistrations, IMetricsLogger metricsLogger, int attemptCount, bool isWebhostChannel);
}
}
29 changes: 21 additions & 8 deletions src/WebJobs.Script/Rpc/LanguageWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ internal class LanguageWorkerChannel : ILanguageWorkerChannel
private readonly ILanguageWorkerConsoleLogSource _consoleLogSource;

private bool _disposed;
private bool _disposing;
private bool _isWebHostChannel;
private IObservable<FunctionRegistrationContext> _functionRegistrations;
private WorkerInitResponse _initMessage;
private string _workerId;
Expand Down Expand Up @@ -68,7 +70,8 @@ internal LanguageWorkerChannel(
ILoggerFactory loggerFactory,
IMetricsLogger metricsLogger,
int attemptCount,
ILanguageWorkerConsoleLogSource consoleLogSource)
ILanguageWorkerConsoleLogSource consoleLogSource,
bool isWebHostChannel = false)
{
_workerId = workerId;
_functionRegistrations = functionRegistrations;
Expand All @@ -78,6 +81,7 @@ internal LanguageWorkerChannel(
_processRegistry = processRegistry;
_workerConfig = workerConfig;
_serverUri = serverUri;
_isWebHostChannel = isWebHostChannel;
_workerChannelLogger = loggerFactory.CreateLogger($"Worker.{workerConfig.Language}.{_workerId}");
_consoleLogSource = consoleLogSource;

Expand Down Expand Up @@ -110,6 +114,8 @@ internal LanguageWorkerChannel(

internal Process WorkerProcess => _process;

public bool IsWebhostChannel => _isWebHostChannel;

internal void StartProcess()
{
try
Expand Down Expand Up @@ -151,6 +157,11 @@ private void OnOutputDataReceived(object sender, DataReceivedEventArgs e)

private void OnProcessExited(object sender, EventArgs e)
{
if (_disposing)
{
// No action needed
return;
}
string exceptionMessage = string.Join(",", _processStdErrDataQueue.Where(s => !string.IsNullOrEmpty(s)));
try
{
Expand Down Expand Up @@ -248,7 +259,7 @@ internal void SendWorkerInitRequest(RpcEvent startEvent)
_inboundWorkerEvents.Where(msg => msg.MessageType == MsgType.WorkerInitResponse)
.Timeout(workerInitTimeout)
.Take(1)
.Subscribe(PublishRpcChannelReadyEvent, HandleWorkerError);
.Subscribe(PublishWebhostRpcChannelReadyEvent, HandleWorkerError);

SendStreamingMessage(new StreamingMessage
{
Expand All @@ -265,7 +276,7 @@ internal void PublishWorkerProcessReadyEvent(FunctionEnvironmentReloadResponse r
_eventManager.Publish(wpEvent);
}

internal void PublishRpcChannelReadyEvent(RpcEvent initEvent)
internal void PublishWebhostRpcChannelReadyEvent(RpcEvent initEvent)
{
_startLatencyMetric?.Dispose();
_startLatencyMetric = null;
Expand All @@ -276,11 +287,12 @@ internal void PublishRpcChannelReadyEvent(RpcEvent initEvent)
HandleWorkerError(exc);
return;
}

RpcChannelReadyEvent readyEvent = new RpcChannelReadyEvent(_workerId, _workerConfig.Language, this, _initMessage.WorkerVersion, _initMessage.Capabilities);
_eventManager.Publish(readyEvent);

if (_functionRegistrations != null)
if (_functionRegistrations == null)
{
RpcWebHostChannelReadyEvent readyEvent = new RpcWebHostChannelReadyEvent(_workerId, _workerConfig.Language, this, _initMessage.WorkerVersion, _initMessage.Capabilities);
_eventManager.Publish(readyEvent);
}
else
{
RegisterFunctions(_functionRegistrations);
}
Expand Down Expand Up @@ -516,6 +528,7 @@ protected virtual void Dispose(bool disposing)

public void Dispose()
{
_disposing = true;
Dispose(true);
}
}
Expand Down
18 changes: 10 additions & 8 deletions src/WebJobs.Script/Rpc/LanguageWorkerChannelManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public LanguageWorkerChannelManager(IScriptEventManager eventManager, IEnvironme
_environment = environment ?? throw new ArgumentNullException(nameof(environment));
_eventManager = eventManager;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger(ScriptConstants.LanguageWorkerChannelManager);
_logger = loggerFactory.CreateLogger(ScriptConstants.LogCategoryLanguageWorkerChannelManager);
_workerConfigs = languageWorkerOptions.Value.WorkerConfigs;
_applicationHostOptions = applicationHostOptions;
_consoleLogSource = consoleLogSource;
Expand All @@ -60,11 +60,11 @@ public LanguageWorkerChannelManager(IScriptEventManager eventManager, IEnvironme

_shutdownStandbyWorkerChannels = ScheduleShutdownStandbyChannels;
_shutdownStandbyWorkerChannels = _shutdownStandbyWorkerChannels.Debounce(5000);
_rpcChannelReadySubscriptions = _eventManager.OfType<RpcChannelReadyEvent>()
_rpcChannelReadySubscriptions = _eventManager.OfType<RpcWebHostChannelReadyEvent>()
.Subscribe(AddOrUpdateWorkerChannels);
}

public ILanguageWorkerChannel CreateLanguageWorkerChannel(string workerId, string scriptRootPath, string language, IObservable<FunctionRegistrationContext> functionRegistrations, IMetricsLogger metricsLogger, int attemptCount)
public ILanguageWorkerChannel CreateLanguageWorkerChannel(string workerId, string scriptRootPath, string language, IObservable<FunctionRegistrationContext> functionRegistrations, IMetricsLogger metricsLogger, int attemptCount, bool isWebhostChannel = false)
{
var languageWorkerConfig = _workerConfigs.Where(c => c.Language.Equals(language, StringComparison.OrdinalIgnoreCase)).FirstOrDefault();
if (languageWorkerConfig == null)
Expand All @@ -83,7 +83,8 @@ public ILanguageWorkerChannel CreateLanguageWorkerChannel(string workerId, strin
_loggerFactory,
metricsLogger,
attemptCount,
_consoleLogSource);
_consoleLogSource,
isWebhostChannel);
}

public async Task InitializeChannelAsync(string runtime)
Expand All @@ -98,12 +99,12 @@ private async Task InitializeLanguageWorkerChannel(string language, string scrip
{
string workerId = Guid.NewGuid().ToString();
_logger.LogInformation("Creating language worker channel for runtime:{runtime}", language);
ILanguageWorkerChannel languageWorkerChannel = CreateLanguageWorkerChannel(workerId, scriptRootPath, language, null, null, 0);
ILanguageWorkerChannel languageWorkerChannel = CreateLanguageWorkerChannel(workerId, scriptRootPath, language, null, null, 0, true);
languageWorkerChannel.StartWorkerProcess();
IObservable<RpcChannelReadyEvent> rpcChannelReadyEvent = _eventManager.OfType<RpcChannelReadyEvent>()
IObservable<RpcWebHostChannelReadyEvent> rpcChannelReadyEvent = _eventManager.OfType<RpcWebHostChannelReadyEvent>()
.Where(msg => msg.Language == language).Timeout(workerInitTimeout);
// Wait for response from language worker process
RpcChannelReadyEvent readyEvent = await rpcChannelReadyEvent.FirstAsync();
RpcWebHostChannelReadyEvent readyEvent = await rpcChannelReadyEvent.FirstAsync();
}
catch (Exception ex)
{
Expand Down Expand Up @@ -198,9 +199,10 @@ public void ShutdownChannels()
_workerChannels[runtime]?.Dispose();
}
_workerChannels.Clear();
(_processRegistry as IDisposable)?.Dispose();
}

private void AddOrUpdateWorkerChannels(RpcChannelReadyEvent rpcChannelReadyEvent)
private void AddOrUpdateWorkerChannels(RpcWebHostChannelReadyEvent rpcChannelReadyEvent)
{
_logger.LogInformation("Adding language worker channel for runtime: {language}.", rpcChannelReadyEvent.Language);
_workerChannels.Add(rpcChannelReadyEvent.Language, rpcChannelReadyEvent.LanguageWorkerChannel);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Diagnostics;

namespace Microsoft.Azure.WebJobs.Script.Rpc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Microsoft.Azure.WebJobs.Script.Rpc
{
// Registers processes on windows with a job object to ensure disposal after parent exit
internal class JobObjectRegistry : IDisposable, IProcessRegistry
internal class JobObjectRegistry : IProcessRegistry, IDisposable
{
private IntPtr _handle;
private bool _disposed = false;
Expand Down
3 changes: 2 additions & 1 deletion src/WebJobs.Script/ScriptConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ public static class ScriptConstants
public const string LogCategoryFunction = "Function";
public const string LogCategoryWorker = "Worker";
public const string LogCategoryRpcInitializationService = "Host.RpcInitializationService";
public const string LanguageWorkerChannelManager = "Host.LanguageWorkerChannelManager";
public const string LogCategoryLanguageWorkerChannelManager = "Host.LanguageWorkerChannelManager";
public const string LogCategoryFunctionRpcService = "Host.FunctionRpcService";
public const string LogCategoryFunctionDispatcher = "Host.FunctionDispatcher";

public const string SkipHostJsonConfigurationKey = "MS_SkipHostJsonConfiguration";
public const string SkipHostInitializationKey = "MS_SkipHostInitialization";
Expand Down
2 changes: 1 addition & 1 deletion src/WebJobs.Script/Utility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ internal static void AddFunctionError(IDictionary<string, ICollection<string>> f
functionName = isFunctionShortName ? functionName : Utility.GetFunctionShortName(functionName);

ICollection<string> functionErrorCollection = new Collection<string>();
if (!functionErrors.TryGetValue(functionName, out functionErrorCollection))
if (!string.IsNullOrEmpty(functionName) && !functionErrors.TryGetValue(functionName, out functionErrorCollection))
{
functionErrors[functionName] = functionErrorCollection = new Collection<string>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ public void InitializeAsync_WorkerRuntime_Node_DoNotInitialize_JavaWorker()
{
var javaChannel = _languageWorkerChannelManager.GetChannel(LanguageWorkerConstants.JavaLanguageWorkerName);
Assert.Null(javaChannel);
var nodeChannel = _languageWorkerChannelManager.GetChannel(LanguageWorkerConstants.NodeLanguageWorkerName);
Assert.Null(nodeChannel);
}

// Get response with default ObjectResult content negotiation enabled
Expand Down
6 changes: 5 additions & 1 deletion test/WebJobs.Script.Tests.Integration/TestFunctionHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public TestFunctionHost(string scriptPath, string logPath,

var manager = _testServer.Host.Services.GetService<IScriptHostManager>();
_hostService = manager as WebJobsScriptHostService;

StartAsync().GetAwaiter().GetResult();
}

Expand Down Expand Up @@ -122,6 +121,11 @@ public async Task<string> GetFunctionSecretAsync(string functionName)
return secrets.First().Value;
}

public async Task RestartAsync(CancellationToken cancellationToken)
{
await _hostService.RestartHostAsync(cancellationToken);
}

private async Task StartAsync()
{
bool running = false;
Expand Down
Loading

0 comments on commit 287a270

Please sign in to comment.