title | description | services | ms.service | author | ms.author | ms.reviewer | ms.custom | ms.topic | ms.date |
---|---|---|---|---|---|---|---|---|---|
Phoenix Query Server REST SDK - Azure HDInsight | Install and use the REST SDK for the Phoenix Query Server in Azure HDInsight. | hdinsight | hdinsight | ashishthaps | ashishth | jasonh | hdinsightactive | conceptual | 12/04/2017 |
Apache Phoenix is an open-source, massively parallel relational database layer on top of Apache HBase. Phoenix lets you run SQL-like queries against HBase through SSH tools such as SQLLine. Phoenix also provides an HTTP server, Phoenix Query Server (PQS), which serves thin clients and supports two transport mechanisms for client communication: JSON and Protocol Buffers. Protocol Buffers is the default mechanism and is more efficient than JSON.
This article describes how to use the PQS REST SDK to create tables, upsert rows individually and in bulk, and select data with SQL statements. The examples use the Microsoft .NET driver for Apache Phoenix Query Server. This SDK is built on Apache Calcite's Avatica APIs and uses Protocol Buffers exclusively as its serialization format.
For more information, see Apache Calcite Avatica Protocol Buffers Reference.
The Microsoft .NET driver for Apache Phoenix Query Server is provided as a NuGet package, which you can install from the Visual Studio NuGet Package Manager Console with the following command:
Install-Package Microsoft.Phoenix.Client
To begin using the library, instantiate a new PhoenixClient object, passing in a ClusterCredentials object that contains the Uri of your cluster and the cluster's Apache Hadoop user name and password.
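// Credentials for the cluster's gateway; replace the placeholders as described below
var credentials = new ClusterCredentials(new Uri("https://CLUSTERNAME.azurehdinsight.net/"), "USERNAME", "PASSWORD");
// The client object that all of the following examples use
var client = new PhoenixClient(credentials);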
Replace CLUSTERNAME with your HDInsight HBase cluster name, and replace USERNAME and PASSWORD with the Hadoop credentials you specified when you created the cluster. The default Hadoop user name is admin.
Every request you send to PQS must include a unique connection identifier that associates the request with its connection. You can generate one from a GUID:
string connId = Guid.NewGuid().ToString();
Each example first calls the OpenConnectionRequestAsync method, passing in the unique connection identifier. It then defines ConnectionProperties and RequestOptions and passes those objects, along with the connection identifier, to the ConnectionSyncRequestAsync method. PQS's ConnectionSyncRequest object helps ensure that the client and server have a consistent view of the database properties.
To call ConnectionSyncRequestAsync, pass in a ConnectionProperties object:
ConnectionProperties connProperties = new ConnectionProperties
{
HasAutoCommit = true,
AutoCommit = true,
HasReadOnly = true,
ReadOnly = false,
TransactionIsolation = 0,
Catalog = "",
Schema = "",
IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
Here are some properties of interest:
Property | Description |
---|---|
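AutoCommit | A boolean denoting whether autoCommit is enabled for Phoenix transactions. |
HasAutoCommit | A boolean denoting whether AutoCommit carries a value. Protocol Buffers 3 can't distinguish an unset boolean from false, so this flag tells the server to honor AutoCommit. |
ReadOnly | A boolean denoting whether the connection is read-only. |
HasReadOnly | A boolean denoting whether ReadOnly carries a value, for the same reason as HasAutoCommit. |
TransactionIsolation | An integer denoting the level of transaction isolation per the JDBC specification; see the following table. |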
Catalog | The name of the catalog to use when fetching connection properties. |
Schema | The name of the schema to use when fetching connection properties. |
IsDirty | A boolean denoting whether the properties have been altered. |
Here are the TransactionIsolation values:
Isolation value | Description |
---|---|
0 | Transactions are not supported. |
1 | Dirty reads, non-repeatable reads, and phantom reads may occur. |
2 | Dirty reads are prevented, but non-repeatable reads and phantom reads may occur. |
4 | Dirty reads and non-repeatable reads are prevented, but phantom reads may occur. |
8 | Dirty reads, non-repeatable reads, and phantom reads are all prevented. |
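The integer values in the table are the isolation-level constants defined on JDBC's java.sql.Connection interface. The following sketch maps each value to its JDBC constant name and rejects anything else, which can catch a typo before the value goes into ConnectionProperties. DescribeIsolation is a hypothetical helper for illustration, not part of the SDK.

```csharp
using System;
using System.Collections.Generic;

// JDBC isolation-level constants (java.sql.Connection), keyed by the integer
// value that ConnectionProperties.TransactionIsolation carries.
var isolationLevels = new Dictionary<int, string>
{
    [0] = "TRANSACTION_NONE",
    [1] = "TRANSACTION_READ_UNCOMMITTED",
    [2] = "TRANSACTION_READ_COMMITTED",
    [4] = "TRANSACTION_REPEATABLE_READ",
    [8] = "TRANSACTION_SERIALIZABLE",
};

// Hypothetical helper (not part of the SDK): returns the JDBC constant name,
// or throws for a value that isn't a JDBC isolation level.
string DescribeIsolation(int level) =>
    isolationLevels.TryGetValue(level, out var constantName)
        ? constantName
        : throw new ArgumentOutOfRangeException(nameof(level), level, "Not a JDBC isolation level.");

foreach (var (value, jdbcName) in isolationLevels)
{
    Console.WriteLine($"{value} = {jdbcName}");
}
```

Note that the values are powers of two, not a contiguous range, so a value such as 3 is rejected.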
Like a relational database, HBase stores data in tables. With Phoenix, you create tables by using standard SQL statements that define the primary key and column types.
This example and all subsequent examples use the PhoenixClient object instantiated earlier in this article.
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
// You can set certain request options, such as timeout in milliseconds:
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
CreateStatementResponse createStatementResponse = null;
OpenConnectionResponse openConnResponse = null;
try
{
// Opening connection
// pbc is assumed to be the namespace alias: using pbc = global::Google.Protobuf.Collections;
var info = new pbc::MapField<string, string>();
openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
// Syncing connection
ConnectionProperties connProperties = new ConnectionProperties
{
HasAutoCommit = true,
AutoCommit = true,
HasReadOnly = true,
ReadOnly = false,
TransactionIsolation = 0,
Catalog = "",
Schema = "",
IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
// Create the statement
createStatementResponse = await client.CreateStatementRequestAsync(connId, options);
// Create the table if it does not exist
string sql = "CREATE TABLE IF NOT EXISTS Customers (Id varchar(20) PRIMARY KEY, FirstName varchar(50), " +
"LastName varchar(100), StateProvince char(2), Email varchar(255), Phone varchar(15))";
await client.PrepareAndExecuteRequestAsync(connId, sql, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
Console.WriteLine($"Table \"Customers\" created.");
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
finally
{
if (createStatementResponse != null)
{
await client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options);
createStatementResponse = null;
}
if (openConnResponse != null)
{
await client.CloseConnectionRequestAsync(connId, options);
openConnResponse = null;
}
}
The preceding example creates a new table named Customers by using the IF NOT EXISTS option. The CreateStatementRequestAsync call creates a new statement on the Avatica (PQS) server. The finally block closes the statement and the connection represented by the returned CreateStatementResponse and OpenConnectionResponse objects.
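The comments in the preceding example describe how gateway mode routes requests: the AlternativeEndpoint value hbasephoenix<N>/ is appended to the cluster URL, and requests sent to hbasephoenix0/ are forwarded to PQS on workernode0. As an illustration only, the following helper builds that URL for a given worker node index. PqsGatewayUri and the cluster name mycluster are hypothetical; in the examples you pass only the hbasephoenix0/ fragment through RequestOptions.AlternativeEndpoint.

```csharp
using System;

// Hypothetical helper: builds the gateway URL that a request with
// AlternativeEndpoint = "hbasephoenix<N>/" targets, following the pattern
// https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
Uri PqsGatewayUri(string clusterName, int workerNodeIndex)
{
    if (string.IsNullOrWhiteSpace(clusterName))
        throw new ArgumentException("A cluster name is required.", nameof(clusterName));
    if (workerNodeIndex < 0)
        throw new ArgumentOutOfRangeException(nameof(workerNodeIndex));

    var clusterUri = new Uri($"https://{clusterName}.azurehdinsight.net/");
    // Resolving a relative path against the cluster URI appends the endpoint segment.
    return new Uri(clusterUri, $"hbasephoenix{workerNodeIndex}/");
}

// "mycluster" is a placeholder cluster name.
Console.WriteLine(PqsGatewayUri("mycluster", 0));
```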
This example inserts data one row at a time. It references a List<string> collection of American state and territory abbreviations:
var states = new List<string> { "AL", "AK", "AS", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FM", "FL", "GA", "GU", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MH", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "MP", "OH", "OK", "OR", "PW", "PA", "PR", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VI", "VA", "WA", "WV", "WI", "WY" };
The table's StateProvince column value is used in a later select operation.
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
StatementHandle statementHandle = null;
try
{
// Opening connection
pbc::MapField<string, string> info = new pbc::MapField<string, string>();
openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
// Syncing connection
ConnectionProperties connProperties = new ConnectionProperties
{
HasAutoCommit = true,
AutoCommit = true,
HasReadOnly = true,
ReadOnly = false,
TransactionIsolation = 0,
Catalog = "",
Schema = "",
IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
PrepareResponse prepareResponse = await client.PrepareRequestAsync(connId, sql, 100, options);
statementHandle = prepareResponse.Statement;
var r = new Random();
// Insert 300 rows
for (int i = 0; i < 300; i++)
{
var list = new pbc.RepeatedField<TypedValue>();
var id = new TypedValue
{
StringValue = "id" + i,
Type = Rep.String
};
var firstName = new TypedValue
{
StringValue = "first" + i,
Type = Rep.String
};
var lastName = new TypedValue
{
StringValue = "last" + i,
Type = Rep.String
};
var state = new TypedValue
{
// Random.Next(maxValue) excludes maxValue, so states.Count selects from every entry
StringValue = states[r.Next(states.Count)],
Type = Rep.String
};
var email = new TypedValue
{
StringValue = $"email{i}@junkemail.com",
Type = Rep.String
};
var phone = new TypedValue
{
StringValue = $"555-229-341{i.ToString().Substring(0,1)}",
Type = Rep.String
};
list.Add(id);
list.Add(firstName);
list.Add(lastName);
list.Add(state);
list.Add(email);
list.Add(phone);
Console.WriteLine("Inserting customer " + i);
await client.ExecuteRequestAsync(statementHandle, list, long.MaxValue, true, options);
}
await client.CommitRequestAsync(connId, options);
Console.WriteLine("Upserted customer data");
}
catch (Exception ex)
{
Console.WriteLine(ex);
throw;
}
finally
{
if (statementHandle != null)
{
await client.CloseStatementRequestAsync(connId, statementHandle.Id, options);
statementHandle = null;
}
if (openConnResponse != null)
{
await client.CloseConnectionRequestAsync(connId, options);
openConnResponse = null;
}
}
The structure for executing an insert statement is similar to that for creating a new table. At the end of the try block, the transaction is explicitly committed. This example repeats an insert transaction 300 times. The following example shows a more efficient batch insert process.
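Two details in the insert loop are easy to get wrong. Random.Next(minValue, maxValue) excludes maxValue, so an upper bound of 49 selects only the first 49 of the 59 abbreviations (AL through SD) and never TN through WY; indexing with states.Count samples the whole list. And the email template needs {i}: a literal {1} gives every customer the address email1@junkemail.com. The following plain-C# sketch builds one customer's six column values with both details handled. PickState and BuildCustomerRow are hypothetical helpers that don't call PQS.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var states = new List<string> { "AL", "AK", "AS", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FM", "FL", "GA", "GU", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MH", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "MP", "OH", "OK", "OR", "PW", "PA", "PR", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VI", "VA", "WA", "WV", "WI", "WY" };

// Random.Next(maxValue) returns 0 through maxValue - 1, so passing states.Count can
// reach every entry. An upper bound of 49 would never pick TN through WY.
string PickState(Random random) => states[random.Next(states.Count)];

// Hypothetical helper: the six values the prepared UPSERT binds for customer i, in the
// Customers column order (Id, FirstName, LastName, StateProvince, Email, Phone).
string[] BuildCustomerRow(int i, string state) => new[]
{
    "id" + i,
    "first" + i,
    "last" + i,
    state,
    $"email{i}@junkemail.com",                    // {i}, not {1}, keeps each address unique
    $"555-229-341{i.ToString().Substring(0, 1)}", // same phone pattern as the example
};

var rng = new Random(42); // fixed seed only so the run is repeatable
string[] row = BuildCustomerRow(42, PickState(rng));
Console.WriteLine(string.Join(", ", row));
```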
The following code is nearly identical to the code for inserting data individually. Instead of calling ExecuteRequestAsync once per row with the prepared statement, it collects each row's parameter values in an UpdateBatch object and sends all of them in a single ExecuteBatchRequestAsync call.
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
StatementHandle statementHandle = null;
try
{
// Opening connection
pbc::MapField<string, string> info = new pbc::MapField<string, string>();
openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
// Syncing connection
ConnectionProperties connProperties = new ConnectionProperties
{
HasAutoCommit = true,
AutoCommit = true,
HasReadOnly = true,
ReadOnly = false,
TransactionIsolation = 0,
Catalog = "",
Schema = "",
IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
// Preparing the statement once; every row in the batch reuses it
string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
PrepareResponse prepareResponse = await client.PrepareRequestAsync(connId, sql, long.MaxValue, options);
statementHandle = prepareResponse.Statement;
var updates = new pbc::RepeatedField<UpdateBatch>();
var r = new Random();
// Insert 300 rows
for (int i = 300; i < 600; i++)
{
var list = new pbc::RepeatedField<TypedValue>();
var id = new TypedValue
{
StringValue = "id" + i,
Type = Rep.String
};
var firstName = new TypedValue
{
StringValue = "first" + i,
Type = Rep.String
};
var lastName = new TypedValue
{
StringValue = "last" + i,
Type = Rep.String
};
var state = new TypedValue
{
// Random.Next(maxValue) excludes maxValue, so states.Count selects from every entry
StringValue = states[r.Next(states.Count)],
Type = Rep.String
};
var email = new TypedValue
{
StringValue = $"email{i}@junkemail.com",
Type = Rep.String
};
var phone = new TypedValue
{
StringValue = $"555-229-341{i.ToString().Substring(0, 1)}",
Type = Rep.String
};
list.Add(id);
list.Add(firstName);
list.Add(lastName);
list.Add(state);
list.Add(email);
list.Add(phone);
// Each UpdateBatch carries one row's parameter values; generated protobuf
// repeated fields are read-only properties, so the values are added, not assigned
var batch = new UpdateBatch();
batch.ParameterValues.AddRange(list);
updates.Add(batch);
Console.WriteLine($"Added customer {i} to batch");
}
// Sending all 300 parameter sets in a single request
await client.ExecuteBatchRequestAsync(connId, statementHandle.Id, updates, options);
Console.WriteLine("Batch upserted customer data");
}
catch (Exception ex)
{
Console.WriteLine(ex);
throw;
}
finally
{
if (statementHandle != null)
{
await client.CloseStatementRequestAsync(connId, statementHandle.Id, options);
statementHandle = null;
}
if (openConnResponse != null)
{
await client.CloseConnectionRequestAsync(connId, options);
openConnResponse = null;
}
}
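In the batch example, every UpdateBatch must supply one value for each ? placeholder in the prepared UPSERT statement. The following plain-C# sketch models the batch as a list of string arrays (a stand-in for RepeatedField<UpdateBatch>, not the SDK type) and checks that all 300 parameter sets line up with the statement's six placeholders before anything would be sent.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

const string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
// Counting '?' is enough here because this statement contains no string literals.
int placeholderCount = sql.Count(c => c == '?');

// Plain-C# stand-in for RepeatedField<UpdateBatch>: one parameter set per row.
var updates = new List<string[]>();
for (int i = 300; i < 600; i++)
{
    updates.Add(new[]
    {
        "id" + i, "first" + i, "last" + i, "WA",
        $"email{i}@junkemail.com", $"555-229-341{i.ToString().Substring(0, 1)}",
    });
}

// Every parameter set must supply exactly one value per placeholder.
bool allRowsMatch = updates.All(p => p.Length == placeholderCount);
Console.WriteLine($"{updates.Count} parameter sets of {placeholderCount} values, valid: {allRowsMatch}");
```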
In one test environment, individually inserting 300 new records took almost 2 minutes. In contrast, inserting 300 records as a batch required only 6 seconds.
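As a rough check on those figures, treating "almost 2 minutes" as about 120 seconds (an approximation, since the exact time isn't given):

```latex
\begin{aligned}
\text{individual inserts:}\quad & \frac{300\ \text{rows}}{\approx 120\ \text{s}} \approx 2.5\ \text{rows/s}\ (\approx 0.4\ \text{s per row}) \\
\text{batch insert:}\quad & \frac{300\ \text{rows}}{6\ \text{s}} = 50\ \text{rows/s} \\
\text{speedup:}\quad & \frac{\approx 120\ \text{s}}{6\ \text{s}} \approx 20\times
\end{aligned}
```

So in that environment, the batch insert was roughly 20 times faster.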
This example shows how to reuse one connection to execute multiple queries:
- Select all records, and then fetch remaining records after the default maximum of 100 are returned.
- Run a row-count select statement that returns a single scalar result.
- Execute a select statement that returns the total number of customers per state or territory.
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
CreateStatementResponse createStatementResponse = null;
try
{
// Opening connection
pbc::MapField<string, string> info = new pbc::MapField<string, string>();
openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
// Syncing connection
ConnectionProperties connProperties = new ConnectionProperties
{
HasAutoCommit = true,
AutoCommit = true,
HasReadOnly = true,
ReadOnly = false,
TransactionIsolation = 0,
Catalog = "",
Schema = "",
IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
// Creating one statement that all three queries reuse
createStatementResponse = await client.CreateStatementRequestAsync(connId, options);
// Running query 1
string sql = "SELECT * FROM Customers";
ExecuteResponse executeResponse = await client.PrepareAndExecuteRequestAsync(connId, sql, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
pbc::RepeatedField<Row> rows = executeResponse.Results[0].FirstFrame.Rows;
// Loop through all of the returned rows and display the first two columns
for (int i = 0; i < rows.Count; i++)
{
Row row = rows[i];
Console.WriteLine(row.Value[0].ScalarValue.StringValue + " " + row.Value[1].ScalarValue.StringValue);
}
// The server hard-codes 100 as the default first frame size,
// so FetchRequestAsync is called to get any remaining rows
Console.WriteLine("");
Console.WriteLine($"Number of rows: {rows.Count}");
// Fetch remaining rows; the offset isn't used, so it's set to 0
// When FetchResponse.Frame.Done is true, all rows have been fetched
FetchResponse fetchResponse = await client.FetchRequestAsync(connId, createStatementResponse.StatementId, 0, int.MaxValue, options);
Console.WriteLine($"Frame row count: {fetchResponse.Frame.Rows.Count}");
Console.WriteLine($"Fetch response is done: {fetchResponse.Frame.Done}");
Console.WriteLine("");
// Running query 2
string sql2 = "select count(*) from Customers";
ExecuteResponse countResponse = await client.PrepareAndExecuteRequestAsync(connId, sql2, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
long count = countResponse.Results[0].FirstFrame.Rows[0].Value[0].ScalarValue.NumberValue;
Console.WriteLine($"Total customer records: {count}");
Console.WriteLine("");
// Running query 3
string sql3 = "select StateProvince, count(*) as Number from Customers group by StateProvince order by Number desc";
ExecuteResponse groupByResponse = await client.PrepareAndExecuteRequestAsync(connId, sql3, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
pbc::RepeatedField<Row> stateRows = groupByResponse.Results[0].FirstFrame.Rows;
for (int i = 0; i < stateRows.Count; i++)
{
Row row = stateRows[i];
Console.WriteLine(row.Value[0].ScalarValue.StringValue + ": " + row.Value[1].ScalarValue.NumberValue);
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
throw;
}
finally
{
if (createStatementResponse != null)
{
await client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options);
createStatementResponse = null;
}
if (openConnResponse != null)
{
await client.CloseConnectionRequestAsync(connId, options);
openConnResponse = null;
}
}
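The example calls FetchRequestAsync once, with int.MaxValue as the frame size, and in the sample output below that single call returns the remaining 500 rows with Done set to true. If a frame can come back smaller than the rest of the result set, a loop that keeps fetching until Frame.Done is true is more robust. The following plain-C# sketch simulates that loop; FetchFrame, its server-side cursor, and the 200-row frame size are hypothetical stand-ins for FetchRequestAsync and don't call PQS.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simulated result set: 600 rows, of which the execute response carries the first 100.
var serverRows = Enumerable.Range(0, 600).Select(i => "id" + i).ToList();
const int firstFrameSize = 100;
int cursor = firstFrameSize; // server-side position after the first frame

// Hypothetical stand-in for FetchRequestAsync: returns up to maxRows rows from the
// server-side cursor and reports whether the result set is exhausted (Frame.Done).
(List<string> Rows, bool Done) FetchFrame(int maxRows)
{
    var frame = serverRows.Skip(cursor).Take(maxRows).ToList();
    cursor += frame.Count;
    return (frame, cursor >= serverRows.Count);
}

var allRows = serverRows.Take(firstFrameSize).ToList();
bool done = allRows.Count >= serverRows.Count;
int fetchCalls = 0;

// Keep fetching until the server reports Done instead of assuming one call is enough.
while (!done)
{
    var (rows, frameDone) = FetchFrame(maxRows: 200);
    allRows.AddRange(rows);
    done = frameDone;
    fetchCalls++;
}

Console.WriteLine($"Rows: {allRows.Count}, fetch calls: {fetchCalls}");
```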
The output of the select statements resembles the following. The per-state counts depend on the random state assigned to each customer, so your numbers will differ:
id0 first0
id1 first1
id10 first10
id100 first100
id101 first101
id102 first102
. . .
id185 first185
id186 first186
id187 first187
id188 first188
Number of rows: 100
Frame row count: 500
Fetch response is done: True
Total customer records: 600
NJ: 21
CA: 19
GU: 17
NC: 16
IN: 16
MA: 16
AZ: 16
ME: 16
IL: 15
OR: 15
. . .
MO: 10
HI: 10
GA: 10
DC: 9
NM: 9
MD: 9
MP: 9
SC: 7
AR: 7
MH: 6
FM: 5
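The first frame in the sample output runs from id0 to id188 rather than from id0 to id99. That is consistent with rows coming back in row-key order: Id is a varchar primary key, so keys compare character by character and id10 sorts before id2. The following sketch reproduces that order with an ordinal string sort in plain C#. It doesn't query Phoenix, and treating ordinal comparison as equivalent to HBase byte order is an assumption that holds for ASCII keys like these.

```csharp
using System;
using System.Linq;

// The ids the insert examples generate: id0 through id599.
var ids = Enumerable.Range(0, 600).Select(i => "id" + i).ToList();

// HBase sorts row keys byte by byte. For these ASCII keys, an ordinal string
// comparison gives the same order, so "id10" sorts before "id2".
var keyOrder = ids.OrderBy(id => id, StringComparer.Ordinal).ToList();

Console.WriteLine(string.Join(" ", keyOrder.Take(6))); // id0 id1 id10 id100 id101 id102
Console.WriteLine($"100th key: {keyOrder[99]}");       // 100th key: id188
```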