```{r include=FALSE}
knitr::opts_chunk$set(eval = FALSE)
source("r/render.R")
source("r/plots.R")
library(ggplot2)
```
# Data {#data}
> Has it occurred to you that she might not have been a reliable source of information?
>
> --- Jon Snow
With the knowledge acquired in previous chapters, you are now equipped to start doing analysis and modeling at scale! So far, however, we haven’t really explained much about how to read data into Spark. We’ve used `copy_to()` to upload small datasets, and functions like `spark_read_csv()` or `spark_write_csv()`, without explaining in detail how and why.
So, you are about to learn how to read and write data using Spark. And, while this is important on its own, this chapter will also introduce you to the _data lake_—a repository of data stored in its natural or raw format that provides various benefits over existing storage architectures. For instance, you can easily integrate data from external systems without transforming it into a common format and without assuming those sources are as reliable as your internal data sources.
In addition, we will also discuss how to extend Spark’s capabilities to work with data not accessible out of the box and make several recommendations focused on improving performance for reading and writing data. Reading large datasets often requires you to fine-tune your Spark cluster configuration, but that's the topic of [Chapter 9](#tuning).
## Overview
In [Chapter 1](#intro), you<!--((("data handling", "overview of")))--> learned that beyond big data and big compute, you can also use Spark to improve velocity, variety, and veracity in data tasks. While you can use the learnings of this chapter for any task requiring loading and storing data, it is particularly interesting to present this chapter in the context of dealing with a variety of data sources. To understand why, we should first take a quick detour to examine how data is currently processed in many organizations.
For several years, it’s been a common practice to store large datasets in a relational _database_, a system first proposed in 1970 by Edgar F. Codd.^[Codd EF (1970). “A relational model of data for large shared data banks.”] You can think of a database as a collection of tables that are related to one another, where each table is carefully designed to hold specific data types and relationships to other tables. Most relational database systems use _Structured Query Language_ (SQL) for querying and maintaining the database. Databases are still widely used today, with good reason: they store data reliably and consistently; in fact, your bank probably stores account balances in a database and that’s a good practice.
However, databases have also been used to store information from other applications and systems. For instance, your bank might also store data produced by other banks, such as incoming checks. To accomplish this, the external data needs to be extracted from the external system, transformed into something that fits the current database, and finally be loaded into it. This is known as _Extract, Transform, and Load_ (ETL), a general procedure for copying data from one or more sources into a destination system that represents the data differently from the source. The ETL process became popular in the 1970s.
Aside from databases, data is often also loaded into a _data warehouse_, a system used for reporting and data analysis. The data is usually stored and indexed in a format that increases data analysis speed but that is often not suitable for modeling or running custom distributed code. The challenge is that changing databases and data warehouses is usually a long and delicate process, since data needs to be reindexed and the data from multiple data sources needs to be carefully transformed into single tables that are shared across data sources.
Instead of trying to transform all data sources into a common format, you can embrace this variety of data sources in a _data lake_, a system or repository of data stored in its natural format (see Figure \@ref(fig:data-data-lake)). Since data lakes make data available in its original format, there is no need to carefully transform it in advance; anyone can use it for analysis, which adds significant flexibility over ETL. You then can use Spark to unify data processing from data lakes, databases, and data warehouses through a single interface that is scalable across all of them. Some organizations also use Spark to replace their existing ETL process; however, this falls in the realm of data engineering, which is well beyond the scope of this book. We illustrate this with dotted lines in Figure \@ref(fig:data-data-lake).
```{r data-data-lake, echo=FALSE, eval=TRUE, fig.cap='Spark processing raw data from data lakes, databases, and data warehouses', out.height = '280pt', out.width = 'auto', fig.align = 'center'}
render_nomnoml("
#spacing: 20
#padding: 10
#lineWidth:1
[Data Lake|
[hdfs://data.csv]
[s3a://data.json]
[dbfs://data.parquet]
[wasbs://data.xml]
]
[Data Lake]->[Spark|
[<note>spark_read_csv()
spark_read_json()
spark_read_parquet()
...
spark_read_jdbc()
spark_write_jdbc()
]
]
[Spark]->[Data Lake]
[Data Lake]--->[Storage Systems|
[Databases]
[Data Warehouses]]
[Storage Systems]--->[Spark]
[Spark]--->[Storage Systems]
", "images/data-data-lake.png")
```
In order to support a broad variety of data sources, Spark needs to be able to read and write data in several different *file formats* (CSV, JSON, Parquet, etc.), access them while stored in several *file systems* (HDFS, S3, DBFS, etc.), and, potentially, interoperate with other *storage systems* (databases, data warehouses, etc.). We will get to all of that; but first, we will start by presenting how to read, write, and copy data using Spark.
## Reading Data
### Paths
If<!--((("data handling", "reading data", id="DHread08")))--> you are new to Spark, it is highly recommended that you review this section before you start working with large datasets. We will introduce several techniques that improve the speed and efficiency of reading data. Each subsection presents a specific way to take advantage of how Spark reads files, such as treating entire folders as datasets, or describing a dataset’s structure in advance so that Spark can read it faster. As a baseline, in plain R you would need to enumerate and combine the files in a folder yourself:
```{r data-paths}
letters <- data.frame(x = letters, y = 1:length(letters))
dir.create("data-csv")
write.csv(letters[1:3, ], "data-csv/letters1.csv", row.names = FALSE)
write.csv(letters[1:3, ], "data-csv/letters2.csv", row.names = FALSE)
do.call("rbind", lapply(dir("data-csv", full.names = TRUE), read.csv))
```
```
x y
1 a 1
2 b 2
3 c 3
4 a 1
5 b 2
6 c 3
```
In Spark, there<!--((("folder as a table idea")))--> is the notion of a folder as a dataset. Instead of enumerating each file, simply pass the path containing all the files. Spark assumes that every file in that folder is part of the same dataset. This implies that the target folder should be used only for data purposes. This is especially important since storage systems like HDFS store files across multiple machines, but, conceptually, they are stored in the same folder; when Spark reads the files from this folder, it's actually executing distributed code to read each file within each machine—no data is transferred between machines when distributed files are read:
```{r data-paths-connect}
library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3")
spark_read_csv(sc, "data-csv/")
```
```
# Source: spark<datacsv> [?? x 2]
x y
<chr> <int>
1 a 1
2 b 2
3 c 3
4 a 1
5 b 2
6 c 3
```
The "folder as a table" idea is found in other open source technologies as well. Under the hood, Hive tables work the same way. When you query a Hive table, the mapping is done over multiple files within the same folder. The folder’s name usually matches the name of the table visible to the user.
Next, we will present a technique that allows Spark to read files faster as well as to reduce read failures by describing the structure of a dataset in advance.
### Schema
When<!--((("reading data", "schema")))((("schemas")))--> reading data, Spark is able to determine the data source’s column names and column types, also known as the _schema_. However, guessing the schema comes at a cost; Spark needs to do an initial pass on the data to guess what it is. For a large dataset, this can add a significant amount of time to the data ingestion process, which can become costly even for medium-size datasets. For files that are read over and over again, the additional read time accumulates over time.
To avoid this, Spark allows you to describe your dataset’s schema in advance through the `columns` argument. You can create this specification by sampling a small portion of the original file yourself:
```{r data-schema}
spec_with_r <- sapply(read.csv("data-csv/letters1.csv", nrows = 10), class)
spec_with_r
```
```
x y
"factor" "integer"
```
Or, you can set the column specification to a vector containing the column types explicitly. The vector’s values are named to match the field names:
```{r data-schema-explicit}
spec_explicit <- c(x = "character", y = "numeric")
spec_explicit
```
```
x y
"character" "numeric"
```
The accepted variable types are: `integer`, `character`, `logical`, `double`, `numeric`, `factor`, `Date`, and `POSIXct`.
Then, when<!--((("commands", "spark_read_csv()")))--> reading using `spark_read_csv()`, you can pass `spec_with_r` to the `columns` argument to match the names and types of the original file. This helps to improve performance since Spark will not need to determine the column types.
```{r data-schema-read-csv}
spark_read_csv(sc, "data-csv/", columns = spec_with_r)
```
```
# Source: spark<datacsv> [?? x 2]
x y
<chr> <int>
1 a 1
2 b 2
3 c 3
4 a 1
5 b 2
6 c 3
```
The following example shows how to set the field type to something different. However, the new field type needs to be compatible with the type in the original dataset; for example, you cannot set a `character` field to `numeric`. If you use an incompatible type, the file read will fail with an error. Additionally, the example also changes the names of the original fields:
```{r data-schema-compatible}
spec_compatible <- c(my_letter = "character", my_number = "character")
spark_read_csv(sc, "data-csv/", columns = spec_compatible)
```
```
# Source: spark<datacsv> [?? x 2]
my_letter my_number
<chr> <chr>
1 a 1
2 b 2
3 c 3
4 a 1
5 b 2
6 c 3
```
In Spark, <!--((("malformed entries")))-->malformed entries can cause errors during reading, particularly for non-character fields. To prevent such errors, we can use a file specification that imports them as characters and then use `dplyr` to coerce the field into the desired type.
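As a minimal sketch, assuming the `data-csv/` folder created above, we could read both fields as characters and then coerce `y` back to an integer with `dplyr`; entries that cannot be converted end up as `NA` instead of failing the read:
```{r data-schema-coerce}
# Read every field as character so malformed rows load cleanly,
# then cast y back to an integer inside Spark with dplyr
spec_as_character <- c(x = "character", y = "character")

spark_read_csv(sc, "data-csv/", columns = spec_as_character) %>%
  dplyr::mutate(y = as.integer(y))
```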
This subsection reviewed how we can read files faster and with fewer failures, which lets us start our analysis more quickly. Another way to accelerate our analysis is by loading less data into Spark memory, which we examine in the next section.
### Memory
By<!--((("reading data", "memory")))((("memory")))--> default, when you read data into Spark from R, it is copied into Spark's distributed memory, making data analysis and other operations very fast. There are cases, such as when the data is too big, in which loading all of it is not practical or even necessary. For those cases, Spark can just "map" the files without copying the data into memory.
The mapping creates a sort of virtual table in Spark. The implication is that when a query runs against that table, Spark needs to read the data from the files at that time. Any subsequent reads do the same. In effect, Spark becomes a pass-through for the data. The advantage of this method is that there is almost no up-front time cost to "reading" the file; the mapping is very fast. The downside is that running queries that actually extract data will take longer.
This is controlled by the `memory` argument of the read functions. Setting it to `FALSE` prevents the data copy (the default is `TRUE`):
```{r data-memory}
mapped_csv <- spark_read_csv(sc, "data-csv/", memory = FALSE)
```
There are good use cases for this method, one of which is when not all columns of a table are needed. For example, take a very large file that contains many columns. Assuming this is not the first time you interact with this data, you would know what columns are needed for the analysis. When you know which columns you need, the files can be read using `memory = FALSE`, and then the needed columns can be selected with `dplyr`. The resulting `dplyr` variable can then be cached into memory, using the `compute()` function. This will make Spark query the file(s), pull the selected fields, and copy only that data into memory. The result is an in-memory table that took comparatively less time to ingest:
```{r data-memory-compute}
mapped_csv %>%
  dplyr::select(y) %>%
  dplyr::compute("test")
```
The next section covers a short technique to make it easier to carry the original field names of imported data.
### Columns
Spark 1.6 required<!--((("reading data", "columns")))((("columns")))--> that column names be sanitized, so `sparklyr` sanitizes them by default. There might be cases in which you would like to keep the original names intact, for instance when working with Spark 2.0 or later. To do that, set the `sparklyr.sanitize.column.names` option to `FALSE`:
```{r data-columns}
options(sparklyr.sanitize.column.names = FALSE)
copy_to(sc, iris, overwrite = TRUE)
```
```
# Source: table<iris> [?? x 5]
# Database: spark_connection
Sepal.Length Sepal.Width Petal.Length Petal.Width Species
<dbl> <dbl> <dbl> <dbl> <chr>
1 5.1 3.5 1.4 0.2 setosa
2 4.9 3 1.4 0.2 setosa
3 4.7 3.2 1.3 0.2 setosa
4 4.6 3.1 1.5 0.2 setosa
5 5 3.6 1.4 0.2 setosa
6 5.4 3.9 1.7 0.4 setosa
7 4.6 3.4 1.4 0.3 setosa
8 5 3.4 1.5 0.2 setosa
9 4.4 2.9 1.4 0.2 setosa
10 4.9 3.1 1.5 0.1 setosa
# ... with more rows
```
With this review of how to read data into Spark, we move on to look at how we can write data from our Spark<!--((("", startref="DHread08")))--> session.
## Writing Data
Some<!--((("writing data")))((("data handling", "writing data")))--> projects require that new data generated in Spark be written back to a remote source. For example, the data could be new predicted values returned by a Spark model. The job processes the mass generation of predictions, but then the predictions need to be stored. This section focuses on how you should use Spark for moving the data from Spark into an external destination.
Many new users start by downloading Spark data into R, and then upload it to a target, as illustrated in Figure \@ref(fig:data-avoid-approach). It works for smaller datasets, but it becomes inefficient for larger ones. The data typically grows in size to the point that it is no longer feasible for R to be the middle point.
```{r data-avoid-approach, echo=FALSE, eval=TRUE, fig.cap='Incorrect use of Spark when writing large datasets', out.height = '100pt', out.width = 'auto', fig.align = 'center'}
render_nomnoml("
#direction: right
#arrowSize: 0.4
#lineWidth: 1
#spacing:90
[Source] -> [Spark | Read - Process]
[Spark]collect() -> [R]
[R | Write] -> [Target]
", "images/data-r-pass-through.png")
```
All efforts should be made to have Spark connect to the target location. This way, reading, processing, and writing happens within the same Spark session.
As Figure \@ref(fig:data-recommended-approach) shows, a better approach is to use Spark to read, process, and write to the target. This approach is able to scale as big as the Spark cluster allows, and prevents R from becoming a choke point.
```{r data-recommended-approach, echo=FALSE, eval=TRUE, fig.cap='Correct use of Spark when writing large datasets', out.height = '100pt', out.width = 'auto', fig.align = 'center'}
render_nomnoml("
#direction: right
#arrowSize: 0.4
#lineWidth: 1
#spacing:90
[Source] -> [Spark | Read - Process - Write]
[Spark] -> [Target]
", "images/data-spark-pass-through.png")
```
Consider the following scenario: a Spark job just processed predictions for a large dataset, resulting in a considerable number of predictions. Choosing a method for writing the results depends on the technology infrastructure you are working with; more specifically, it depends on whether Spark and the target are running in the same cluster.
Back to our scenario, we have a large dataset in Spark that needs to be saved. When Spark and the target are in the same cluster, copying the results is not a problem; the data transfer is between RAM and disk of the same cluster or efficiently shuffled through a high-bandwidth connection.
But what to do if the target is not within the Spark cluster? There are two options, and choosing one will depend on the size of the data and network speed:
Spark transfer
: In this case, Spark connects to the remote target location and copies the new data. If this is done within the same datacenter, or cloud provider, the data transfer could be fast enough to have Spark write the data directly.
External transfer
: Spark writes the results to disk as files, and a separate job or third-party application then copies those files into the target location; a short sketch of both options follows this list.
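As a minimal sketch of both options, assuming the predictions live in a Spark table named `predictions` and a hypothetical `s3a://target-bucket/` destination reachable from the cluster:
```{r data-write-options, eval=FALSE}
# Spark transfer: write directly to the (hypothetical) remote target location
spark_write_parquet(dplyr::tbl(sc, "predictions"),
                    "s3a://target-bucket/predictions")

# External transfer: write files to a local or shared path, then hand them
# off to a separate job or third-party tool that copies them to the target
spark_write_csv(dplyr::tbl(sc, "predictions"), "file:///tmp/predictions-csv")
```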
It is best to recognize that Spark, R, and any other technology are tools. No tool can do everything, nor should it be expected to. Next, we will describe how to copy data into Spark and how to collect large datasets that don’t fit in memory; these techniques can be used to transfer data across clusters or to help initialize your distributed datasets.
## Copy
Previous<!--((("copying data")))((("data handling", "copying data")))--> chapters used `copy_to()` as a handy helper to copy data into Spark; however, `copy_to()` can only transfer datasets that are already loaded into R’s memory. These datasets tend to be much smaller than the kind of datasets you would want to copy into Spark.
For instance, suppose that we have a 3 GB dataset generated as follows:
```{r data-copy-create-travis, echo=FALSE}
dir.create("largefile.txt")
write.table(matrix(rnorm(10^5), ncol = 10), "largefile.txt/1",
            append = T, col.names = F, row.names = F)
for (i in 2:30)
  file.copy("largefile.txt/1", paste0("largefile.txt/", i))
```
```{r data-copy-create, eval=FALSE}
dir.create("largefile.txt")
write.table(matrix(rnorm(10 * 10^6), ncol = 10), "largefile.txt/1",
            append = T, col.names = F, row.names = F)
for (i in 2:30)
  file.copy("largefile.txt/1", paste0("largefile.txt/", i))
```
If we had only 2 GB of memory in the driver node, we would not be able to load this 3 GB file into memory using `copy_to()`. Instead, when your cluster uses HDFS as storage, you can use the `hadoop` command-line tool to copy the files from local disk into HDFS from the terminal, as follows. Notice that this command works only in clusters using HDFS, not in local environments.
```{bash eval=FALSE}
hadoop fs -copyFromLocal largefile.txt largefile.txt
```
You then can read the uploaded file, as described in the [File Formats](#data-file-formats) section; for text files, you would run:
```{r data-copy-read}
spark_read_text(sc, "largefile.txt", memory = FALSE)
```
```
# Source: spark<largefile> [?? x 1]
line
<chr>
1 0.0982531064914565 -0.577567317599452 -1.66433938237253 -0.20095089489…
2 -1.08322304504007 1.05962389624635 1.1852771207729 -0.230934710049462 …
3 -0.398079835552421 0.293643382374479 0.727994248743204 -1.571547990532…
4 0.418899768227183 0.534037617828835 0.921680317620166 -1.6623094393911…
5 -0.204409401553028 -0.0376212693728992 -1.13012269711811 0.56149527218…
6 1.41192628218417 -0.580413572014808 0.727722566256326 0.5746066486689 …
7 -0.313975036262443 -0.0166426329807508 -0.188906975208319 -0.986203251…
8 -0.571574679637623 0.513472254005066 0.139050812059352 -0.822738334753…
9 1.39983023148955 -1.08723592838627 1.02517804413913 -0.412680186313667…
10 0.6318328148434 -1.08741784644221 -0.550575696474202 0.971967251067794…
# … with more rows
```
`collect()` has a similar limitation in that it can collect only datasets that fit your driver memory; however, if you had to extract a large dataset from Spark through the driver node, you could use specialized tools provided by the distributed storage. For HDFS, you would run the following:
```{bash eval=FALSE}
hadoop fs -copyToLocal largefile.txt largefile.txt
```
Alternatively, you can also collect datasets that don’t fit in memory by providing a callback to `collect()`. A callback is just an R function that will be called over each Spark partition. You can then write each partition to disk or push it to other clusters over the network.
You could use the following code to collect 3 GB even if the driver node collecting this dataset had less than 3 GB of memory. That said, as [Chapter 3](#analysis) explains, you should avoid collecting large datasets into a single machine since this creates a significant performance bottleneck. For conciseness, we will collect only the first million rows; feel free to remove `head(10^6)` if you have a few minutes to spare:
```{r data-copy-collect}
dir.create("large")
spark_read_text(sc, "largefile.txt", memory = FALSE) %>%
head(10^6) %>%
collect(callback = function(df, idx) {
writeLines(df$line, paste0("large/large-", idx, ".txt"))
})
```
Make sure you clean up these large files and empty your recycle bin as well:
```{r data-copy-unlink}
unlink("largefile.txt", recursive = TRUE)
unlink("large", recursive = TRUE)
```
In most cases, data will already be stored in the cluster, so you should not need to worry about copying large datasets; instead, you can usually focus on reading and writing different file formats, which we describe next.
## File Formats {#data-file-formats}
Out<!--((("data handling", "file formats", id="DHfile08")))--> of the box, Spark is able to interact with several file formats, like CSV, JSON, LIBSVM, ORC, and Parquet. Table \@ref(tab:data-formats-table) maps the file format to the function you should use to read and write data in Spark.
```{r data-formats-table, eval=TRUE, echo=FALSE}
knitr::kable(
  data.frame(
    Format = c(
      "Comma separated values (CSV)", "JavaScript Object Notation (JSON)",
      "Library for Support Vector Machines (LIBSVM)", "Optimized Row Columnar (ORC)",
      "Apache Parquet", "Text"
    ),
    Read = c(
      "spark_read_csv()", "spark_read_json()", "spark_read_libsvm()", "spark_read_orc()",
      "spark_read_parquet()", "spark_read_text()"
    ),
    Write = c(
      "spark_write_csv()", "spark_write_json()", "spark_write_libsvm()", "spark_write_orc()",
      "spark_write_parquet()", "spark_write_text()"
    )
  ),
  booktabs = TRUE,
  caption = 'Spark functions to read and write file formats'
)
```
The following sections will describe special considerations particular to each file format as well as some of the strengths and weaknesses of some popular file formats, starting with the well-known CSV file format.
### CSV
The<!--((("file formats", "CSV")))((("CSV format")))--> CSV format might be the most common file type in use today. It is defined as a text file in which values are separated by a given character, usually a comma. Reading CSV files should be pretty straightforward; however, it’s worth mentioning a couple of techniques that can help you process CSV files that are not fully well formed. Spark offers the following modes for addressing parsing issues:
Permissive
: Inserts NULL values for missing tokens
Drop Malformed
: Drops lines that are malformed
Fail Fast
: Aborts if it encounters any malformed line
You can use these in `sparklyr` by passing them inside the `options` argument. The following example creates a file with a broken entry. It then shows how it can be read into Spark:
```{r data-csv-read-drop}
## Creates bad test file
writeLines(c("bad", 1, 2, 3, "broken"), "bad.csv")
spark_read_csv(
  sc,
  "bad3",
  "bad.csv",
  columns = list(foo = "integer"),
  options = list(mode = "DROPMALFORMED"))
```
```
# Source: spark<bad3> [?? x 1]
foo
<int>
1 1
2 2
3 3
```
Spark provides an issue-tracking column, which is hidden by default. To enable it, add `_corrupt_record` to the `columns` list. You can combine this with the `PERMISSIVE` mode: all rows will be imported, invalid entries will receive an `NA`, and the issue will be tracked in the `_corrupt_record` column:
```{r data-csv-read-bad}
spark_read_csv(
  sc,
  "bad2",
  "bad.csv",
  columns = list(foo = "integer", "_corrupt_record" = "character"),
  options = list(mode = "PERMISSIVE")
)
```
```
# Source: spark<bad2> [?? x 2]
foo `_corrupt_record`
<int> <chr>
1 1 NA
2 2 NA
3 3 NA
4 NA broken
```
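For completeness, here is a sketch of the third mode using the same `bad.csv` file and a hypothetical `bad_failfast` table name; with `FAILFAST`, the read aborts with an error as soon as the malformed `broken` entry is encountered:
```{r data-csv-read-failfast, eval=FALSE}
# FAILFAST aborts the read on the first malformed line instead of
# dropping it or importing it as NA
spark_read_csv(
  sc,
  "bad_failfast",
  "bad.csv",
  columns = list(foo = "integer"),
  options = list(mode = "FAILFAST"))
```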
Reading and storing data as CSVs is quite common and supported across most systems. For tabular datasets, it is still a popular option, but for datasets containing nested structures and nontabular data, JSON is usually preferred.
### JSON
JSON<!--((("file formats", "JSON")))((("JSON file format")))--> is a file format originally derived from JavaScript that has grown to be language-independent and very popular due to its flexibility and ubiquitous support. Reading and writing JSON files is quite straightforward:
```{r data-json-read}
writeLines("{'a':1, 'b': {'f1': 2, 'f3': 3}}", "data.json")
simple_json <- spark_read_json(sc, "data.json")
simple_json
```
```
# Source: spark<data> [?? x 2]
a b
<dbl> <list>
1 1 <list [2]>
```
However, when you deal with a dataset containing nested fields like the one from this example, it is worth pointing out how to extract nested fields. One approach is to use a JSON path, which is a domain-specific syntax commonly used to extract and query JSON files. You can use a combination of `get_json_object()` and `to_json()` to specify the JSON path you are interested in. To extract `f1` you would run the following transformation:
```{r data-json-path}
simple_json %>% dplyr::transmute(z = get_json_object(to_json(b), '$.f1'))
```
```
# Source: spark<?> [?? x 3]
a b z
<dbl> <list> <chr>
1 1 <list [2]> 2
```
Another approach is to install `sparklyr.nested` from CRAN with `install.packages("sparklyr.nested")` and then unnest nested data with `sdf_unnest()`:
```{r data-json-unnest}
sparklyr.nested::sdf_unnest(simple_json, "b")
```
```
# Source: spark<?> [?? x 3]
a f1 f3
<dbl> <dbl> <dbl>
1 1 2 3
```
While JSON and CSVs are quite simple to use and versatile, they are not optimized for performance; however, other formats like ORC, AVRO, and Parquet are.
### Parquet
Apache Parquet, Apache ORC,<!--((("file formats", "Parquet")))((("Parquet")))((("Apache Parquet")))--> and Apache AVRO are all file formats designed with performance in mind. Parquet and ORC store data in columnar format, while AVRO is row-based. All of them are binary file formats, which reduces storage space and improves performance. This comes at the cost of making them a bit more difficult to read by external systems and libraries; however, this is usually not an issue when used as intermediate data storage within Spark.
To illustrate this, Figure \@ref(fig:data-file-format-benchmark) plots the result of running a 1-million-row write-speed benchmark using the `bench` package; feel free to use your own benchmarks over meaningful datasets when deciding which format best fits your needs:
```{r echo=FALSE, eval=FALSE}
numeric <- copy_to(sc, data.frame(nums = runif(10^6)))
benchmark <- bench::mark(
  CSV = spark_write_csv(numeric, "data.csv", mode = "overwrite"),
  JSON = spark_write_json(numeric, "data.json", mode = "overwrite"),
  Parquet = spark_write_parquet(numeric, "data.parquet", mode = "overwrite"),
  ORC = spark_write_orc(numeric, "data.orc", mode = "overwrite"),
  iterations = 20
)
benchmark_plot <- autoplot(benchmark) +
  scale_color_grey(start = 0.6, end = 0.2) +
  theme(axis.line.x = element_line(colour = "#333333", size = 1, linetype = "solid"))
ggsave("images/data-file-format-benchmark.png", benchmark_plot, width = 10, height = 5)
```
```{r data-parquet-benchmark, eval=FALSE}
numeric <- copy_to(sc, data.frame(nums = runif(10^6)))
bench::mark(
  CSV = spark_write_csv(numeric, "data.csv", mode = "overwrite"),
  JSON = spark_write_json(numeric, "data.json", mode = "overwrite"),
  Parquet = spark_write_parquet(numeric, "data.parquet", mode = "overwrite"),
  ORC = spark_write_orc(numeric, "data.orc", mode = "overwrite"),
  iterations = 20
) %>% ggplot2::autoplot()
```
```{r data-file-format-benchmark, eval=TRUE, echo=FALSE, fig.cap="One-million-rows write benchmark between CSV, JSON, Parquet, and ORC", fig.align = 'center'}
render_image("images/data-file-format-benchmark.png")
```
From now on, be sure to disconnect from Spark whenever we present a new `spark_connect()` command:
```{r data-parquet-disconnect}
spark_disconnect(sc)
```
This concludes the introduction to some of the file formats supported out of the box; next, we present how to deal with formats that require external packages and customization.
### Others
Spark<!--((("file formats", "others")))--> is a very flexible computing platform. It can add functionality by using extension programs, called packages. You can access a new source type or file system by using the appropriate package.
Packages need to be loaded into Spark at connection time. To load a package, Spark needs its location, which could be inside the cluster, in a file share, or on the internet.
In `sparklyr`, the package location is passed to `spark_connect()`. All packages should be listed in the `sparklyr.connect.packages` entry of the connection configuration.
It is possible to access data source types that we didn’t previously list. Loading the appropriate default package for Spark is the first of two steps. The second step is to actually read or write the data. The `spark_read_source()` and `spark_write_source()` functions do that; they are generic functions that can use the libraries imported by a default package.
For instance, we can read XML files as follows:
```{r data-other-xml}
sc <- spark_connect(master = "local", version = "2.3", config = list(
  sparklyr.connect.packages = "com.databricks:spark-xml_2.11:0.5.0"))

writeLines("<ROWS><ROW><text>Hello World</text></ROW></ROWS>", "simple.xml")
spark_read_source(sc, "simple_xml", "simple.xml", "xml")
```
```
# Source: spark<data> [?? x 1]
text
<chr>
1 Hello World
```
which you can also write back to XML with ease, as follows:
```{r data-other-xml-write}
tbl(sc, "simple_xml") %>%
spark_write_source("xml", options = list(path = "data.xml"))
```
In addition, there are a few extensions developed by the R community to load additional file formats, such as `sparklyr.nested` to assist with nested data, `spark.sas7bdat` to read data from SAS, `sparkavro` to read data in AVRO format, and `sparkwarc` to read WARC files; these use the extensibility mechanisms introduced in [Chapter 10](#extensions). [Chapter 11](#distributed) presents techniques for using R packages to load additional file formats, and [Chapter 13](#contributing) presents techniques for using Java libraries to complement this further. But first, let’s explore how to retrieve and store files from several different file systems.<!--((("", startref="DHfile08")))-->
```{r data-xml-disconnect, echo=FALSE}
spark_disconnect(sc)
```
## File Systems {#data-file-systems}
Spark<!--((("file systems")))((("data handling", "file systems")))--> defaults to the file system on which it is currently running. In a YARN-managed cluster, the default file system will be HDFS; an example path of _/home/user/file.csv_ will be read from the cluster’s HDFS folders, not the Linux folders. For other deployments, such as Standalone and `sparklyr`’s local mode, the operating system’s file system is used instead.
The file system protocol can be changed when reading or writing through the `path` argument of the `sparklyr` read or write function. For example, a full path of _file:///home/user/file.csv_ forces the use of the local operating system’s file system.
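For instance, the following sketch, which assumes a cluster connection where HDFS is the default file system and a hypothetical CSV file on the local disk, forces a read from the operating system’s file system:
```{r data-file-systems-local, eval=FALSE}
# The file:// prefix bypasses the default file system (for example, HDFS)
# and reads from the local operating system instead
local_file <- spark_read_csv(sc, "local_file",
                             path = "file:///home/user/file.csv")
```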
There are many other file system protocols, such as _dbfs://_ for Databricks’ file system, _s3a://_ for Amazon’s S3 service, _wasb://_ for Microsoft Azure storage, and _gs://_ for Google storage.
Spark does not provide support for all of them directly; instead, they are configured as needed. For instance, accessing the "s3a" protocol requires adding a package to the `sparklyr.connect.packages` configuration setting, and connecting with the appropriate credentials might require the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables:
```{r eval=FALSE}
Sys.setenv(AWS_ACCESS_KEY_ID = my_key_id)
Sys.setenv(AWS_SECRET_ACCESS_KEY = my_secret_key)
sc <- spark_connect(master = "local", version = "2.3", config = list(
  sparklyr.connect.packages = "org.apache.hadoop:hadoop-aws:2.7.7"))
my_file <- spark_read_csv(sc, "my-file", path = "s3a://my-bucket/my-file.csv")
```
Accessing other file protocols requires loading different packages, although, in some cases, the vendor providing the Spark environment might load the package for you. Please refer to your vendor’s documentation to find out whether that is the case.
## Storage Systems {#data-storage-systems}
A<!--((("data handling", "storage systems", id="DHstor08")))((("storage systems", "overview of")))--> data lake and Spark usually go hand-in-hand, with optional access to storage systems like databases and data warehouses. Presenting all the different storage systems with appropriate examples would be quite time-consuming, so instead we present some of the commonly used storage systems.
As a start, Apache _Hive_ is a data warehouse software that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. In fact, Spark has components from Hive built directly into its sources. It is very common to have installations of Spark or Hive side-by-side, so we will start by presenting Hive, followed by Cassandra, and then close by looking at JDBC connections.
### Hive
In<!--((("storage systems", "Hive")))((("Hive project", "Spark integration with")))--> YARN managed clusters, Spark provides a deeper integration with Apache Hive. Hive tables are easily accessible after opening a Spark connection.
You can access a Hive table’s data using `DBI` by referencing the table in a SQL statement:
```{r data-hive-connect}
sc <- spark_connect(master = "local", version = "2.3")
spark_read_csv(sc, "test", "data-csv/", memory = FALSE)
DBI::dbGetQuery(sc, "SELECT * FROM test limit 10")
```
Another way to reference a table is with `dplyr` using the `tbl()` function, which retrieves a reference to the table:
```{r data-hive-tbl}
dplyr::tbl(sc, "test")
```
It is important to reiterate that no data is imported into R; the `tbl()` function only creates a reference. You then can pipe more `dplyr` verbs following the `tbl()` command:
```{r data-hive-group}
dplyr::tbl(sc, "test") %>%
dplyr::group_by(y) %>%
dplyr::summarise(totals = sum(y))
```
Hive table references assume a default database source. Often, the needed table is in a different database within the metastore. To access it using SQL, prefix the database name to the table. Separate them using a period, as demonstrated here:
```{r data-hive-dbi, eval=FALSE}
DBI::dbSendQuery(sc, "SELECT * FROM databasename.table")
```
In `dplyr`, you can use the `in_schema()` function inside the `tbl()` call:
```{r data-hive-tbl-name, eval=FALSE}
tbl(sc, dbplyr::in_schema("databasename", "table"))
```
You can also use the `tbl_change_db()` function to set the current session’s default database. Any subsequent call via `DBI` or `dplyr` will use the selected name as the default database:
```{r data-hive-change-db, eval=FALSE}
tbl_change_db(sc, "databasename")
```
The following examples require additional Spark packages and external databases, which might make them difficult to follow unless you happen to have a JDBC driver or a Cassandra database accessible to you.
```{r data-hive-disconnect}
spark_disconnect(sc)
```
Next, we explore a less structured storage system, often<!--((("NoSQL databases")))--> referred to as a _NoSQL database_.
### Cassandra
Apache _Cassandra_ is<!--((("storage systems", "Cassandra")))((("Cassandra")))--> a free and open source, distributed, wide-column store, NoSQL database management system designed to handle large amounts of data across many commodity servers. While there are many other database systems beyond Cassandra, taking a quick look at how Cassandra can be used from Spark will give you insight into how to make use of other database and storage systems like Solr, Redshift, Delta Lake, and others.
The following example code shows how to use the `datastax:spark-cassandra-connector` package to read from Cassandra. The key is to use the `org.apache.spark.sql.cassandra` library as the `source` argument. It provides the mapping Spark can use to make sense of the data source. Unless you have a Cassandra database, skip executing the following statement:
```{r eval=FALSE}
sc <- spark_connect(master = "local", version = "2.3", config = list(
  sparklyr.connect.packages = "datastax:spark-cassandra-connector:2.3.1-s_2.11"))

spark_read_source(
  sc,
  name = "emp",
  source = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "dev", table = "emp"),
  memory = FALSE)
```
One<!--((("pushdown predicates")))--> of the most useful features of Spark when dealing with external databases and data warehouses is that it can push down computation to the database, a feature known as _pushdown predicates_. In a nutshell, pushdown predicates improve performance by asking the remote database smart questions. When you execute a query containing a `filter(age > 20)` expression against a remote table referenced through `spark_read_source()` and not loaded into memory, rather than bringing the entire table into Spark, the filter is passed to the remote database, and only the matching subset of the table is retrieved.
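As an illustration, assuming the `emp` table mapped above contains an `age` column, the following `dplyr` filter is translated to SQL and pushed down to Cassandra, so only matching rows are transferred into Spark:
```{r eval=FALSE}
# The filter is executed by Cassandra, not by Spark
dplyr::tbl(sc, "emp") %>%
  dplyr::filter(age > 20)
```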
While it is ideal to find Spark packages that support the remote storage system, there will be times when a package is not available and you need to consider vendor JDBC drivers.
### JDBC
When<!--((("storage systems", "JDBC")))((("JDBC")))--> a Spark package is not available to provide connectivity, you can consider a JDBC connection. JDBC is an interface for the programming language Java, which defines how a client can access a database.
It is quite easy to connect to a remote database with `spark_read_jdbc()` and `spark_write_jdbc()`, as long as you have access to the appropriate JDBC driver, which at times is trivial and at other times quite an adventure. To keep this simple, we can briefly consider how a connection to a remote MySQL database could be accomplished.
First, you would need to download the appropriate JDBC driver from MySQL’s developer portal and specify this additional driver as a `sparklyr.shell.driver-class-path` connection option. Since JDBC drivers are Java-based, the code is contained within a _JAR_ (Java ARchive) file. As soon as you're connected to Spark with the appropriate driver, you can use the _jdbc://_ protocol to access particular drivers and databases. Unless you are willing to download and configure MySQL on your own, skip executing the following statement:
```{r eval=FALSE}
sc <- spark_connect(master = "local", version = "2.3", config = list(
  "sparklyr.shell.driver-class-path" =
    "~/Downloads/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar"
))

spark_read_jdbc(sc, "person_jdbc", options = list(
  url = "jdbc:mysql://localhost:3306/sparklyr",
  user = "root", password = "<password>",
  dbtable = "person"))
```
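As a companion sketch under the same assumptions, you could write a Spark table back to this (hypothetical) MySQL database with `spark_write_jdbc()` and a matching set of options; the `person_copy` destination table is made up for illustration:
```{r eval=FALSE}
# Write the person_jdbc table back to MySQL as a new (hypothetical) table
spark_write_jdbc(
  dplyr::tbl(sc, "person_jdbc"),
  name = "person_copy",
  options = list(
    url = "jdbc:mysql://localhost:3306/sparklyr",
    user = "root", password = "<password>",
    dbtable = "person_copy"))
```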
If you are a customer of particular database vendors, making use of the vendor-provided resources is usually the best place to start looking for appropriate drivers.<!--((("", startref="DHstor08")))-->
## Recap
This chapter expanded on how and why you should use Spark to connect and process a variety of data sources through a new data storage model known as data lakes—a storage pattern that provides more flexibility than standard ETL processes by enabling you to use raw datasets with, potentially, more information to enrich data analysis and modeling.
We also presented best practices for reading, writing, and copying data into and from Spark. We then returned to exploring the components of a data lake: file formats and file systems, with the former representing how data is stored, and the latter where the data is stored. You then learned how to tackle file formats and storage systems that require additional Spark packages, reviewed some of the performance trade-offs across file formats, and learned the concepts required to make use of storage systems (databases and warehouses) in Spark.
While reading and writing datasets should come naturally to you, you might still hit resource restrictions while reading and writing large datasets. To handle these situations, [Chapter 9](#tuning) shows you how Spark manages tasks and data across multiple machines, which in turn allows you to further improve the performance of your analysis and modeling tasks.