This repository was archived by the owner on Nov 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 100
/
Copy pathobEventHub.cs
58 lines (50 loc) · 2.02 KB
/
obEventHub.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Collections.Generic;
namespace nsgFunc
{
public partial class Util
{
    /// <summary>
    /// Lazily constructed Event Hubs client, created on first access and reused afterwards.
    /// The connection string and hub name are read via <c>GetEnvironmentVariable</c> from the
    /// "eventHubConnection" and "eventHubName" settings.
    /// </summary>
    private static Lazy<EventHubClient> LazyEventHubConnection = new Lazy<EventHubClient>(() =>
    {
        string EventHubConnectionString = GetEnvironmentVariable("eventHubConnection");
        string EventHubName = GetEnvironmentVariable("eventHubName");

        // The hub name is applied as the entity path on top of the configured connection string.
        var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
        {
            EntityPath = EventHubName
        };

        return EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
    });

    /// <summary>
    /// Serializes the batches derived from <paramref name="newClientContent"/> to JSON and sends
    /// each one to the event hub as a separate UTF-8 encoded <see cref="EventData"/>.
    /// </summary>
    /// <param name="newClientContent">Raw content handed to <c>denormalizedRecords</c> for batching.</param>
    /// <param name="log">Logger passed through to the record-processing helpers.</param>
    /// <returns>Total number of UTF-8 payload bytes sent across all events.</returns>
    public static async Task<int> obEventHub(string newClientContent, ILogger log)
    {
        var eventHubClient = LazyEventHubConnection.Value;
        int bytesSent = 0;

        foreach (var bundleOfMessages in bundleMessageListsJson(newClientContent, log))
        {
            // Encode once and count the encoded buffer: string.Length counts UTF-16 chars,
            // which under-reports the payload size whenever the JSON holds non-ASCII text.
            byte[] payload = Encoding.UTF8.GetBytes(bundleOfMessages);
            await eventHubClient.SendAsync(new EventData(payload));
            bytesSent += payload.Length;
        }

        return bytesSent;
    }

    /// <summary>
    /// Lazily yields one JSON document per message list returned by <c>denormalizedRecords</c>.
    /// Each list is wrapped in an <c>OutgoingRecords</c> envelope and serialized with
    /// null-valued members omitted.
    /// </summary>
    /// <param name="newClientContent">Raw content handed to <c>denormalizedRecords</c>.</param>
    /// <param name="log">Logger passed through to <c>denormalizedRecords</c>.</param>
    /// <returns>A deferred sequence of JSON strings, one per message list.</returns>
    static IEnumerable<string> bundleMessageListsJson(string newClientContent, ILogger log)
    {
        // The settings are identical for every batch, so build them once per enumeration.
        var serializerSettings = new JsonSerializerSettings
        {
            NullValueHandling = NullValueHandling.Ignore
        };

        foreach (var messageList in denormalizedRecords(newClientContent, null, log))
        {
            var outgoingRecords = new OutgoingRecords();
            outgoingRecords.records = messageList;

            yield return JsonConvert.SerializeObject(outgoingRecords, serializerSettings);
        }
    }
}
}