Skip to content

Commit

Permalink
[Rpc]Add processOptions in worker.config (Azure#6806)
Browse files Browse the repository at this point in the history
  • Loading branch information
pragnagopa authored Oct 19, 2020
1 parent 2169801 commit 6976eb5
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 102 deletions.
2 changes: 1 addition & 1 deletion src/WebJobs.Script.WebHost/WebJobs.Script.WebHost.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<Project Sdk="Microsoft.NET.Sdk.Web">
<Import Project="..\..\build\common.props" />
<Import Project="..\..\build\python.props" />
<PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;

namespace Microsoft.Azure.WebJobs.Script.Workers
{
public class WorkerProcessCountOptions
{
/// <summary>
/// Gets or sets a value indicating whether to set FUNCTIONS_WORKER_PROCESS_COUNT to number of cpu cores on the host machine
/// </summary>
public bool SetWorkerCountToNumberOfCpuCores { get; set; }

/// <summary>
/// Gets or sets number of worker processes to start. Default process count is 1
/// </summary>
public int ProcessCount { get; set; } = 1;

/// <summary>
/// Gets or sets maximum number worker processes allowed. Default max process count is 10
/// </summary>
public int MaxProcessCount { get; set; } = 10;

/// <summary>
/// Gets or sets interval between process startups. Default 10secs
/// </summary>
public TimeSpan ProcessStartupInterval { get; set; } = TimeSpan.FromSeconds(10);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ public class RpcWorkerConfig
public RpcWorkerDescription Description { get; set; }

public WorkerProcessArguments Arguments { get; set; }

public WorkerProcessCountOptions CountOptions { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Linq;
using System.Text.RegularExpressions;
using Microsoft.Azure.WebJobs.Script.Config;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
Expand All @@ -23,7 +22,7 @@ internal class RpcWorkerConfigFactory
private readonly IMetricsLogger _metricsLogger;
private readonly IEnvironment _environment;

private Dictionary<string, RpcWorkerDescription> _workerDescripionDictionary = new Dictionary<string, RpcWorkerDescription>();
private Dictionary<string, RpcWorkerConfig> _workerDescripionDictionary = new Dictionary<string, RpcWorkerConfig>();

public RpcWorkerConfigFactory(IConfiguration config, ILogger logger, ISystemRuntimeInformation systemRuntimeInfo, IEnvironment environment, IMetricsLogger metricsLogger)
{
Expand All @@ -48,28 +47,7 @@ public IList<RpcWorkerConfig> GetConfigs()
using (_metricsLogger.LatencyEvent(MetricEventNames.GetConfigs))
{
BuildWorkerProviderDictionary();
var result = new List<RpcWorkerConfig>();

foreach (var description in _workerDescripionDictionary.Values)
{
_logger.LogDebug($"Worker path for language worker {description.Language}: {description.WorkerDirectory}");

var arguments = new WorkerProcessArguments()
{
ExecutablePath = description.DefaultExecutablePath,
WorkerPath = description.DefaultWorkerPath
};

arguments.ExecutableArguments.AddRange(description.Arguments);
var config = new RpcWorkerConfig()
{
Description = description,
Arguments = arguments
};
result.Add(config);
}

return result;
return _workerDescripionDictionary.Values.ToList();
}
}

Expand Down Expand Up @@ -151,8 +129,28 @@ internal void AddProvider(string workerDir)
{
workerDescription.FormatWorkerPathIfNeeded(_systemRuntimeInformation, _environment, _logger);
workerDescription.ThrowIfFileNotExists(workerDescription.DefaultWorkerPath, nameof(workerDescription.DefaultWorkerPath));
_workerDescripionDictionary[workerDescription.Language] = workerDescription;
workerDescription.ExpandEnvironmentVariables();

WorkerProcessCountOptions workerProcessCount = GetWorkerProcessCount(workerConfig);

var arguments = new WorkerProcessArguments()
{
ExecutablePath = workerDescription.DefaultExecutablePath,
WorkerPath = workerDescription.DefaultWorkerPath
};
arguments.ExecutableArguments.AddRange(workerDescription.Arguments);
var config = new RpcWorkerConfig()
{
Description = workerDescription,
Arguments = arguments
};
var rpcWorkerconfig = new RpcWorkerConfig()
{
Description = workerDescription,
Arguments = arguments,
CountOptions = workerProcessCount
};
_workerDescripionDictionary[workerDescription.Language] = rpcWorkerconfig;
_logger.LogDebug($"Added WorkerConfig for language: {workerDescription.Language}");
}
}
Expand All @@ -163,21 +161,41 @@ internal void AddProvider(string workerDir)
}
}

private static Dictionary<string, WorkerDescription> GetWorkerDescriptionProfiles(JObject workerConfig)
internal WorkerProcessCountOptions GetWorkerProcessCount(JObject workerConfig)
{
Dictionary<string, WorkerDescription> descriptionProfiles = new Dictionary<string, WorkerDescription>();
var profiles = workerConfig.Property("profiles")?.Value.ToObject<JObject>();
if (profiles != null)
WorkerProcessCountOptions workerProcessCount = workerConfig.Property(WorkerConstants.ProcessCount)?.Value.ToObject<WorkerProcessCountOptions>();

workerProcessCount = workerProcessCount ?? new WorkerProcessCountOptions();

if (workerProcessCount.SetWorkerCountToNumberOfCpuCores)
{
foreach (var profile in profiles)
{
string name = profile.Key;
JToken value = profile.Value;
WorkerDescription description = profile.Value.ToObject<WorkerDescription>();
descriptionProfiles.Add(name, description);
}
workerProcessCount.ProcessCount = _environment.GetEffectiveCoresCount();
// set Max worker process count to Number of effective cores if MaxProcessCount is less than MinProcessCount
workerProcessCount.MaxProcessCount = workerProcessCount.ProcessCount > workerProcessCount.MaxProcessCount ? workerProcessCount.ProcessCount : workerProcessCount.MaxProcessCount;
}

// Env variable takes precedence over worker.config
string processCountEnvSetting = _environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionsWorkerProcessCountSettingName);
if (!string.IsNullOrEmpty(processCountEnvSetting))
{
workerProcessCount.ProcessCount = int.Parse(processCountEnvSetting) > 1 ? int.Parse(processCountEnvSetting) : 1;
}

// Validate
if (workerProcessCount.ProcessCount <= 0)
{
throw new ArgumentOutOfRangeException($"{nameof(workerProcessCount.ProcessCount)}", "ProcessCount must be greater than 0.");
}
if (workerProcessCount.ProcessCount > workerProcessCount.MaxProcessCount)
{
throw new ArgumentException($"{nameof(workerProcessCount.ProcessCount)} must not be greater than {nameof(workerProcessCount.MaxProcessCount)}");
}
return descriptionProfiles;
if (workerProcessCount.ProcessStartupInterval.Ticks < 0)
{
throw new ArgumentOutOfRangeException($"{nameof(workerProcessCount.ProcessStartupInterval)}", "The TimeSpan must not be negative.");
}

return workerProcessCount;
}

private static void GetWorkerDescriptionFromAppSettings(RpcWorkerDescription workerDescription, IConfigurationSection languageSection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ internal class RpcFunctionInvocationDispatcher : IFunctionInvocationDispatcher
private readonly IRpcWorkerChannelFactory _rpcWorkerChannelFactory;
private readonly IEnvironment _environment;
private readonly IApplicationLifetime _applicationLifetime;
private readonly int _debounceSeconds = 10;
private readonly int _maxAllowedProcessCount = 10;
private readonly TimeSpan _shutdownTimeout = TimeSpan.FromSeconds(10);
private readonly TimeSpan thresholdBetweenRestarts = TimeSpan.FromMinutes(WorkerConstants.WorkerRestartErrorIntervalThresholdInMinutes);

Expand All @@ -50,6 +48,7 @@ internal class RpcFunctionInvocationDispatcher : IFunctionInvocationDispatcher
private IEnumerable<FunctionMetadata> _functions;
private ConcurrentStack<WorkerErrorEvent> _languageWorkerErrors = new ConcurrentStack<WorkerErrorEvent>();
private CancellationTokenSource _processStartCancellationToken = new CancellationTokenSource();
private int _debounceMilliSeconds = (int)TimeSpan.FromSeconds(10).TotalMilliseconds;

public RpcFunctionInvocationDispatcher(IOptions<ScriptJobHostOptions> scriptHostOptions,
IMetricsLogger metricsLogger,
Expand All @@ -76,14 +75,8 @@ public RpcFunctionInvocationDispatcher(IOptions<ScriptJobHostOptions> scriptHost
_logger = loggerFactory.CreateLogger<RpcFunctionInvocationDispatcher>();
_rpcWorkerChannelFactory = rpcWorkerChannelFactory;
_workerRuntime = _environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionWorkerRuntimeSettingName);

var processCount = _environment.GetEnvironmentVariable(RpcWorkerConstants.FunctionsWorkerProcessCountSettingName);
_maxProcessCount = (processCount != null && int.Parse(processCount) > 1) ? int.Parse(processCount) : 1;
_maxProcessCount = _maxProcessCount > _maxAllowedProcessCount ? _maxAllowedProcessCount : _maxProcessCount;
_functionDispatcherLoadBalancer = functionDispatcherLoadBalancer;

State = FunctionInvocationDispatcherState.Default;
ErrorEventsThreshold = 3 * _maxProcessCount;

_workerErrorSubscription = _eventManager.OfType<WorkerErrorEvent>()
.Subscribe(WorkerError);
Expand Down Expand Up @@ -150,7 +143,7 @@ private void StartWorkerProcesses(int startIndex, Action startAction)
{
for (var count = startIndex; count < _maxProcessCount; count++)
{
startAction = startAction.Debounce(_processStartCancellationToken.Token, count * _debounceSeconds * 1000);
startAction = startAction.Debounce(_processStartCancellationToken.Token, count * _debounceMilliSeconds);
startAction();
}
}
Expand All @@ -175,6 +168,15 @@ public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, Cance
return;
}

var workerConfig = _workerConfigs.Where(c => c.Description.Language.Equals(_workerRuntime, StringComparison.OrdinalIgnoreCase)).FirstOrDefault();
if (workerConfig == null)
{
throw new InvalidOperationException($"WorkerCofig for runtime: {_workerRuntime} not found");
}
_maxProcessCount = workerConfig.CountOptions.ProcessCount;
_debounceMilliSeconds = (int)workerConfig.CountOptions.ProcessStartupInterval.TotalMilliseconds;
ErrorEventsThreshold = 3 * _maxProcessCount;

if (functions == null || functions.Count() == 0)
{
// do not initialize function dispatcher if there are no functions
Expand Down
1 change: 1 addition & 0 deletions src/WebJobs.Script/Workers/WorkerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public static class WorkerConstants
public const string WorkerDescriptionDefaultExecutablePath = "defaultExecutablePath";
public const string WorkerDescriptionDefaultWorkerPath = "defaultWorkerPath";
public const string WorkerDescription = "description";
public const string ProcessCount = "processOptions";
public const string WorkerDescriptionArguments = "arguments";
public const string WorkerDescriptionDefaultRuntimeVersion = "defaultRuntimeVersion";

Expand Down
6 changes: 3 additions & 3 deletions test/WebJobs.Script.Tests.Shared/TestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,12 @@ public static async Task<string> ReadStreamToEnd(Stream stream)
}
}

public static IList<RpcWorkerConfig> GetTestWorkerConfigs(bool includeDllWorker = false)
public static IList<RpcWorkerConfig> GetTestWorkerConfigs(bool includeDllWorker = false, int processCountValue = 1)
{
var workerConfigs = new List<RpcWorkerConfig>
{
new RpcWorkerConfig() { Description = GetTestWorkerDescription("node", ".js") },
new RpcWorkerConfig() { Description = GetTestWorkerDescription("java", ".jar") }
new RpcWorkerConfig() { Description = GetTestWorkerDescription("node", ".js"), CountOptions = new Script.Workers.WorkerProcessCountOptions() { ProcessCount = processCountValue } },
new RpcWorkerConfig() { Description = GetTestWorkerDescription("java", ".jar"), CountOptions = new Script.Workers.WorkerProcessCountOptions() { ProcessCount = processCountValue } }
};

// Allow tests to have a worker that claims the .dll extension.
Expand Down
Loading

0 comments on commit 6976eb5

Please sign in to comment.