In this lab you will use an Azure Logic App to simmulate high-frequency stock market of the NYSE. Every second the LogicApp generates a random number of stock purchase transactions with variable amounts for 5 of the biggest tech companies in the world. The LogicApp then sends stock transaction messages to Event Hubs. You will use Stream Analytics queries to join hot and cold data streams to process the high volume of transactions and generate aggregate calculations. The results will be sent to a real-time dataset in Power BI.
IMPORTANT: This lab requires you have a valid Power BI account. If you don’t have one you can register for a 60-day trial here: https://powerbi.microsoft.com/en-us/power-bi-pro/
The estimated time to complete this lab is: 60 minutes.
The following Azure services will be used in this lab. If you need further training resources or access to technical documentation please find in the table below links to Microsoft Learn and to each service's Technical Documentation.
Azure Service | Microsoft Learn | Technical Documentation |
---|---|---|
Azure Data Lake Gen2 | Large Scale Data Processing with Azure Data Lake Storage Gen2 | Azure Data Lake Gen2 Technical Documentation |
Azure Logic Apps | Build automated workflows to integrate data and apps with Azure Logic Apps | Azure Logic Apps Technical Documentation |
Azure Event Hubs | Enable reliable messaging for Big Data applications using Azure Event Hubs | Azure Event Hubs Technical Documentation |
Azure Stream Analytics | Implement a Data Streaming Solution with Azure Streaming Analytics | Azure Stream Analytics Technical Documentation |
Power BI | Create and use analytics reports with Power BI | Power BI Technical Documentation |
IMPORTANT: Some of the Azure services provisioned require globally unique name and a “-suffix” has been added to their names to ensure this uniqueness. Please take note of the suffix generated as you will need it for the following resources in this lab:
Name | Type |
---|---|
synapsedatalakesuffix | Storage Account |
ADPEventHubs-suffix | Event Hubs Namespace |
SynapseStreamAnalytics-suffix | Stream Analytics job |
In this section you will review the implementation of the LogicApp used to simmulate high-frequency stock purchase transactions. These transactions will be formatted as JSON messages and sent to Event Hubs for processing. All steps required to generate the stock transaction messages have alreay been done for you and no further changes are required in this section.
IMPORTANT |
---|
Execute these steps on your host computer |
-
In the Azure Portal, go to the lab resource group and locate the Logic App resource ADPLogicApp.
-
On the ADPLogicApp menu, click Logic app designer to open the design panel.
-
On the Logic app designer panel, note that the Recurrence trigger is set to execute every 1 second:
-
The next two steps Initialize Config Settings and Parse Config Settings define the parameters used to generate the stock purchase messages.
-
The Initialize messageCount step is used to initialize a variable used to count the number of messages generated. It's initial value is set to 1.
-
In the Until numberOfMessages is achieved loop a piece of JavaScript is executed to generate the stock purchase transaction message. The message is then sent to the nysestocktrade Event Hub. The loop repeats its execution generating a random number of messages between 1 and 10 for every execution of the LogicApp.
-
The format of each stock purchase transaction message generated looks like this:
{
"StockTicker": "MSFT",
"Quantity": 120,
"Price": 83.63,
"TradeTimestamp": "2019-11-24T00:21:50.207Z"
}
-
Return to the Overview panel. Note that the LogicApp is disabled by default. Click Enable to enable the LogicApp to start firing every second.
In this section you will prepare Event Hubs to ingest NYSE stock trade messages generated by the LogicApp and save to your Synapse Data Lake account.
IMPORTANT |
---|
Execute these steps on your host computer |
-
In the Azure Portal, go to the lab resource group and locate the Event Hubs resource ADPEventHubs-suffix.
-
On the Event Hubs panel, note that the nysestocktrade Event Hub has already been created for you. This is the same Event Hub you saw referenced by the LogicApp in the previous section.
-
Click on the nysestocktrade Event Hub to open its settings. Then click on the Capture item on the left-hand side menu.
-
Enter the following details:
- Capture: On
- Time window (minutes): 1
- Do not emit empty files when no events occur during the capture time window: Checked.
- Capture Provider: Azure Storage
- Azure Storage Container: [select the nysestocktrade container in your synapsedatalakesuffix storage account] -
Leave remaining fields with their default values.
-
Click Save Changes.
-
On your SynapseDataLake account, navigate to the NYCStockTrades container you created in the previous section.
-
You should be able to see the folder structure created by Event Hubs Capture with AVRO files containing the individual stock purchase transaction messages generated by the LogicApp. These files can then be used in other analytics worlkloads whenever the granular detail about each individual transaction is required.
In this section you will configure your Stream Analytics job to join hot and cold data streams and execute queries on data sent by Event Hubs and generate outputs to Power BI.
IMPORTANT |
---|
Execute these steps on your host computer |
-
In the Azure Portal, go to the lab resource group and locate the Stream Analytics resource SynapseStreamAnalytics-suffix.
-
On the Inputs panel, click + Add stream input button and select Event Hub to create a new input stream.
-
On the Event Hub New input blade enter the following details:
- Input alias: NYSEStockTrades
- Event Hub namespace: ADPEventHubs-suffix
- Event Hub name > Use existing: nysestocktrade
- Event Hub policy name > Use existing: RootManageSharedAccessKey
- Event Hub consumer group > Use existing: $Default -
Leave remaining fields with their default values.
-
Now click + Add reference input button and select SQL Database to create a new reference data input stream.
-
On the SQL Database New input blade enter the following details:
- Input alias: NYSEStockCompanies
- Storage account for this job: [select your synapsedatalakesuffix storage account]
- Select SQL Database from your subscriptions: Checked
- Subscription: your Azure subscription
- Database: NYCDataSets
- Server name: operationalsql-suffix.database.windows.net
- User name: adpadmin
- Password: P@ssw0rd123!
- Refresh periodically: Off
- Snapshot query:select [StockTicker] ,[CompanyName] from [NYC].[NYSE_StockTickerLookup]
This reference table contains static data about the companies:
-
Click Save to save your reference input stream and return to the Inputs panel.
-
Click on the Outputs panel on the left-hand side menu. Once it is loaded, click + Add button and select Power BI to create a new output stream.
-
On the Power BI New Output blade, click Authorize to authenticate with Power BI. Enter your credentials to authenticate.
-
Once authenticated, enter the following details:
- Output alias: StockTradeByCompany
- Authentication Mode: User Token
- Group Workspace: My Workspace
- Dataset name: StockTradeByCompany
- Table name: StockTradeByCompany
IMPORTANT: Set Authentication Mode to User Token before you can select My Workspace for Group Workspace.
-
Leave remaining fields with their default values.
-
Click Save to save your output stream and return to the Outputs panel.
-
Repeat the process to create another Power BI Output. This time enter the following details:
- Output alias: StockTradeTotals
- Authentication Mode: User Token
- Group Workspace: My Workspace
- Dataset name: StockTradeTotals
- Table name: StockTradeTotals -
Click Save to save your output stream and return to the Outputs panel.
-
On the Query panel, note the inputs and outputs you created in the previous steps.
-
Enter the following SQL commands in the query window.
--Total amount traded broken down by company in the last 30 seconds and calculated every 5 seconds
SELECT
Company.CompanyName
, sum(Trade.Quantity * Trade.Price) as TradedAmount
, System.Timestamp as WindowDateTime
INTO
[StockTradeByCompany]
FROM
[NYSEStockTrades] as Trade TIMESTAMP BY TradeTimestamp
INNER JOIN [NYSEStockCompanies] as Company
on Trade.StockTicker = Company.StockTicker
GROUP BY Company.CompanyName, HoppingWindow(second, 30, 5)
--Total amount traded and total number of transactions in the last 30 seconds and calculated every 5 seconds
SELECT
sum(Trade.Quantity * Trade.Price) as TotalTradedAmount
, count(*) as TotalTradeCount
, System.Timestamp as WindowDateTime
INTO
[StockTradeTotals]
FROM
[NYSEStockTrades] as Trade TIMESTAMP BY TradeTimestamp
GROUP BY HoppingWindow(second, 30, 5)
-
Click Save query.
-
On the Overview panel, click Start to start the Stream Analytics job.
-
On the Start job blade, select Now and click the Start button.
In this section you will log on to the Power BI portal to create a dashboard to visualize real-time stock transactions statistics data sent by Stream Analytics.
IMPORTANT |
---|
Execute these steps on your host computer |
-
Open a new browser tab and navigate to https://www.powerbi.com
-
Enter your credentials to authenticate with the Power BI service.
-
Once authenticated, open the Workspaces menu and click My Workspace at the top of the Workspaces list.
-
Navigate to the Datasets tab and verify that two datasets have been created by Stream Analytics: StockTradeByCompany and StockTradeTotals.
-
On the top right-hand side corner click + Create and then click Dashboard from the dropdown menu to create a new dashboard.
-
Type NYSE Trade Activity in the Dashboard name field and click Create.
-
Click on the (elipsis) ... button from the toolbar and then click on the + Add tile menu item.
-
On the Add tile blade, select Custom Streaming Data under the Real-Time Data section.
-
Click Next.
-
On the Add a custom streaming data tile blade, select the StockTradeTotals dataset.
-
Click Next.
-
On the Visualization Type field select Card.
-
On the Fields field select TotalTradedAmount.
-
Click on the brush icon to set the Value decimal places field to 2.
-
Click Next.
-
On the Tile details blade, enter the following details:
- Title: Total Traded Amount
- Subtitle: in the last 30 seconds -
Leave remaining fields with their default values. Click Apply.
-
Repeat the process to create another tile, this time to display the total trade count. Use the following details:
- Dataset: StockTradeTotals
- Visualization Type: Card
- Fields: TotalTradeCount
- Details > Title: Total Trade Count
- Details > Subtitle: in the last 30 seconds -
You should be able to see the values for both tiles changing every 5 seconds.
-
Repeat the process to create another tile, this time to display the historical values for TotalTradeCount over the last 5 minutes.
- Dataset: StockTradeTotals
- Visualization Type: Line Chart
- Axis: WindowDateTime
- Value: TotalTradeCount
- Time window to display: 5 minutes
- Details > Title: Total Trade Count
- Details > Subtitle: 5 min history window -
Repeat the process to create another tile, this time to display the total traded amount broken down by company.
- Dataset: StockTradeByCompany
- Visualization Type: Clustered bar chart
- Axis: CompanyName
- Legend: CompanyName
- Value: TradedAmount
- Details > Title: Traded Amount by Company
- Details > Subtitle: in the last 30 seconds -
Your real-time dashboard should look similar to the picture below. Every tile should be refreshed approximately every 5 seconds.