Skip to content

Commit

Permalink
Add CancellationToken to ActionExtensions- Debouce API (Azure#4643)
Browse files Browse the repository at this point in the history
  • Loading branch information
pragnagopa authored Jul 11, 2019
1 parent aedab0b commit 8f8ff17
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 89 deletions.
2 changes: 1 addition & 1 deletion sample/CSharp/HttpTrigger/run.csx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public static IActionResult Run(HttpRequest req, TraceWriter log)

if (req.Query.TryGetValue("name", out StringValues value))
{
return new OkObjectResult($"Hello, {value.ToString()}");
return new OkObjectResult($"Hello {value.ToString()}");
}

return new BadRequestObjectResult("Please pass a name on the query string or in the request body");
Expand Down
2 changes: 1 addition & 1 deletion src/WebJobs.Script.WebHost/FileMonitoringService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public FileMonitoringService(IOptions<ScriptJobHostOptions> scriptOptions, ILogg
_restart = _restart.Debounce(500);

_shutdown = Shutdown;
_shutdown = _shutdown.Debounce(500);
_shutdown = _shutdown.Debounce(milliseconds: 500);
}

public Task StartAsync(CancellationToken cancellationToken)
Expand Down
11 changes: 7 additions & 4 deletions src/WebJobs.Script/Extensions/ActionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Microsoft.Azure.WebJobs.Script
{
public static class ActionExtensions
{
public static Action<T> Debounce<T>(this Action<T> func, int milliseconds = 300)
public static Action<T> Debounce<T>(this Action<T> func, CancellationToken cancellationToken = default, int milliseconds = 300)
{
var last = 0;

Expand All @@ -24,17 +24,20 @@ public static Action<T> Debounce<T>(this Action<T> func, int milliseconds = 300)
// Only proceeed with the operation if there have been no
// more events within the specified time window (i.e. there
// is a quiet period)
func(arg);
if (!cancellationToken.IsCancellationRequested)
{
func(arg);
}
}
t.Dispose();
});
};
}

public static Action Debounce(this Action targetAction, int milliseconds = 300)
public static Action Debounce(this Action targetAction, CancellationToken cancellationToken = default, int milliseconds = 300)
{
Action<object> action = _ => targetAction();
action = action.Debounce(milliseconds);
action = action.Debounce(cancellationToken, milliseconds);

return () => action(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Azure.WebJobs.Script.Description;
Expand Down Expand Up @@ -45,6 +46,7 @@ internal class FunctionDispatcher : IFunctionDispatcher
private Action _shutdownStandbyWorkerChannels;
private IEnumerable<FunctionMetadata> _functions;
private ConcurrentBag<Exception> _languageWorkerErrors = new ConcurrentBag<Exception>();
private CancellationTokenSource _processStartCancellationToken = new CancellationTokenSource();

public FunctionDispatcher(IOptions<ScriptJobHostOptions> scriptHostOptions,
IMetricsLogger metricsLogger,
Expand Down Expand Up @@ -87,7 +89,7 @@ public FunctionDispatcher(IOptions<ScriptJobHostOptions> scriptHostOptions,
.Subscribe(AddOrUpdateWorkerChannels);

_shutdownStandbyWorkerChannels = ShutdownWebhostLanguageWorkerChannels;
_shutdownStandbyWorkerChannels = _shutdownStandbyWorkerChannels.Debounce(5000);
_shutdownStandbyWorkerChannels = _shutdownStandbyWorkerChannels.Debounce(milliseconds: 5000);
}

public FunctionDispatcherState State { get; private set; }
Expand Down Expand Up @@ -131,7 +133,7 @@ private void StartWorkerProcesses(int startIndex, Action startAction)
{
for (var count = startIndex; count < _maxProcessCount; count++)
{
startAction = startAction.Debounce(count * _debounceSeconds * 1000);
startAction = startAction.Debounce(_processStartCancellationToken.Token, count * _debounceSeconds * 1000);
startAction();
}
}
Expand Down Expand Up @@ -280,6 +282,8 @@ protected virtual void Dispose(bool disposing)
{
_workerErrorSubscription.Dispose();
_rpcChannelReadySubscriptions.Dispose();
_processStartCancellationToken.Cancel();
_processStartCancellationToken.Dispose();
_jobHostLanguageWorkerChannelManager.DisposeAndRemoveChannels();
_disposed = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -37,7 +38,7 @@ public WebHostLanguageWorkerChannelManager(IScriptEventManager eventManager, IEn
_applicationHostOptions = applicationHostOptions;

_shutdownStandbyWorkerChannels = ScheduleShutdownStandbyChannels;
_shutdownStandbyWorkerChannels = _shutdownStandbyWorkerChannels.Debounce(5000);
_shutdownStandbyWorkerChannels = _shutdownStandbyWorkerChannels.Debounce(milliseconds: 5000);
}

public Task<ILanguageWorkerChannel> InitializeChannelAsync(string runtime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ public abstract class EndToEndTestFixture : IAsyncLifetime
private readonly string _rootPath;
private string _copiedRootPath;
private string _functionsWorkerRuntime;
private int _workerProcessCount;

protected EndToEndTestFixture(string rootPath, string testId, string functionsWorkerRuntime)
protected EndToEndTestFixture(string rootPath, string testId, string functionsWorkerRuntime, int workerProcessesCount = 1)
{
FixtureId = testId;

_rootPath = rootPath;
_functionsWorkerRuntime = functionsWorkerRuntime;
_workerProcessCount = workerProcessesCount;

}

public CloudBlobContainer TestInputContainer { get; private set; }
Expand Down Expand Up @@ -91,6 +94,7 @@ public async Task InitializeAsync()
if (!string.IsNullOrEmpty(_functionsWorkerRuntime))
{
Environment.SetEnvironmentVariable(LanguageWorkerConstants.FunctionWorkerRuntimeSettingName, _functionsWorkerRuntime);
Environment.SetEnvironmentVariable(LanguageWorkerConstants.FunctionsWorkerProcessCountSettingName, _workerProcessCount.ToString());
}

FunctionsSyncManagerMock = new Mock<IFunctionsSyncManager>(MockBehavior.Strict);
Expand Down Expand Up @@ -214,6 +218,7 @@ public virtual Task DisposeAsync()
}
}
Environment.SetEnvironmentVariable(LanguageWorkerConstants.FunctionWorkerRuntimeSettingName, string.Empty);
Environment.SetEnvironmentVariable(LanguageWorkerConstants.FunctionsWorkerProcessCountSettingName, string.Empty);
return Task.CompletedTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public async Task SetHostState_Offline_Succeeds()
var hostStatus = response.Content.ReadAsAsync<HostStatus>();

// verify functions can be invoked
await InvokeAndValidateHttpTrigger(functionName);
await SamplesTestHelpers.InvokeAndValidateHttpTrigger(_fixture, functionName);

// verify function status is ok
response = await GetFunctionStatusAsync(functionName);
Expand All @@ -310,7 +310,7 @@ await TestHelpers.RunWithTimeoutAsync(async () =>
Assert.Null(functionStatus.Errors);

// verify that when offline function requests return 503
response = await InvokeHttpTrigger(functionName);
response = await SamplesTestHelpers.InvokeHttpTrigger(_fixture, functionName);
await VerifyOfflineResponse(response);

// verify that the root returns 503 immediately when offline
Expand All @@ -331,7 +331,7 @@ await TestHelpers.RunWithTimeoutAsync(async () =>
await _fixture.InitializeAsync();

// verify functions can be invoked
await InvokeAndValidateHttpTrigger(functionName);
await SamplesTestHelpers.InvokeAndValidateHttpTrigger(_fixture, functionName);

// verify the same thing via admin api
response = await AdminInvokeFunction(functionName);
Expand Down Expand Up @@ -535,26 +535,6 @@ public async Task HttpTrigger_Poco_Get_Succeeds()
}
}

private async Task InvokeAndValidateHttpTrigger(string functionName)
{
string functionKey = await _fixture.Host.GetFunctionSecretAsync($"{functionName}");
string uri = $"api/{functionName}?code={functionKey}&name=Mathew";
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, uri);
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/plain"));

HttpResponseMessage response = await _fixture.Host.HttpClient.SendAsync(request);
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
string body = await response.Content.ReadAsStringAsync();
Assert.Equal("text/plain", response.Content.Headers.ContentType.MediaType);
Assert.Equal("Hello, Mathew", body);

// verify request also succeeds with master key
string masterKey = await _fixture.Host.GetMasterKeyAsync();
uri = $"api/{functionName}?code={masterKey}&name=Mathew";
request = new HttpRequestMessage(HttpMethod.Get, uri);
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
}

// invoke a function via the admin invoke api
private async Task<HttpResponseMessage> AdminInvokeFunction(string functionName, string input = null)
{
Expand All @@ -570,16 +550,6 @@ private async Task<HttpResponseMessage> AdminInvokeFunction(string functionName,
return await _fixture.Host.HttpClient.SendAsync(request);
}

private async Task<HttpResponseMessage> InvokeHttpTrigger(string functionName)
{
string functionKey = await _fixture.Host.GetFunctionSecretAsync($"{functionName}");
string uri = $"api/{functionName}?code={functionKey}&name=Mathew";
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, uri);
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/plain"));

return await _fixture.Host.HttpClient.SendAsync(request);
}

[Fact]
public async Task HttpTrigger_DuplicateQueryParams_Succeeds()
{
Expand All @@ -599,7 +569,7 @@ public async Task HttpTrigger_DuplicateQueryParams_Succeeds()
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
string body = await response.Content.ReadAsStringAsync();
Assert.Equal("text/plain", response.Content.Headers.ContentType.MediaType);
Assert.Equal("Hello, Mathew,Amy", body);
Assert.Equal("Hello Mathew,Amy", body);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public SamplesEndToEndTests_Java(TestFixture fixture)
[Fact]
public async Task HttpTrigger_Java_Get_Succeeds()
{
await InvokeHttpTrigger("HttpTrigger");
await SamplesTestHelpers.InvokeHttpTrigger(_fixture, "HttpTrigger");
}

[Fact]
Expand All @@ -52,17 +52,6 @@ public async Task JavaProcess_Same_AfterHostRestart()
Assert.Equal(0, result.Count());
}

private async Task InvokeHttpTrigger(string functionName)
{
string functionKey = await _fixture.Host.GetFunctionSecretAsync($"{functionName}");
string uri = $"api/{functionName}?code={functionKey}&name=Mathew";
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, uri);
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/plain"));

var response = await _fixture.Host.HttpClient.SendAsync(request);
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
}

public class TestFixture : EndToEndTestFixture
{
static TestFixture()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,43 +252,12 @@ public async Task ManualTrigger_Invoke_Succeeds()
[Fact]
public async Task HttpTrigger_Get_Succeeds()
{
await InvokeAndValidateHttpTrigger("HttpTrigger");
}

private async Task InvokeAndValidateHttpTrigger(string functionName)
{
string functionKey = await _fixture.Host.GetFunctionSecretAsync($"{functionName}");
string uri = $"api/{functionName}?code={functionKey}&name=Mathew";
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, uri);
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/plain"));

HttpResponseMessage response = await _fixture.Host.HttpClient.SendAsync(request);
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
string body = await response.Content.ReadAsStringAsync();
Assert.Equal("text/plain", response.Content.Headers.ContentType.MediaType);
Assert.Equal("Hello Mathew", body);

// verify request also succeeds with master key
string masterKey = await _fixture.Host.GetMasterKeyAsync();
uri = $"api/{functionName}?code={masterKey}&name=Mathew";
request = new HttpRequestMessage(HttpMethod.Get, uri);
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
}

private async Task<HttpResponseMessage> InvokeHttpTrigger(string functionName)
{
string functionKey = await _fixture.Host.GetFunctionSecretAsync($"{functionName}");
string uri = $"api/{functionName}?code={functionKey}&name=Mathew";
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, uri);
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/plain"));

return await _fixture.Host.HttpClient.SendAsync(request);
await SamplesTestHelpers.InvokeAndValidateHttpTrigger(_fixture, "HttpTrigger");
}

[Fact]
public async Task HttpTrigger_DuplicateQueryParams_Succeeds()
{

string functionKey = await _fixture.Host.GetFunctionSecretAsync("httptrigger");
string uri = $"api/httptrigger?code={functionKey}&name=Mathew&name=Amy";
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Get, uri);
Expand Down Expand Up @@ -436,6 +405,8 @@ public async Task NodeProcess_Different_AfterHostRestart()
await _fixture.Host.RestartAsync(CancellationToken.None);

await HttpTrigger_Get_Succeeds();
// wait for orphaned jobhost instance to be disposed
await Task.Delay(TimeSpan.FromSeconds(5));
IEnumerable<int> nodeProcessesAfter = Process.GetProcessesByName("node").Select(p => p.Id);
// Verify number of node processes before and after restart are the same.
Assert.Equal(nodeProcessesBefore.Count(), nodeProcessesAfter.Count());
Expand All @@ -447,7 +418,6 @@ public async Task NodeProcess_Different_AfterHostRestart()
[Fact]
public async Task HttpTrigger_Disabled_SucceedsWithAdminKey()
{

// first try with function key only - expect 404
string functionKey = await _fixture.Host.GetFunctionSecretAsync("HttpTrigger-Disabled");
string uri = $"api/httptrigger-disabled?code={functionKey}";
Expand Down
Loading

0 comments on commit 8f8ff17

Please sign in to comment.