---
title: Use ScaleR and SparkR with Azure HDInsight | Microsoft Docs
description: Use ScaleR and SparkR with R Server and HDInsight
services: hdinsight
documentationcenter: ''
author: bradsev
manager: jhubbard
editor: cgronlun
tags: azure-portal
ms.assetid: 5a76f897-02e8-4437-8f2b-4fb12225854a
ms.service: hdinsight
ms.custom: hdinsightactive
ms.workload: big-data
ms.tgt_pltfrm: na
ms.devlang: na
ms.topic: article
ms.date: 06/19/2017
ms.author: bradsev
---
This article shows how to predict flight arrival delays with a ScaleR logistic regression model, using flight delay and weather data that is joined with SparkR. This scenario demonstrates the use of SparkR for data manipulation on Spark together with the ScaleR library of Microsoft R Server for analytics. The combination of these technologies lets you apply the latest capabilities in distributed processing.
Although both packages run on Hadoop's Spark execution engine, they cannot share data in memory, because each requires its own Spark session. Until this issue is addressed in an upcoming version of R Server, the workaround is to maintain non-overlapping Spark sessions and to exchange data through intermediate files. The instructions here show that these requirements are straightforward to meet.
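The overall handoff pattern, sketched below, is what the rest of this walkthrough follows. The function names are the real SparkR and RevoScaleR calls; the intermediate path "/share/joinedCsv" is only illustrative.

    # A minimal sketch of the session-handoff pattern used in this walkthrough.

    # -- SparkR phase: wrangle and join the data, write it out, then stop SparkR
    library(SparkR)
    sc <- sparkR.init()
    # ... SparkR joins and transformations on Spark DataFrames ...
    # write.df(joinedDF, "/share/joinedCsv", "com.databricks.spark.csv", "overwrite")
    sparkR.stop()

    # -- ScaleR phase: point a ScaleR data source at the intermediate files and model
    sparkCC <- RxSpark(consoleOutput = TRUE, persistentRun = TRUE)
    rxSetComputeContext(sparkCC)
    joinedTxt <- RxTextData("/share/joinedCsv", fileSystem = RxHdfsFileSystem())
    # ... rxImport(), rxLogit(), rxPredict() ...
    rxStopEngine(sparkCC)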
We use an example here that was initially shared in a talk at Strata 2016 by Mario Inchiosa and Roni Burd, and that is also available through the webinar Building a Scalable Data Science Platform with R. The example uses SparkR to join the well-known airline arrival delay dataset with weather data at the departure and arrival airports. The joined data is then used as input to a ScaleR logistic regression model for predicting flight arrival delay.
The code we walk through was originally written for R Server running on Spark in an HDInsight cluster on Azure, but the concept of mixing SparkR and ScaleR in one script is equally valid in an on-premises environment. In the following, we presume an intermediate level of knowledge of R and of the ScaleR library of R Server. We introduce the use of SparkR as we walk through the scenario.
The AirOnTime08to12CSV airlines public dataset contains flight arrival and departure details for all commercial flights within the USA, from October 1987 to December 2012. This is a large dataset: there are nearly 150 million records in total, and it is just under 4 GB unpacked. It is available from the U.S. government archives. More conveniently, it is available as a zip file (AirOnTimeCSV.zip) containing a set of 303 separate monthly CSV files from the Revolution Analytics dataset repository.
To see the effects of weather on flight delays, we also need the weather data at each of the airports. This data can be downloaded as zip files in raw form, by month, from the National Oceanic and Atmospheric Administration repository. For the purposes of this example, we pulled weather data from May 2007 through December 2012 and used the hourly data files within each of the 68 monthly zip files. The monthly zip files also contain a mapping file (YYYYMMstation.txt) between the weather station ID (WBAN), the airport it is associated with (CallSign), and the airport's time zone offset from UTC (TimeZone). All of this information is needed when joining the weather data with the airline delay data.
The first step is to set up the Spark environment. We begin by pointing to the directory that contains our input data directories, creating a Spark compute context, and creating a logging function for informational logging to the console:
workDir <- '~'
myNameNode <- 'default'
myPort <- 0
inputDataDir <- 'wasb://<container>@<storage-account>.blob.core.windows.net'  # replace with your storage path
hdfsFS <- RxHdfsFileSystem(hostName=myNameNode, port=myPort)
# create a persistent Spark session to reduce startup times
# (remember to stop it later!)
sparkCC <- RxSpark(consoleOutput=TRUE, nameNode=myNameNode, port=myPort, persistentRun=TRUE)
# create working directories
rxHadoopMakeDir('/user')
rxHadoopMakeDir('/user/RevoShare')
rxHadoopMakeDir('/user/RevoShare/remoteuser')
(dataDir <- '/share')
rxHadoopMakeDir(dataDir)
rxHadoopListFiles(dataDir)
setwd(workDir)
getwd()
# version of rxRoc that runs in a local compute context: temporarily switch
# to RxLocalSeq, compute the ROC, and then restore the Spark compute context
rxRoc <- function(...){
    rxSetComputeContext(RxLocalSeq())
    roc <- RevoScaleR::rxRoc(...)
    rxSetComputeContext(sparkCC)
    return(roc)
}
logmsg <- function(msg) { cat(format(Sys.time(), "%Y-%m-%d %H:%M:%S"),':',msg,'\n') }
t0 <- proc.time()
#..start
logmsg('Start')
(trackers <- system("mapred job -list-active-trackers", intern = TRUE))
logmsg(paste('Number of task nodes=',length(trackers)))
Next we add the SparkR library, found under SPARK_HOME, to the R package search path so that we can load SparkR, and then initialize a SparkR session:
#..setup for use of SparkR
logmsg('Initialize SparkR')
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sparkEnvir <- list(spark.executor.instances = '10',
spark.yarn.executor.memoryOverhead = '8000')
sc <- sparkR.init(
sparkEnvir = sparkEnvir,
sparkPackages = "com.databricks:spark-csv_2.10:1.3.0"
)
sqlContext <- sparkRSQL.init(sc)
To prepare the weather data, we subset it to the columns needed for modeling:
- "Visibility"
- "DryBulbCelsius"
- "DewPointCelsius"
- "RelativeHumidity"
- "WindSpeed"
- "Altimeter"
Then we add an airport code associated with the weather station and convert the measurements from local time to UTC.
We begin by creating a file to map the weather station (WBAN) info to an airport code. We could get this correlation from the mapping file included with the weather data, by mapping the CallSign field (for example, LAX) in the weather data to the Origin field in the airline data. However, we happened to have another mapping on hand that maps WBAN to AirportID (for example, 12892 for LAX) and includes TimeZone, saved in a CSV file called "wban-to-airport-id-tz.CSV", so we use that instead (a sketch of deriving such a mapping from the station file follows the example table below). For example:
AirportID | WBAN | TimeZone |
---|---|---|
10685 | 54831 | -6 |
14871 | 24232 | -8 |
.. | .. | .. |
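For reference, a mapping like this could also be derived directly from the YYYYMMstation.txt file shipped in each monthly weather zip. The sketch below assumes that file is pipe-delimited with WBAN, CallSign, and TimeZone columns, as described earlier; verify the delimiter and column names against your copy of the data.

    # Hypothetical sketch: build a WBAN-to-airport mapping from one monthly
    # station file (here the May 2007 file). Assumes a pipe-delimited file
    # with WBAN, CallSign, and TimeZone columns.
    stations <- read.table("200705station.txt", sep = "|", header = TRUE,
                           stringsAsFactors = FALSE)
    wbanToCallSign <- unique(stations[, c("WBAN", "CallSign", "TimeZone")])
    head(wbanToCallSign)
    # The CallSign column (for example, LAX) could then be matched to the
    # Origin field of the airline data instead of using AirportID.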
The following code reads each of the hourly raw weather data files, subsets to the columns we need, merges the weather station mapping file, adjusts the date times of measurements to UTC, and then writes out a new version of the file:
# Look up AirportID and Timezone for WBAN (weather station ID) and adjust time
adjustTime <- function(dataList)
{
    # Convert the chunk passed in by rxDataStep to a data frame and merge in the
    # AirportID and TimeZone for each WBAN, using the mapping data frame supplied
    # through transformObjects.
    dataset0 <- as.data.frame(dataList)
    dataset1 <- base::merge(dataset0, wbanToAirIDAndTZDF1, by = "WBAN")

    # If no rows in this chunk match a known weather station, return an empty
    # data frame with the expected output columns.
    if(nrow(dataset1) == 0) {
        dataset1 <- data.frame(
            Visibility = numeric(0),
            DryBulbCelsius = numeric(0),
            DewPointCelsius = numeric(0),
            RelativeHumidity = numeric(0),
            WindSpeed = numeric(0),
            Altimeter = numeric(0),
            AdjustedYear = numeric(0),
            AdjustedMonth = numeric(0),
            AdjustedDay = integer(0),
            AdjustedHour = integer(0),
            AirportID = integer(0)
        )
        return(dataset1)
    }

    # Parse the local observation date and time, then apply the station's
    # time zone offset to express the measurement time in UTC.
    Year <- as.integer(substr(dataset1$Date, 1, 4))
    Month <- as.integer(substr(dataset1$Date, 5, 6))
    Day <- as.integer(substr(dataset1$Date, 7, 8))

    Time <- dataset1$Time
    Hour <- ceiling(Time/100)

    Timezone <- as.integer(dataset1$TimeZone)

    adjustdate = as.POSIXlt(sprintf("%4d-%02d-%02d %02d:00:00", Year, Month, Day, Hour), tz = "UTC") + Timezone * 3600

    AdjustedYear = as.POSIXlt(adjustdate)$year + 1900
    AdjustedMonth = as.POSIXlt(adjustdate)$mon + 1
    AdjustedDay = as.POSIXlt(adjustdate)$mday
    AdjustedHour = as.POSIXlt(adjustdate)$hour

    # Keep the adjusted date parts, the airport ID, and the weather measurements.
    AirportID = dataset1$AirportID
    Weather = dataset1[,c("Visibility", "DryBulbCelsius", "DewPointCelsius", "RelativeHumidity", "WindSpeed", "Altimeter")]

    data.set = data.frame(cbind(AdjustedYear, AdjustedMonth, AdjustedDay, AdjustedHour, AirportID, Weather))

    return(data.set)
}
wbanToAirIDAndTZDF <- read.csv("wban-to-airport-id-tz.csv")
colInfo <- list(
WBAN = list(type="integer"),
Date = list(type="character"),
Time = list(type="integer"),
Visibility = list(type="numeric"),
DryBulbCelsius = list(type="numeric"),
DewPointCelsius = list(type="numeric"),
RelativeHumidity = list(type="numeric"),
WindSpeed = list(type="numeric"),
Altimeter = list(type="numeric")
)
weatherDF <- RxTextData(file.path(inputDataDir, "WeatherRaw"), colInfo = colInfo)
weatherDF1 <- RxTextData(file.path(inputDataDir, "Weather"), colInfo = colInfo,
                         fileSystem = hdfsFS)
# run the preprocessing in the local parallel ("localpar") compute context
rxSetComputeContext("localpar")
rxDataStep(weatherDF, outFile = weatherDF1, rowsPerRead = 50000, overwrite = T,
           transformFunc = adjustTime,
           transformObjects = list(wbanToAirIDAndTZDF1 = wbanToAirIDAndTZDF))
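Before moving on to SparkR, it can be worth confirming what the preprocessing wrote out. A minimal, optional check:

    # Optional sanity check: list the variables and a few rows of the
    # preprocessed weather data written to HDFS.
    rxGetInfo(weatherDF1, getVarInfo = TRUE, numRows = 3)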
Now we use the SparkR read.df() function to import the weather and airline data into Spark DataFrames. This function, like many other Spark methods, is executed lazily, meaning that it is queued for execution but not run until required.
airPath <- file.path(inputDataDir, "AirOnTime08to12CSV")
weatherPath <- file.path(inputDataDir, "Weather") # pre-processed weather data
rxHadoopListFiles(airPath)
rxHadoopListFiles(weatherPath)
# create a SparkR DataFrame for the airline data
logmsg('create a SparkR DataFrame for the airline data')
# use inferSchema = "false" for more robust parsing
airDF <- read.df(sqlContext, airPath, source = "com.databricks.spark.csv",
header = "true", inferSchema = "false")
# Create a SparkR DataFrame for the weather data
logmsg('create a SparkR DataFrame for the weather data')
weatherDF <- read.df(sqlContext, weatherPath, source = "com.databricks.spark.csv",
header = "true", inferSchema = "true")
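Because read.df() is lazy, nothing is actually read at this point. If you want to surface parsing or path problems early, any SparkR action forces execution; this check is optional and not part of the original walkthrough:

    # Optional: force evaluation with SparkR actions. count() and head() both
    # trigger execution of the queued reads (counting ~150M airline rows takes time).
    logmsg(paste('airline rows read =', count(airDF)))
    head(weatherDF, 3)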
Next we do some cleanup of the airline data we've imported: we rename columns, keep only the variables needed, and round scheduled departure times down to the nearest hour so that they can be merged with the most recent weather reading at departure:
logmsg('clean the airline data')
airDF <- rename(airDF,
ArrDel15 = airDF$ARR_DEL15,
Year = airDF$YEAR,
Month = airDF$MONTH,
DayofMonth = airDF$DAY_OF_MONTH,
DayOfWeek = airDF$DAY_OF_WEEK,
Carrier = airDF$UNIQUE_CARRIER,
OriginAirportID = airDF$ORIGIN_AIRPORT_ID,
DestAirportID = airDF$DEST_AIRPORT_ID,
CRSDepTime = airDF$CRS_DEP_TIME,
CRSArrTime = airDF$CRS_ARR_TIME
)
# Select desired columns from the flight data.
varsToKeep <- c("ArrDel15", "Year", "Month", "DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "CRSDepTime", "CRSArrTime")
airDF <- select(airDF, varsToKeep)
# Apply schema
coltypes(airDF) <- c("character", "integer", "integer", "integer", "integer", "character", "integer", "integer", "integer", "integer")
# Round down scheduled departure time to full hour.
airDF$CRSDepTime <- floor(airDF$CRSDepTime / 100)
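At this point airDF should contain exactly the ten columns listed in varsToKeep, with the coerced types. printSchema() is an inexpensive, optional way to confirm that before the joins:

    # Optional: confirm the cleaned airline schema before joining with the weather data.
    printSchema(airDF)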
Now we perform similar operations on the weather data:
# Average weather readings by hour
logmsg('clean the weather data')
weatherDF <- agg(groupBy(weatherDF, "AdjustedYear", "AdjustedMonth", "AdjustedDay", "AdjustedHour", "AirportID"), Visibility="avg",
DryBulbCelsius="avg", DewPointCelsius="avg", RelativeHumidity="avg", WindSpeed="avg", Altimeter="avg"
)
weatherDF <- rename(weatherDF,
Visibility = weatherDF$'avg(Visibility)',
DryBulbCelsius = weatherDF$'avg(DryBulbCelsius)',
DewPointCelsius = weatherDF$'avg(DewPointCelsius)',
RelativeHumidity = weatherDF$'avg(RelativeHumidity)',
WindSpeed = weatherDF$'avg(WindSpeed)',
Altimeter = weatherDF$'avg(Altimeter)'
)
We now use the SparkR join() function to do a left outer join of the airline and weather data by departure AirportID and datetime. The left outer join retains all the airline records even when there is no matching weather data. Following the join, we remove some redundant columns and rename the kept weather columns so that they are identified with the origin airport.
logmsg('Join airline data with weather at Origin Airport')
joinedDF <- SparkR::join(
airDF,
weatherDF,
airDF$OriginAirportID == weatherDF$AirportID &
airDF$Year == weatherDF$AdjustedYear &
airDF$Month == weatherDF$AdjustedMonth &
airDF$DayofMonth == weatherDF$AdjustedDay &
airDF$CRSDepTime == weatherDF$AdjustedHour,
joinType = "left_outer"
)
# Remove redundant columns
vars <- names(joinedDF)
varsToDrop <- c('AdjustedYear', 'AdjustedMonth', 'AdjustedDay', 'AdjustedHour', 'AirportID')
varsToKeep <- vars[!(vars %in% varsToDrop)]
joinedDF1 <- select(joinedDF, varsToKeep)
joinedDF2 <- rename(joinedDF1,
VisibilityOrigin = joinedDF1$Visibility,
DryBulbCelsiusOrigin = joinedDF1$DryBulbCelsius,
DewPointCelsiusOrigin = joinedDF1$DewPointCelsius,
RelativeHumidityOrigin = joinedDF1$RelativeHumidity,
WindSpeedOrigin = joinedDF1$WindSpeed,
AltimeterOrigin = joinedDF1$Altimeter
)
In a similar fashion, we join the weather and airline data based on arrival AirportID and datetime:
logmsg('Join airline data with weather at Destination Airport')
joinedDF3 <- SparkR::join(
joinedDF2,
weatherDF,
airDF$DestAirportID == weatherDF$AirportID &
airDF$Year == weatherDF$AdjustedYear &
airDF$Month == weatherDF$AdjustedMonth &
airDF$DayofMonth == weatherDF$AdjustedDay &
airDF$CRSDepTime == weatherDF$AdjustedHour,
joinType = "left_outer"
)
# Remove redundant columns
vars <- names(joinedDF3)
varsToDrop <- c('AdjustedYear', 'AdjustedMonth', 'AdjustedDay', 'AdjustedHour', 'AirportID')
varsToKeep <- vars[!(vars %in% varsToDrop)]
joinedDF4 <- select(joinedDF3, varsToKeep)
joinedDF5 <- rename(joinedDF4,
VisibilityDest = joinedDF4$Visibility,
DryBulbCelsiusDest = joinedDF4$DryBulbCelsius,
DewPointCelsiusDest = joinedDF4$DewPointCelsius,
RelativeHumidityDest = joinedDF4$RelativeHumidity,
WindSpeedDest = joinedDF4$WindSpeed,
AltimeterDest = joinedDF4$Altimeter
)
That completes the joins we need to do with SparkR. We save the data from the final Spark DataFrame “joinedDF5” to a CSV for input to ScaleR and then close out the SparkR session. We explicitly tell SparkR to save the resultant CSV in 80 separate partitions to enable sufficient parallelism in ScaleR processing:
logmsg('output the joined data from Spark to CSV')
joinedDF5 <- repartition(joinedDF5, 80) # write.df below will produce this many CSVs
# write result to directory of CSVs
write.df(joinedDF5, file.path(dataDir, "joined5Csv"), "com.databricks.spark.csv", "overwrite", header = "true")
# We can shut down the SparkR Spark context now
sparkR.stop()
# remove non-data files
rxHadoopRemove(file.path(dataDir, "joined5Csv/_SUCCESS"))
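Before switching to ScaleR, you can optionally confirm that the expected part files landed in the output directory:

    # Optional: list the CSV part files produced by write.df(); with
    # repartition(joinedDF5, 80) there should be about 80 of them.
    rxHadoopListFiles(file.path(dataDir, "joined5Csv"))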
We could use the CSV file of joined airline and weather data as-is for modeling via a ScaleR text data source. But we import it to XDF first, since it is more efficient when running multiple operations on the dataset:
logmsg('Import the CSV to compressed, binary XDF format')
# set the Spark compute context for R Server
rxSetComputeContext(sparkCC)
rxGetComputeContext()
colInfo <- list(
ArrDel15 = list(type="numeric"),
Year = list(type="factor"),
Month = list(type="factor"),
DayofMonth = list(type="factor"),
DayOfWeek = list(type="factor"),
Carrier = list(type="factor"),
OriginAirportID = list(type="factor"),
DestAirportID = list(type="factor"),
RelativeHumidityOrigin = list(type="numeric"),
AltimeterOrigin = list(type="numeric"),
DryBulbCelsiusOrigin = list(type="numeric"),
WindSpeedOrigin = list(type="numeric"),
VisibilityOrigin = list(type="numeric"),
DewPointCelsiusOrigin = list(type="numeric"),
RelativeHumidityDest = list(type="numeric"),
AltimeterDest = list(type="numeric"),
DryBulbCelsiusDest = list(type="numeric"),
WindSpeedDest = list(type="numeric"),
VisibilityDest = list(type="numeric"),
DewPointCelsiusDest = list(type="numeric")
)
joinedDF5Txt <- RxTextData(file.path(dataDir, "joined5Csv"),
colInfo = colInfo, fileSystem = hdfsFS)
rxGetInfo(joinedDF5Txt)
destData <- RxXdfData(file.path(dataDir, "joined5XDF"), fileSystem = hdfsFS)
rxImport(inData = joinedDF5Txt, destData, overwrite = TRUE)
rxGetInfo(destData, getVarInfo = T)
# File name: /user/RevoShare/dev/delayDataLarge/joined5XDF
# Number of composite data files: 80
# Number of observations: 148619655
# Number of variables: 22
# Number of blocks: 320
# Compression type: zlib
# Variable information:
# Var 1: ArrDel15, Type: numeric, Low/High: (0.0000, 1.0000)
# Var 2: Year
# 26 factor levels: 1987 1988 1989 1990 1991 ... 2008 2009 2010 2011 2012
# Var 3: Month
# 12 factor levels: 10 11 12 1 2 ... 5 6 7 8 9
# Var 4: DayofMonth
# 31 factor levels: 1 3 4 5 7 ... 29 30 2 18 31
# Var 5: DayOfWeek
# 7 factor levels: 4 6 7 1 3 2 5
# Var 6: Carrier
# 30 factor levels: PI UA US AA DL ... HA F9 YV 9E VX
# Var 7: OriginAirportID
# 374 factor levels: 15249 12264 11042 15412 13930 ... 13341 10559 14314 11711 10558
# Var 8: DestAirportID
# 378 factor levels: 13303 14492 10721 11057 13198 ... 14802 11711 11931 12899 10559
# Var 9: CRSDepTime, Type: integer, Low/High: (0, 24)
# Var 10: CRSArrTime, Type: integer, Low/High: (0, 2400)
# Var 11: RelativeHumidityOrigin, Type: numeric, Low/High: (0.0000, 100.0000)
# Var 12: AltimeterOrigin, Type: numeric, Low/High: (28.1700, 31.1600)
# Var 13: DryBulbCelsiusOrigin, Type: numeric, Low/High: (-46.1000, 47.8000)
# Var 14: WindSpeedOrigin, Type: numeric, Low/High: (0.0000, 81.0000)
# Var 15: VisibilityOrigin, Type: numeric, Low/High: (0.0000, 90.0000)
# Var 16: DewPointCelsiusOrigin, Type: numeric, Low/High: (-41.7000, 29.0000)
# Var 17: RelativeHumidityDest, Type: numeric, Low/High: (0.0000, 100.0000)
# Var 18: AltimeterDest, Type: numeric, Low/High: (28.1700, 31.1600)
# Var 19: DryBulbCelsiusDest, Type: numeric, Low/High: (-46.1000, 53.9000)
# Var 20: WindSpeedDest, Type: numeric, Low/High: (0.0000, 136.0000)
# Var 21: VisibilityDest, Type: numeric, Low/High: (0.0000, 88.0000)
# Var 22: DewPointCelsiusDest, Type: numeric, Low/High: (-43.0000, 29.0000)
finalData <- RxXdfData(file.path(dataDir, "joined5XDF"), fileSystem = hdfsFS)
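With the data in XDF, ScaleR operations now run distributed in the Spark compute context. As an optional illustration (not part of the original walkthrough), rxSummary() reports the overall delay rate before we split and model:

    # Optional: summarize the label across the full dataset. Because ArrDel15 is
    # coded as numeric 0/1, its mean is the fraction of flights delayed 15+ minutes.
    rxSummary(~ ArrDel15, data = finalData)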
We use rxDataStep to split out the 2012 data for testing and keep the rest for training:
# split out the training data
logmsg('split out training data as all data except year 2012')
trainDS <- RxXdfData( file.path(dataDir, "finalDataTrain" ),fileSystem = hdfsFS)
rxDataStep( inData = finalData, outFile = trainDS,
rowSelection = ( Year != 2012 ), overwrite = T )
# split out the testing data
logmsg('split out the test data for year 2012')
testDS <- RxXdfData( file.path(dataDir, "finalDataTest" ), fileSystem = hdfsFS)
rxDataStep( inData = finalData, outFile = testDS,
rowSelection = ( Year == 2012 ), overwrite = T )
rxGetInfo(trainDS)
rxGetInfo(testDS)
Now we are ready to build a model. To see the influence of weather data on delay in the arrival time, we use ScaleR’s logistic regression routine. We use it to model whether an arrival delay of greater than 15 minutes is influenced by the weather at the departure and arrival airports:
logmsg('train a logistic regression model for Arrival Delay > 15 minutes')
formula <- as.formula(ArrDel15 ~ Year + Month + DayofMonth + DayOfWeek + Carrier +
OriginAirportID + DestAirportID + CRSDepTime + CRSArrTime +
RelativeHumidityOrigin + AltimeterOrigin + DryBulbCelsiusOrigin +
WindSpeedOrigin + VisibilityOrigin + DewPointCelsiusOrigin +
RelativeHumidityDest + AltimeterDest + DryBulbCelsiusDest +
WindSpeedDest + VisibilityDest + DewPointCelsiusDest
)
# Use the scalable rxLogit() function but set max iterations to 3 for the purposes of
# this exercise
logitModel <- rxLogit(formula, data = trainDS, maxIterations = 3)
base::summary(logitModel)
Now let’s see how it does on the test data by making some predictions and looking at ROC and AUC.
# Predict over test data (Logistic Regression).
logmsg('predict over the test data')
logitPredict <- RxXdfData(file.path(dataDir, "logitPredict"), fileSystem = hdfsFS)
# Use the scalable rxPredict() function
rxPredict(logitModel, data = testDS, outData = logitPredict,
extraVarsToWrite = c("ArrDel15"),
type = 'response', overwrite = TRUE)
# Calculate ROC and Area Under the Curve (AUC).
logmsg('calculate the roc and auc')
logitRoc <- rxRoc("ArrDel15", "ArrDel15_Pred", logitPredict)
logitAuc <- rxAuc(logitRoc)
head(logitAuc)
logitAuc
plot(logitRoc)
We can also use the model for scoring data on another platform, by saving it to an RDS file and then transferring and importing that RDS file into a destination scoring environment such as SQL Server R Services. It is important to ensure that the factor levels of the data to be scored match those on which the model was built. That match can be achieved by extracting and saving the column information associated with the modeling data via ScaleR's rxCreateColInfo() function and then applying that column information to the input data source for prediction. In the following, we save the model and a few rows of the test dataset; the column information can then be extracted from this sample in the prediction script (a sketch follows the code below):
# save the model and a sample of the test dataset
logmsg('save serialized version of the model and a sample of the test data')
rxSetComputeContext('localpar')
saveRDS(logitModel, file = "logitModel.rds")
testDF <- head(testDS, 1000)
saveRDS(testDF , file = "testDF.rds" )
list.files()
rxHadoopListFiles(file.path(inputDataDir,''))
rxHadoopListFiles(dataDir)
# stop the spark engine
rxStopEngine(sparkCC)
logmsg('Done.')
elapsed <- (proc.time() - t0)[3]
logmsg(paste('Elapsed time=',sprintf('%6.2f',elapsed),'(sec)\n\n'))
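For reference, here is a minimal sketch of what the scoring-side script might look like. The input file name flightsToScore.csv is hypothetical; rxCreateColInfo() carries the training factor levels over to the new data source so the levels match those used to build the model.

    # Hypothetical scoring script for the destination environment. The file
    # flightsToScore.csv is illustrative; rxCreateColInfo() extracts the factor
    # levels from the saved sample so that new data is read with the same levels
    # the model was trained on.
    logitModel  <- readRDS("logitModel.rds")
    testDF      <- readRDS("testDF.rds")
    predColInfo <- rxCreateColInfo(testDF)

    newData <- RxTextData("flightsToScore.csv", colInfo = predColInfo)
    pred <- rxPredict(logitModel, data = rxImport(newData), type = "response")
    head(pred)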
In this article, we’ve shown how it’s possible to combine use of SparkR for data manipulation with ScaleR for model development in Hadoop Spark. This scenario requires that you maintain separate Spark sessions, only running one session at a time, and exchange data via CSV files. Although straightforward, this process should be even easier in an upcoming R Server release, when SparkR and ScaleR can share a Spark session and so share Spark DataFrames.
- For more information on use of R Server on Spark, see the Getting started guide on MSDN.
- For general information on R Server, see the Get started with R article.
- For information on R Server on HDInsight, see R Server on Azure HDInsight overview and R Server on Azure HDInsight.

For more information on use of SparkR, see:

- SparkR Overview from Databricks