---
title: Create features for data in a Hadoop cluster using Hive queries | Microsoft Docs
description: Examples of Hive queries that generate features in data stored in an Azure HDInsight Hadoop cluster.
services: machine-learning
author: marktab
manager: cgronlun
editor: cgronlun
ms.service: machine-learning
ms.component: team-data-science-process
ms.topic: article
ms.date: 11/21/2017
ms.author: tdsp
ms.custom: "(previous author=deguhath, ms.author=deguhath)"
---
This document shows how to create features for data stored in an Azure HDInsight Hadoop cluster using Hive queries. These Hive queries use embedded Hive User-Defined Functions (UDFs), the scripts for which are provided.
The operations needed to create features can be memory intensive. The performance of Hive queries becomes more critical in such cases and can be improved by tuning certain parameters. The tuning of these parameters is discussed in the final section.
Examples of queries that are specific to the NYC Taxi Trip Data scenarios are also provided in the GitHub repository. These queries already have the data schema specified and are ready to be submitted to run.
This task is a step in the Team Data Science Process (TDSP).
This article assumes that you have:
- Created an Azure Storage account. If you need instructions, see Create an Azure Storage account.
- Provisioned a customized Hadoop cluster with the HDInsight service. If you need instructions, see Customize Azure HDInsight Hadoop Clusters for Advanced Analytics.
- Uploaded the data to Hive tables in Azure HDInsight Hadoop clusters. If you have not, follow Create and load data to Hive tables to upload data to Hive tables first.
- Enabled remote access to the cluster. If you need instructions, see Access the Head Node of Hadoop Cluster.
In this section, several examples of the ways in which features can be generated using Hive queries are described. Once you have generated additional features, you can either add them as columns to the existing table or create a new table with the additional features and a primary key, which can then be joined with the original table. Here are the examples presented:
- Frequency-based Feature Generation
- Risks of Categorical Variables in Binary Classification
- Extract features from Datetime Field
- Extract features from Text Field
- Calculate distance between GPS coordinates
It is often useful to calculate the frequencies of the levels of a categorical variable, or the frequencies of certain combinations of levels from multiple categorical variables. Users can use the following script to calculate these frequencies:
select
a.<column_name1>, a.<column_name2>, a.sub_count/sum(a.sub_count) over () as frequency
from
(
select
<column_name1>,<column_name2>, count(*) as sub_count
from <databasename>.<tablename> group by <column_name1>, <column_name2>
)a
order by frequency desc;
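As a concrete illustration, a minimal sketch of this pattern against the NYC Taxi Trip Data used later in this article might look like the following (it assumes a passenger_count column in the nyctaxi.trip table; adjust the names to your own schema):
select
a.passenger_count, a.sub_count/sum(a.sub_count) over () as frequency
from
(
select
passenger_count, count(*) as sub_count
from nyctaxi.trip group by passenger_count
)a
order by frequency desc;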
In binary classification, non-numeric categorical variables must be converted into numeric features when the models being used only take numeric features. This conversion is done by replacing each non-numeric level with a numeric risk. This section shows some generic Hive queries that calculate the risk values (log odds) of a categorical variable.
set smooth_param1=1;
set smooth_param2=20;
select
<column_name1>,<column_name2>,
ln((sum_target+${hiveconf:smooth_param1})/(record_count-sum_target+${hiveconf:smooth_param2}-${hiveconf:smooth_param1})) as risk
from
(
select
<column_name1>, <column_name2>, sum(binary_target) as sum_target, sum(1) as record_count
from
(
select
<column_name1>, <column_name2>, if(target_column>0,1,0) as binary_target
from <databasename>.<tablename>
)a
group by <column_name1>, <column_name2>
)b
In this example, the variables smooth_param1 and smooth_param2 are set to smooth the risk values calculated from the data. Risks range between -Inf and Inf. A risk > 0 indicates that the probability that the target is equal to 1 is greater than 0.5.
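As a quick, hypothetical sanity check (the numbers are invented for illustration, not taken from any dataset): with smooth_param1=1 and smooth_param2=20, a level with record_count=100 and sum_target=30 gets risk = ln((30+1)/(100-30+20-1)) = ln(31/89) ≈ -1.05; because fewer than half of the records have target 1, the risk is negative, as expected.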
After the risk table is calculated, users can assign risk values to a table by joining it with the risk table. The Hive joining query was provided in the previous section.
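If you need to reproduce that join, a minimal sketch is shown below (the table and column names are placeholders, and <risk_table_name> is assumed to hold the output of the risk query above):
select t.*, r.risk
from <databasename>.<tablename> t
join <databasename>.<risk_table_name> r
on t.<column_name1> = r.<column_name1> and t.<column_name2> = r.<column_name2>;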
Hive comes with a set of UDFs for processing datetime fields. In Hive, the default datetime format is 'yyyy-MM-dd HH:mm:ss' ('1970-01-01 12:21:32' for example). This section shows examples that extract the day of the month and the month from a datetime field, as well as examples that convert a datetime string in a format other than the default into a datetime string in the default format.
select day(<datetime field>), month(<datetime field>)
from <databasename>.<tablename>;
This Hive query assumes that the <datetime field> is in the default datetime format.
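Other embedded datetime UDFs can be applied the same way. For example, the following sketch extracts the hour and the week of the year, and computes the number of days between two datetime fields (hour, weekofyear, and datediff are standard Hive built-ins; verify them against your Hive version):
select hour(<datetime field>), weekofyear(<datetime field>), datediff(<datetime field 1>, <datetime field 2>)
from <databasename>.<tablename>;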
If a datetime field is not in the default format, you need to convert the datetime field into a Unix time stamp first, and then convert the Unix time stamp into a datetime string that is in the default format. Once the datetime is in the default format, users can apply the embedded datetime UDFs to extract features.
select from_unixtime(unix_timestamp(<datetime field>,'<pattern of the datetime field>'))
from <databasename>.<tablename>;
In this query, if the <datetime field> has a pattern like 03/26/2015 12:04:39, the '<pattern of the datetime field>' should be 'MM/dd/yyyy HH:mm:ss'. To test it, users can run:
select from_unixtime(unix_timestamp('05/15/2015 09:32:10','MM/dd/yyyy HH:mm:ss'))
from hivesampletable limit 1;
The hivesampletable in this query comes preinstalled on all Azure HDInsight Hadoop clusters by default when the clusters are provisioned.
When the Hive table has a text field that contains a string of words that are delimited by spaces, the following query extracts the length of the string, and the number of words in the string.
select length(<text field>) as str_len, size(split(<text field>,' ')) as word_num
from <databasename>.<tablename>;
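If more text features are needed, the same built-ins can be combined. For example, the following sketch also extracts the lower-cased first word of the field (again, adjust the placeholders to your schema):
select length(<text field>) as str_len, size(split(<text field>,' ')) as word_num, lower(split(<text field>,' ')[0]) as first_word
from <databasename>.<tablename>;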
The query given in this section can be directly applied to the NYC Taxi Trip Data. The purpose of this query is to show how to apply an embedded mathematical function in Hive to generate features.
The fields that are used in this query are the GPS coordinates of pickup and dropoff locations, named pickup_longitude, pickup_latitude, dropoff_longitude, and dropoff_latitude. The queries that calculate the direct distance between the pickup and dropoff coordinates are:
set R=3959;
set pi=radians(180);
select pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude,
${hiveconf:R}*2*2*atan((1-sqrt(1-pow(sin((dropoff_latitude-pickup_latitude)
*${hiveconf:pi}/180/2),2)-cos(pickup_latitude*${hiveconf:pi}/180)
*cos(dropoff_latitude*${hiveconf:pi}/180)*pow(sin((dropoff_longitude-pickup_longitude)*${hiveconf:pi}/180/2),2)))
/sqrt(pow(sin((dropoff_latitude-pickup_latitude)*${hiveconf:pi}/180/2),2)
+cos(pickup_latitude*${hiveconf:pi}/180)*cos(dropoff_latitude*${hiveconf:pi}/180)*
pow(sin((dropoff_longitude-pickup_longitude)*${hiveconf:pi}/180/2),2))) as direct_distance
from nyctaxi.trip
where pickup_longitude between -90 and 0
and pickup_latitude between 30 and 90
and dropoff_longitude between -90 and 0
and dropoff_latitude between 30 and 90
limit 10;
The mathematical equations that calculate the distance between two GPS coordinates can be found on the Movable Type Scripts site, authored by Peter Lapisu. In this JavaScript, the function toRad() is just lat_or_lon*pi/180, which converts degrees to radians (here, lat_or_lon is the latitude or longitude). Since Hive does not provide the function atan2, but does provide the function atan, the atan2 function is implemented by the atan function in the above Hive query using the definition provided in Wikipedia.
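One way to see why the query multiplies atan by 2*2: with y = sqrt(a) and x = sqrt(1-a), where a is the bracketed haversine term, the two arguments satisfy x^2 + y^2 = 1, and the half-angle identity

$$\operatorname{atan2}(y, x) = 2\arctan\!\left(\frac{\sqrt{x^{2}+y^{2}} - x}{y}\right), \qquad y > 0$$

gives the distance $d = 2R\,\operatorname{atan2}(\sqrt{a}, \sqrt{1-a}) = 4R\arctan\bigl((1 - \sqrt{1-a})/\sqrt{a}\bigr)$, which is exactly the expression computed in the query.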
A full list of Hive embedded UDFs can be found in the Built-in Functions section on the Apache Hive wiki.
The default parameter settings of a Hive cluster might not be suitable for the Hive queries and the data that the queries are processing. This section discusses some parameters that users can tune to improve the performance of Hive queries. Users need to add the parameter-tuning statements before the queries that process the data.
- Java heap space: For queries involving joining large datasets, or processing long records, running out of heap space is one of the common errors. This error can be avoided by setting the parameters mapreduce.map.java.opts and mapreduce.task.io.sort.mb to desired values. Here is an example:

set mapreduce.map.java.opts=-Xmx4096m; set mapreduce.task.io.sort.mb=1024;

These parameters allocate 4 GB of memory to the Java heap space and also make sorting more efficient by allocating more memory for it. It is a good idea to play with these allocations if there are any job failure errors related to heap space.
- DFS block size: This parameter sets the smallest unit of data that the file system stores. As an example, if the DFS block size is 128 MB, then any data of size less than and up to 128 MB is stored in a single block. Data that is larger than 128 MB is allotted extra blocks.
Choosing a small block size causes large overheads in Hadoop, since the name node has to process many more requests to find the relevant block pertaining to the file. A recommended setting when dealing with gigabytes (or larger) of data is:

set dfs.block.size=128m;
- Optimizing join operations in Hive: While join operations in the map/reduce framework typically take place in the reduce phase, enormous gains can sometimes be achieved by scheduling joins in the map phase (also called "mapjoins"). To direct Hive to do this whenever possible, set:
set hive.auto.convert.join=true;
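If a join that involves a genuinely small table is still being executed in the reduce phase, the size threshold Hive uses to decide which tables qualify for a mapjoin can also be raised. As a sketch (the 50-MB value is illustrative; check the property and its default for your Hive version):

set hive.mapjoin.smalltable.filesize=50000000;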
- Specifying the number of mappers to Hive: While Hadoop allows the user to set the number of reducers, the number of mappers is typically not set directly by the user. A trick that allows some degree of control over this number is to choose the Hadoop variables mapred.min.split.size and mapred.max.split.size, because the size of each map task is determined by:

split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))

The number of mappers is then roughly the total input size divided by this split size.
Typically, the default values are:
- mapred.min.split.size: 0
- mapred.max.split.size: Long.MAX_VALUE
- dfs.block.size: 64 MB

As we can see, given the data size, tuning these parameters by "setting" them allows us to control the number of mappers used.
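For example, to reduce the number of mappers by forcing larger splits of roughly 256 MB per map task (the value is illustrative and should be chosen based on your data size):

set mapred.min.split.size=268435456;
set mapred.max.split.size=268435456;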
- Here are a few other, more advanced options for optimizing Hive performance. These allow you to set the memory allocated to map and reduce tasks, and can be useful in tweaking performance. Keep in mind that mapreduce.reduce.memory.mb cannot be greater than the physical memory size of each worker node in the Hadoop cluster.
set mapreduce.map.memory.mb=2048;
set mapreduce.reduce.memory.mb=6144;
set mapreduce.reduce.java.opts=-Xmx8192m;
set mapred.reduce.tasks=128;
set mapred.tasktracker.reduce.tasks.maximum=128;