Skip to content

Commit

Permalink
Change az function to use SignalR
Browse files Browse the repository at this point in the history
  • Loading branch information
fpelaez committed Nov 19, 2018
1 parent efc0340 commit b9ce65f
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 39 deletions.
210 changes: 172 additions & 38 deletions Source/SmartHotel360.WebsiteFunction/PetChecker.cs
Original file line number Diff line number Diff line change
@@ -1,84 +1,218 @@
using Gremlin.Net.Driver;
using Gremlin.Net.Structure.IO.GraphSON;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.ProjectOxford.Vision;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.Documents;
using System.Collections.Generic;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.ProjectOxford.Vision;
using Microsoft.Azure.Documents.Client;


namespace PetCheckerFunction
{
public static class PetChecker
{
[FunctionName("PetChecker")]
public static async Task RunPetChecker([CosmosDBTrigger("pets", "checks", ConnectionStringSetting = "constr", CreateLeaseCollectionIfNotExists = true)] IReadOnlyList<Document> document, TraceWriter log)
public static async Task RunPetChecker(
[CosmosDBTrigger("pets", "checks", ConnectionStringSetting = "constr",
CreateLeaseCollectionIfNotExists=true)] IReadOnlyList<Document> document,
[SignalR(HubName = "petcheckin", ConnectionStringSetting = "AzureSignalRConnectionString")] IAsyncCollector<SignalRMessage> sender,
TraceWriter log)
{
var httpClient = new HttpClient();
var sendingResponse = false;
try
{
foreach (dynamic doc in document)
{
sendingResponse = false;
var isProcessed = doc.IsApproved != null;
if (isProcessed)
{
continue;
}
var url = doc.MediaUrl;

var url = doc.MediaUrl.ToString();
var uploaded = (DateTime)doc.Created;
log.Info($">>> Processing image in {url} upladed at {uploaded.ToString()}");
var res = await httpClient.GetAsync(url);
using (var stream = await res.Content.ReadAsStreamAsync() as Stream)

using (var httpClient = new HttpClient())
{

var res = await httpClient.GetAsync(url);
var stream = await res.Content.ReadAsStreamAsync() as Stream;
log.Info($"--- Image succesfully downloaded from storage");
(bool allowed, string message) = await PassesImageModerationAsync(stream, log);
var (allowed, message, tags) = await PassesImageModerationAsync(stream, log);
log.Info($"--- Image analyzed. It was {(allowed ? string.Empty : "NOT")} approved");
doc.IsApproved = allowed;
doc.Message = message;
log.Info($"--- Updating CosmosDb document to have historical data");
await UpsertDocument(doc, log);
log.Info($"<<< Image in {url} processed!");
log.Info($"--- Updating Graph");
await InsertInGraph(tags, doc, log);
log.Info("--- Sending SignalR response.");
sendingResponse = true;
await SendSignalRResponse(sender, allowed, message);
log.Info($"<<< Done! Image in {url} processed!");
}
}
}
finally
catch (Exception ex)
{
httpClient?.Dispose();
var msg = $"Error {ex.Message} ({ex.GetType().Name})";
log.Info("!!! " + msg);

if (ex is AggregateException aggex)
{
foreach (var innex in aggex.InnerExceptions)
{
log.Info($"!!! (inner) Error {innex.Message} ({innex.GetType().Name})");
}
}

if (!sendingResponse)
{
await SendSignalRResponse(sender, false, msg);
}
throw ex;
}
}
private static async Task UpsertDocument(dynamic doc, TraceWriter log)

private static Task SendSignalRResponse(IAsyncCollector<SignalRMessage> sender, bool isOk, string message)
{
return sender.AddAsync(new SignalRMessage()
{
Target = "ProcessDone",
Arguments = new[] { new {
processedAt = DateTime.UtcNow,
accepted = isOk,
message
}}
});

}

private static async Task InsertInGraph(IEnumerable<string> tags, dynamic doc, TraceWriter log)
{
var hostname = await GetSecret("gremlin_endpoint");
var port = await GetSecret("gremlin_port");
var database = "pets";
var collection = "checks";
var authKey = Environment.GetEnvironmentVariable("gremlin_key");
var portToUse = 443;
portToUse = int.TryParse(port, out portToUse) ? portToUse : 443;

var gremlinServer = new GremlinServer(hostname, portToUse, enableSsl: true,
username: "/dbs/" + database + "/colls/" + collection,
password: authKey);
var gremlinClient = new GremlinClient(gremlinServer, new GraphSON2Reader(), new GraphSON2Writer(), GremlinClient.GraphSON2MimeType);
foreach (var tag in tags)
{
log.Info("--- --- Checking vertex for tag " + tag);
await TryAddTag(gremlinClient, tag, log);
}

var queries = AddPetToGraphQueries(doc, tags);
log.Info("--- --- Adding vertex for pet checkin ");
foreach (string query in queries)
{
await gremlinClient.SubmitAsync<dynamic>(query);
}
}

private static async Task TryAddTag(GremlinClient gremlinClient, string tag, TraceWriter log)
{
var endpoint = Environment.GetEnvironmentVariable("cosmos_uri");
var auth = Environment.GetEnvironmentVariable("cosmos_key");
using (var client = new DocumentClient(new Uri(endpoint), auth))
var query = $"g.V('{tag}')";
var response = await gremlinClient.SubmitAsync<dynamic>(query);

if (!response.Any())
{
var dbName = "pets";
var colName = "checks";
doc.Analyzed = DateTime.UtcNow;
await client.UpsertDocumentAsync(
UriFactory.CreateDocumentCollectionUri(dbName, colName), doc);
log.Info($"--- CosmosDb document updated.");
log.Info("--- --- Adding vertex for tag " + tag);
await gremlinClient.SubmitAsync<dynamic>(AddTagToGraphQuery(tag));
}
}
public static async Task<(bool, string)> PassesImageModerationAsync(Stream image, TraceWriter log)

private static IEnumerable<string> AddPetToGraphQueries(dynamic doc, IEnumerable<string> tags)
{
var id = doc.id.ToString();

var msg = (doc.Message?.ToString() ?? "").Replace("'", "\'");

yield return $"g.addV('checkin').property('id','{id}').property('description','{msg}')";
foreach (var tag in tags)
{
yield return $"g.V('{id}').addE('seems').to(g.V('{tag}'))";
}
}

private static string AddTagToGraphQuery(string tag) => $"g.addV('tag').property('id', '{tag}').property('value', '{tag}')";

private static async Task UpsertDocument(dynamic doc, TraceWriter log)
{
var endpoint = await GetSecret("cosmos_uri");
var auth = await GetSecret("cosmos_key");

var client = new DocumentClient(new Uri(endpoint), auth);
var dbName = "pets";
var colName = "checks";
doc.Analyzed = DateTime.UtcNow;
await client.UpsertDocumentAsync(
UriFactory.CreateDocumentCollectionUri(dbName, colName), doc);
log.Info($"--- CosmosDb document updated.");
}

private static async Task<string> GetSecret(string secretName)
{
log.Info("--- Creating VisionApi client and analyzing image");
var key = Environment.GetEnvironmentVariable("MicrosoftVisionApiKey");
var endpoint = Environment.GetEnvironmentVariable("MicrosoftVisionApiEndpoint");
var client = new VisionServiceClient(key, endpoint);
var features = new VisualFeature[] { VisualFeature.Description };
var result = await client.AnalyzeImageAsync(image, features);
log.Info($"--- Image analyzed with tags: {String.Join(",", result.Description.Tags)}");
if (!int.TryParse(Environment.GetEnvironmentVariable("MicrosoftVisionNumTags"), out var tagsToFetch))

return Environment.GetEnvironmentVariable(secretName);
}

public static async Task<(bool allowd, string message, string[] tags)> PassesImageModerationAsync(Stream image, TraceWriter log)
{
try
{
tagsToFetch = 5;
log.Info("--- Creating VisionApi client and analyzing image");

var key = await GetSecret("MicrosoftVisionApiKey");
var endpoint = await GetSecret("MicrosoftVisionApiEndpoint");
var numTags = await GetSecret("MicrosoftVisionNumTags");
var client = new VisionServiceClient(key, endpoint);
var features = new VisualFeature[] { VisualFeature.Description };
var result = await client.AnalyzeImageAsync(image, features);

log.Info($"--- Image analyzed with tags: {String.Join(",", result.Description.Tags)}");
if (!int.TryParse(numTags, out var tagsToFetch))
{
tagsToFetch = 5;
}
var fetchedTags = result?.Description?.Tags.Take(tagsToFetch).ToArray() ?? new string[0];
bool isAllowed = fetchedTags.Contains("dog");
string message = result?.Description?.Captions.FirstOrDefault()?.Text;
return (isAllowed, message, fetchedTags);
}
catch (Exception ex)
{
log.Info("Vision API error! " + ex.Message);
return (false, "error " + ex.Message, new string[0]);
}
bool isAllowed = result.Description.Tags.Take(tagsToFetch).Contains("dog");
string message = result?.Description?.Captions.FirstOrDefault()?.Text;
return (isAllowed, message);
}

[FunctionName(nameof(SignalRInfo))]
public static IActionResult SignalRInfo(
[HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
[SignalRConnectionInfo(HubName = "petcheckin")] SignalRConnectionInfo info)
{
return info != null
? (ActionResult)new OkObjectResult(info)
: new NotFoundObjectResult("Failed to load SignalR Info.");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
<None Remove="lib\Microsoft.ProjectOxford.Vision.dll" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Gremlin.Net" Version="3.3.4" />
<PackageReference Include="Gremlin.Net" Version="3.3.4" />
<PackageReference Include="Microsoft.Azure.KeyVault" Version="3.0.0" />
<PackageReference Include="Microsoft.Azure.Services.AppAuthentication" Version="1.1.0-preview" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="1.0.23" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.CosmosDB" Version="3.0.1" />
Expand Down
3 changes: 3 additions & 0 deletions Source/SmartHotel360.WebsiteFunction/local.settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@
"MicrosoftVisionApiEndpoint": "<Cognitive Services Vision API URL (eg. https://southcentralus.api.cognitive.microsoft.com/vision/v1.0)>",
"MicrosoftVisionNumTags": "10",
"AzureSignalRConnectionString": "<Connection String to the SignalR Service instance>"
},
"Host": {
"CORS": "*"
}
}

0 comments on commit b9ce65f

Please sign in to comment.