---
title: "Quickstart: Ingest data using the Azure Data Explorer Python library"
description: In this quickstart, you learn how to ingest (load) data into Azure Data Explorer using Python.
services: data-explorer
author: orspod
ms.author: v-orspod
ms.reviewer: mblythe
ms.service: data-explorer
ms.topic: quickstart
ms.date: 10/16/2018
---

# Quickstart: Ingest data using the Azure Data Explorer Python library

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. Azure Data Explorer provides two client libraries for Python: an ingest library and a data library. These libraries enable you to ingest (load) data into a cluster and query data from your code. In this quickstart, you first create a table and data mapping in a test cluster. You then queue ingestion to the cluster and validate the results.

This quickstart is also available as an Azure Notebook.

If you don't have an Azure subscription, create a free Azure account before you begin.

## Prerequisites

In addition to an Azure subscription, you need the following to complete this quickstart:

* Python 3.4 or later installed on your development machine.
* A test cluster and database in Azure Data Explorer.

## Install the data and ingest libraries

Install azure-kusto-data and azure-kusto-ingest.

```
pip install azure-kusto-data
pip install azure-kusto-ingest
```

## Add import statements and constants

Import classes from the libraries, as well as datetime and pandas, a data analysis library.

```python
from azure.kusto.data.request import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
import pandas as pd
import datetime
```

To authenticate an application, Azure Data Explorer uses your Azure Active Directory (AAD) tenant ID. To find your tenant ID, use the following URL, replacing *YourDomain* with your domain.

```
https://login.windows.net/<YourDomain>/.well-known/openid-configuration/
```

For example, if your domain is contoso.com, the URL is: https://login.windows.net/contoso.com/.well-known/openid-configuration/. Click this URL to see the results; the first line is as follows.

"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

The tenant ID in this case is 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Set the values for AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI, and KUSTO_DATABASE before running this code.

```python
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net:443/"
KUSTO_DATABASE = "<DatabaseName>"
```
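If you'd rather discover the tenant ID programmatically than open the URL in a browser, the following is a minimal sketch using only the Python standard library. The `discover_tenant_id` helper is hypothetical, not part of the quickstart.

```python
import json
import urllib.request

def discover_tenant_id(domain):
    """Hypothetical helper: look up the tenant ID for a domain from the
    OpenID configuration document described above."""
    url = "https://login.windows.net/{0}/.well-known/openid-configuration/".format(domain)
    with urllib.request.urlopen(url) as response:
        config = json.load(response)
    # The endpoint looks like https://login.windows.net/<tenant-id>/oauth2/authorize;
    # the tenant ID is the fourth path segment.
    return config["authorization_endpoint"].split("/")[3]

# Example: discover_tenant_id("contoso.com")
# -> "6babcaad-604b-40ac-a9d7-9fd97c0b779f"
```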

Now construct the connection string. This example uses device authentication to access the cluster. You can also use an AAD application certificate, an AAD application key, or an AAD user and password.
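For example, to authenticate with an AAD application (service principal) instead, the connection string builder exposes an application-key method. A sketch, assuming the `with_aad_application_key_authentication` builder method; `<AppId>` and `<AppKey>` are placeholders you'd supply:

```python
# Sketch: AAD application key authentication instead of device authentication.
# "<AppId>" and "<AppKey>" are placeholders for your service principal.
KCSB_DATA = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    KUSTO_URI, "<AppId>", "<AppKey>", AAD_TENANT_ID)
```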

You create the destination table and mapping in a later step.

```python
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_INGEST_URI, AAD_TENANT_ID)

KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_URI, AAD_TENANT_ID)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
```

## Set source file information

Import additional classes and set constants for the data source file. This example uses a sample file hosted on Azure Blob Storage. The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

```python
from azure.storage.blob import BlockBlobService
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + CONTAINER + "/" + FILE_PATH + SAS_TOKEN
```
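Optionally, you can confirm the blob is reachable and read its exact size instead of hard-coding it. This is a sketch using the `BlockBlobService` imported above, assuming the azure-storage-blob 1.x API:

```python
# Sketch: look up the blob's size instead of hard-coding FILE_SIZE.
blob_service = BlockBlobService(account_name=ACCOUNT_NAME, sas_token=SAS_TOKEN)
blob = blob_service.get_blob_properties(CONTAINER, FILE_PATH)
FILE_SIZE = blob.properties.content_length    # 64158321 for this sample file
```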

## Create a table on your test cluster

Create a table that matches the schema of the data in the StormEvents.csv file. When this code runs, it returns a message like the following: To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code F3W4VWZDM to authenticate. Follow the steps to sign in, then return to run the next code block. Subsequent code blocks that make a connection require you to sign in again.

```python
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])
```

## Define ingestion mapping

Map incoming CSV data to the column names and data types used when creating the table.

```python
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])
```
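If the single-line mapping JSON is hard to maintain, you can build the same command from a Python structure. This is a sketch, not part of the quickstart; the `(Name, datatype, Ordinal)` triples mirror the literal command above, including the skipped ordinal 15.

```python
import json

# Sketch: build the same mapping command from a compact Python list so the
# columns are easier to read and edit. Ordinals match the CSV column positions.
COLUMNS = [
    ("StartTime", "datetime", 0), ("EndTime", "datetime", 1),
    ("EpisodeId", "int", 2), ("EventId", "int", 3),
    ("State", "string", 4), ("EventType", "string", 5),
    ("InjuriesDirect", "int", 6), ("InjuriesIndirect", "int", 7),
    ("DeathsDirect", "int", 8), ("DeathsIndirect", "int", 9),
    ("DamageProperty", "int", 10), ("DamageCrops", "int", 11),
    ("Source", "string", 12), ("BeginLocation", "string", 13),
    ("EndLocation", "string", 14), ("BeginLat", "real", 16),
    ("BeginLon", "real", 17), ("EndLat", "real", 18),
    ("EndLon", "real", 19), ("EpisodeNarrative", "string", 20),
    ("EventNarrative", "string", 21), ("StormSummary", "dynamic", 22),
]
MAPPING_JSON = json.dumps(
    [{"Name": n, "datatype": t, "Ordinal": o} for n, t, o in COLUMNS])
CREATE_MAPPING_COMMAND = (
    ".create table StormEvents ingestion csv mapping "
    "'StormEvents_CSV_Mapping' '" + MAPPING_JSON + "'"
)
```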

## Queue a message for ingestion

Queue a message to pull data from blob storage and ingest that data into Azure Data Explorer.

```python
INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://docs.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, dataFormat=DataFormat.csv, mappingReference=DESTINATION_TABLE_COLUMN_MAPPING, additionalProperties={'ignoreFirstRecord': 'true'})
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)  # FILE_SIZE is the raw size of the data in bytes
INGESTION_CLIENT.ingest_from_blob(BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')
```

## Validate that data was ingested into the table

Wait five to ten minutes for the queued ingestion to be scheduled and for the data to load into Azure Data Explorer. Then run the following code to get the count of records in the StormEvents table.

```python
QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])
```
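Because queued ingestion is asynchronous, you can also poll for the count instead of waiting a fixed interval. A minimal sketch, assuming the default `Count` column name produced by the `count` operator:

```python
import time

# Sketch: poll until records appear, checking every 30 seconds.
for _ in range(30):    # up to ~15 minutes
    RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, "StormEvents | count")
    df = dataframe_from_result_table(RESPONSE.primary_results[0])
    record_count = int(df["Count"][0])
    if record_count > 0:
        print("Ingestion complete:", record_count, "records")
        break
    time.sleep(30)
```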

## Run troubleshooting queries

Sign in to https://dataexplorer.azure.com and connect to your cluster. Run the following command in your database to see if there were any ingestion failures in the last four hours. Replace the database name before running.

```kusto
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
```

Run the following command to view the status of all ingestion operations in the last four hours. Replace the database name before running.

```kusto
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
```
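You can also run these checks from Python with the client created earlier. A minimal sketch, assuming only that management commands (those starting with a dot) go through `execute_mgmt` as in the earlier steps:

```python
# Sketch: run the ingestion-failures check from Python instead of the web UI.
FAILURES_COMMAND = '''.show ingestion failures
| where FailedOn > ago(4h) and Database == "{0}"'''.format(KUSTO_DATABASE)

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, FAILURES_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
```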

## Clean up resources

If you plan to follow our other quickstarts and tutorials, keep the resources you created. If not, run the following command in your database to clean up the StormEvents table.

```kusto
.drop table StormEvents
```
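Equivalently, from Python, reusing the client from earlier (a minimal sketch):

```python
# Sketch: drop the table with the same management command, from Python.
KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, ".drop table StormEvents")
```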

## Next steps

> [!div class="nextstepaction"]
> Write queries