HDFS can partition large files into blocks to share the storage across many workers, and it can replicate those blocks so that data is not lost even if some workers die.
In this project, you'll deploy a small HDFS cluster and upload a large file to it, with different replication settings. You'll write Python code to read the file. When data is partially lost (due to a node failing), your code will recover as much data as possible from the damaged file.
Learning objectives:
- use the HDFS command line client to upload files
- use the webhdfs API (https://hadoop.apache.org/docs/r1.0.4/webhdfs.html) to read files
- measure the impact buffering has on read performance
- relate replication count to space efficiency and fault tolerance
Before starting, please review the general project directions.
For this project, you'll create three containers, each from the same base image (`p3-image`). Create a directory called `image` that contains a `Dockerfile` with the following:
FROM ubuntu:22.04
RUN apt-get update; apt-get install -y wget curl openjdk-11-jdk python3-pip net-tools lsof nano
RUN pip3 install jupyterlab==3.4.5 MarkupSafe==2.0.1 pandas
# HDFS
RUN wget https://pages.cs.wisc.edu/~harter/cs639/hadoop-3.2.4.tar.gz; tar -xf hadoop-3.2.4.tar.gz; rm hadoop-3.2.4.tar.gz
ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ENV PATH="${PATH}:/hadoop-3.2.4/bin"
CMD ["sh", "/start.sh"]
Build it with `docker build -t p3-image ./image`.
Now, create a `docker-compose.yml` that starts your three containers. You can use the following as a starting point:
services:
  main:
    image: p3-image
    ports:
    - "127.0.0.1:5000:5000"
    volumes:
    - "./nb:/notebooks"
    - "./main.sh:/start.sh"
  # worker:
  #   TODO: create 2 replicas running the worker

networks:
  default:
    name: cs544net
    driver: bridge
Even though all three containers use the same image, they will do different things because `/start.sh` is what runs at container startup (via the `CMD` in the Dockerfile), and you'll map in different scripts to run startup code (`main.sh` for the `main` service, and whatever you call it for the worker service).
The `main` service should do three things:
- format an HDFS file system
- start an HDFS Namenode
- start a Jupyter notebook

The `worker` service should just start a Datanode.
Note: in this project, you will run your JupyterLab server inside a container; you can use the web interface and SSH tunneling for development. We're using this approach because your code can then run on cs544net and easily communicate with the HDFS processes. Even if you normally use VS Code, it will be difficult to use it for this project (unless you can find a way to connect it to the container -- the 544 team does not know of one).
Here are some example commands that you can use for inspiration when writing your .sh files (you'll probably need to modify them):
hdfs namenode -format
hdfs namenode -D dfs.namenode.stale.datanode.interval=10000 -D dfs.namenode.heartbeat.recheck-interval=30000 -fs ????
python3 -m jupyterlab --no-browser --ip=0.0.0.0 --port=???? --allow-root --NotebookApp.token=''
hdfs datanode -D dfs.datanode.data.dir=/var/datanode -fs ????
Hints:
- HDFS formatting sometimes prompts you if you want to overwrite the previous file system. You can pass `-force` to make it do so without prompting (useful since this is scripted instead of done manually).
- Note how the namenode is configured with a couple of `-D` options. You should also have `dfs.webhdfs.enabled` be `true`.
- For `-fs`, you can pass something like `hdfs://SERVER:PORT`. Use port `9000` and `main` for the server name (matching the Docker service name).
- We want to access Jupyter from outside the container, so when setting the port number, review our port forwarding options from the compose file.
- The namenode and Jupyter both run in the foreground by default, so whichever one runs first will block the other from starting. You will need to send one of them to the background.
You can use `docker compose up` to start your mini cluster of three containers. Some docker commands that might be helpful for debugging:
- `docker compose ps -a` to see which containers are running or have exited
- `docker logs <CONTAINER NAME>` to see the output of a container
- `docker exec -it <CONTAINER NAME> bash` to get a shell inside the container
- `docker compose kill; docker compose rm -f` to get a fresh start
The last command above stops and deletes all the containers in your cluster. For simplicity, we recommend this rather than restarting a single container when you need to change something as it avoids some tricky issues with HDFS. For example, if you just restart+reformat the container with the NameNode, the old DataNodes will not work with the new NameNode without a more complicated process/config.
If all is well, you should be able to connect to Jupyter inside the main container and create a notebook called `p3.ipynb` where you'll do the rest of your work. You can run `!CMD` inside a cell to run `CMD` as a shell command. Use this approach to show both your shell and Python work for this project.
Note that each line under the `volumes` section in `docker-compose.yml` takes the form `<path on host>:<path in container>`. This tells Docker to map certain files/folders from the host machine directly into the container, so that when you change their contents from inside the container, the changes show up in the corresponding path on the host machine. This is how you ensure that `p3.ipynb` does not get lost even if you remove the container running Jupyter.
Use a shell command in your notebook to download https://pages.cs.wisc.edu/~harter/cs639/data/hdma-wi-2021.csv. Next, use two `hdfs dfs -cp` commands to upload this same file to HDFS twice, to the following locations:

hdfs://main:9000/single.csv
hdfs://main:9000/double.csv

In both cases, use a 1MB block size (`dfs.block.size`), and replication (`dfs.replication`) of 1 and 2 for `single.csv` and `double.csv`, respectively.
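If you'd rather drive the uploads from Python instead of `!` shell cells, here is a rough, non-authoritative sketch. It assumes hdma-wi-2021.csv is already in the notebook's working directory and that no `fs.defaultFS` is configured, so the bare source path resolves to the local file while the destination is a full `hdfs://` URI:

```python
import subprocess

# Sketch: upload the same local CSV twice with different replication settings.
# 1 MB block size = 1048576 bytes; the -D options go before the -cp subcommand.
for dest, repl in [("hdfs://main:9000/single.csv", 1),
                   ("hdfs://main:9000/double.csv", 2)]:
    subprocess.check_call([
        "hdfs", "dfs",
        "-D", "dfs.block.size=1048576",
        "-D", f"dfs.replication={repl}",
        "-cp", "hdma-wi-2021.csv", dest,
    ])
```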
Double check the sizes of the two files with the following commands:
hdfs dfs -du -h hdfs://main:9000/
You should see something like this:
166.8 M 333.7 M hdfs://main:9000/double.csv
166.8 M 166.8 M hdfs://main:9000/single.csv
The first two columns show the logical and physical sizes, respectively. The two CSVs contain the same data, so they have the same logical size. Note the difference in physical size due to replication, though.
If you correctly configured the block size, single.csv should have 167 blocks, some of which will be stored on each of your two Datanodes. Your job is to write some Python code to count how many blocks are stored on each worker by using the webhdfs interface.
Read about the `OPEN` call here: https://hadoop.apache.org/docs/r1.0.4/webhdfs.html#OPEN. Adapt the curl examples to use `requests.get` in Python instead. Your URLs will look like this:

http://main:9870/webhdfs/v1/single.csv?op=OPEN&offset=????
Note that `main:9870` is the Namenode, which will reply with a redirection response that sends you to a Datanode for the actual data. If you pass `allow_redirects=False` to `requests.get` and look at the `.headers` of the Namenode's response, you will be able to infer which Datanode stores the data corresponding to a specific offset in the file. Loop over offsets corresponding to the start of each block (your block size is 1MB, so the offsets will be 0, 1MB, 2MB, etc.). Construct and print a dictionary like the following that shows how many blocks of `single.csv` are on each Datanode:
{'http://70d2c4b6ccee:9864/webhdfs/v1/single.csv': 92,
'http://890e8d910f92:9864/webhdfs/v1/single.csv': 75}
Your data will probably be distributed differently between the two, and you will almost certainly have container names that are different from `70d2c4b6ccee` and `890e8d910f92`.
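Here is a minimal sketch of that loop (the helper name `count_blocks` is just illustrative, not required; the redirect target comes from the `Location` header of the Namenode's response):

```python
import requests

def count_blocks(path="single.csv", block_size=1024*1024):
    """Sketch: count how many blocks of an HDFS file live on each Datanode."""
    counts = {}
    # file length via GETFILESTATUS (also needed in part 3)
    status = requests.get(f"http://main:9870/webhdfs/v1/{path}?op=GETFILESTATUS")
    length = status.json()["FileStatus"]["length"]

    for offset in range(0, length, block_size):
        r = requests.get(
            f"http://main:9870/webhdfs/v1/{path}?op=OPEN&offset={offset}",
            allow_redirects=False,   # we only want the redirect, not the data
        )
        # the Namenode redirects to the Datanode holding this block
        datanode_url = r.headers["Location"].split("?")[0]
        counts[datanode_url] = counts.get(datanode_url, 0) + 1
    return counts

print(count_blocks())
```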
In this part, you'll make a new reader class that makes it easy to loop over the lines in an HDFS file. You'll do this by inheriting from the `io.RawIOBase` class: https://docs.python.org/3/library/io.html#class-hierarchy. Here is some starter code:
import io

class hdfsFile(io.RawIOBase):
    def __init__(self, path):
        self.path = path
        self.offset = 0
        self.length = 0 # TODO

    def readable(self):
        return True

    def readinto(self, b):
        return 0 # TODO
In the end, somebody should be able to loop over the lines in an HDFS file like this:
for line in io.BufferedReader(hdfsFile("single.csv")):
    line = str(line, "utf-8")
    print(line)
Implementation (a sketch putting these together follows the list):
- Use `GETFILESTATUS` (https://hadoop.apache.org/docs/r1.0.4/webhdfs.html#GETFILESTATUS) to correctly set `self.length` in the constructor.
- Whenever `readinto` is called, read some data at position `self.offset` from the HDFS file: https://hadoop.apache.org/docs/r1.0.4/webhdfs.html#OPEN
- The data you read should be put into `b`. The type of `b` will generally be a `memoryview`, which is like a fixed-size list of bytes. You can use slices to put values here (something like `b[0:3] = b'abc'`). Since it is fixed size, you should use `len(b)` to determine how much data to request from HDFS.
- `readinto` should return the number of bytes written into `b` -- this will usually be `len(b)`, but not always (for example, when you get to the end of the HDFS file), so base it on how much data you get from webhdfs.
- Before returning from `readinto`, increase `self.offset` so that the next read picks up where the previous one left off.
- If `self.offset >= self.length`, you know you're at the end of the file, so `readinto` should return 0 without calling webhdfs.
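Below is one possible, non-authoritative sketch of the missing pieces. It assumes `requests` for the HTTP calls and lets it follow the Namenode's redirect to a Datanode automatically (which is what you want here, unlike part 2); error handling is left for part 4:

```python
import io
import requests

class hdfsFile(io.RawIOBase):
    def __init__(self, path):
        self.path = path
        self.offset = 0
        # GETFILESTATUS returns JSON containing the file's length in bytes
        resp = requests.get(
            f"http://main:9870/webhdfs/v1/{path}?op=GETFILESTATUS")
        self.length = resp.json()["FileStatus"]["length"]

    def readable(self):
        return True

    def readinto(self, b):
        if self.offset >= self.length:
            return 0   # end of file: don't bother calling webhdfs
        # ask for at most len(b) bytes starting at the current offset
        resp = requests.get(
            f"http://main:9870/webhdfs/v1/{self.path}"
            f"?op=OPEN&offset={self.offset}&length={len(b)}")
        data = resp.content
        b[:len(data)] = data           # copy into the caller's buffer
        self.offset += len(data)       # next read resumes where this one ended
        return len(data)
```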
Use your class to loop over every line of single.csv.
Count how many lines contain the text "Single Family" and how many contain "Multifamily". Print out your counts like the following:
Counts from single.csv
Single Family: 444874
Multi Family: 2493
Seconds: 24.33926248550415
Note that by default `io.BufferedReader` uses an 8 KB buffer, which creates many small reads to HDFS, so your code will be unreasonably slow. Experiment with passing `buffer_size=????` to use a larger buffer. Your code should show at least two different sizes you tried (and the resulting times).
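One way to structure the comparison is sketched below. It assumes the `hdfsFile` class from above; the helper name `count_dwellings` and the particular buffer sizes are arbitrary choices, not requirements:

```python
import io
import time

def count_dwellings(path, buffer_size):
    """Count matching lines in an HDFS file and report the elapsed time."""
    single = multi = 0
    start = time.time()
    for line in io.BufferedReader(hdfsFile(path), buffer_size=buffer_size):
        line = str(line, "utf-8")
        if "Single Family" in line:
            single += 1
        if "Multifamily" in line:
            multi += 1
    print(f"Counts from {path}")
    print(f"Single Family: {single}")
    print(f"Multi Family: {multi}")
    print(f"Seconds: {time.time() - start}")

# try (at least) two different buffer sizes and compare the timings
for buffer_size in [8192, 1024*1024]:
    count_dwellings("single.csv", buffer_size)
```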
You have two Datanodes. What do you think will happen to single.csv and double.csv if one of these nodes dies?

Find out by manually running a `docker kill <CONTAINER NAME>` command on your VM to abruptly stop one of the Datanodes.
Wait until the Namenode realizes the Datanode has died before proceeding. Run `!hdfs dfsadmin -fs hdfs://main:9000/ -report` in your notebook to see when this happens; the report should show one dead Datanode, something like the following:
Configured Capacity: 83111043072 (77.40 GB)
Present Capacity: 25509035132 (23.76 GB)
DFS Remaining: 24980049920 (23.26 GB)
DFS Used: 528985212 (504.48 MB)
DFS Used%: 2.07%
Replicated Blocks:
    Under replicated blocks: 0
    Blocks with corrupt replicas: 0
    Missing blocks: 0
    Missing blocks (with replication factor 1): 0
    Low redundancy blocks with highest priority to recover: 0
    Pending deletion blocks: 0
Erasure Coded Block Groups:
    Low redundancy block groups: 0
    Block groups with corrupt internal blocks: 0
    Missing block groups: 0
    Low redundancy blocks with highest priority to recover: 0
    Pending deletion blocks: 0

Live datanodes (1):

Name: 172.19.0.4:9866 (spark-lecture-worker-1.cs544net)
Hostname: 890e8d910f92
Decommission Status : Normal
Configured Capacity: 41555521536 (38.70 GB)
DFS Used: 255594721 (243.75 MB)
Non DFS Used: 28793143071 (26.82 GB)
DFS Remaining: 12490006528 (11.63 GB)
DFS Used%: 0.62%
DFS Remaining%: 30.06%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Sat Dec 31 20:07:46 GMT 2022
Last Block Report: Sat Dec 31 20:04:00 GMT 2022
Num of Blocks: 242

Dead datanodes (1):

Name: 172.19.0.3:9866 (172.19.0.3)
Hostname: 70d2c4b6ccee
Decommission Status : Normal
Configured Capacity: 41555521536 (38.70 GB)
DFS Used: 273390491 (260.73 MB)
Non DFS Used: 28775310437 (26.80 GB)
DFS Remaining: 12490043392 (11.63 GB)
DFS Used%: 0.66%
DFS Remaining%: 30.06%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Sat Dec 31 20:06:15 GMT 2022
Last Block Report: Sat Dec 31 20:04:00 GMT 2022
Num of Blocks: 259
Note that HDFS datanodes use heartbeats to inform the namenode of their liveness. That is, each datanode periodically sends a small dummy message (a heartbeat, every 3 seconds by default) to inform the namenode of its presence. Recall that when we start the namenode, we specify `dfs.namenode.stale.datanode.interval=10000` and `dfs.namenode.heartbeat.recheck-interval=30000`. The first says that the namenode considers a datanode stale if it does not receive a heartbeat from it for 10 seconds (10000 ms), and the second says that it will consider the datanode dead after another 30 seconds. Hence, if you configure your cluster correctly, the namenode will become aware of the loss of a datanode within about 40 seconds of you killing it.
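If you'd rather script the wait than re-run the report cell by hand, here is a rough sketch; the string match against the report text is a simplistic assumption, so adjust it to the output you actually see:

```python
import subprocess
import time

# Poll the dfsadmin report until it lists a dead Datanode.
while True:
    report = subprocess.run(
        ["hdfs", "dfsadmin", "-fs", "hdfs://main:9000/", "-report"],
        capture_output=True, text=True).stdout
    if "Dead datanodes (1):" in report:
        break
    time.sleep(5)
print("Namenode now reports a dead Datanode")
```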
In new cells, run your code from part 3 again (the code that counts the multi and single family dwellings). Do so on both double.csv and single.csv.
double.csv should work just like before. You will have lost some blocks from single.csv, but modify `readinto` in your `hdfsFile` class so that it still returns as much data as possible. If reading a block from webhdfs fails, `readinto` should do the following (see the sketch after this list):
- put `\n` into the `b` buffer
- move `self.offset` forward to the start of the next block
- return 1 (because `\n` is a single character)
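Here is one possible shape for the fault-tolerant version, continuing the earlier sketch. How the failure surfaces is an assumption: a non-200 status is checked here, but depending on your setup you may instead need to catch an exception from `requests`:

```python
import io
import requests

class hdfsFile(io.RawIOBase):
    BLOCK_SIZE = 1024 * 1024   # matches the 1MB dfs.block.size used at upload

    def __init__(self, path):
        self.path = path
        self.offset = 0
        resp = requests.get(
            f"http://main:9870/webhdfs/v1/{path}?op=GETFILESTATUS")
        self.length = resp.json()["FileStatus"]["length"]

    def readable(self):
        return True

    def readinto(self, b):
        if self.offset >= self.length:
            return 0
        resp = requests.get(
            f"http://main:9870/webhdfs/v1/{self.path}"
            f"?op=OPEN&offset={self.offset}&length={len(b)}")
        if resp.status_code != 200:
            # this block's only replica was on the dead Datanode:
            # substitute a newline and skip to the next block boundary
            b[0:1] = b"\n"
            self.offset = (self.offset // self.BLOCK_SIZE + 1) * self.BLOCK_SIZE
            return 1
        data = resp.content
        b[:len(data)] = data
        self.offset += len(data)
        return len(data)
```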
You should get some prints like this:
Counts from double.csv
Single Family: 444874
Multi Family: 2493
AND
Counts from single.csv
Single: 200608
Multi: 929
Observe that we're still getting some lines from single.csv, but only about half as many as before the data loss (exact counts will depend on how many blocks each datanode was storing).
We should be able to run the following on your submission to create the mini cluster:
docker build -t p3-image ./image
docker compose up
We should then be able to open http://localhost:5000/lab, find your notebook, and run it.
The following is approximately how we will grade, but we may make changes if we overlooked an important part of the specification or did not consider a common mistake.
- [x/1] a `Dockerfile` and `docker-compose.yml` can be used to deploy the cluster (part 1)
- [x/1] `dfs` cp and du commands create (and show) single.csv and double.csv with the correct block size and replication settings (part 1)
- [x/1] a dict shows the division of blocks between the two Datanodes (part 2)
- [x/1] `__init__` correctly uses webhdfs `GETFILESTATUS` to get the file length (part 3)
- [x/1] `readinto` correctly uses webhdfs `OPEN` to get a chunk of the file (part 3)
- [x/1] the correct single and multi family counts are computed and printed (part 3)
- [x/1] two different buffer sizes are evaluated with `io.BufferedReader` (part 3)
- [x/1] the `-report` command shows that one Datanode is alive and one is dead (part 4)
- [x/1] `readinto` correctly replaces missing blocks with a single newline (part 4)
- [x/1] counts are correctly shown for both single.csv and double.csv (part 4)
correctly replaces missing blocks with a single newline (part 4) - [x/1] counts are correctly shown for both single.csv and double.csv (part 4)