Hadoop

The following guide explains how to provision a multi-node Hadoop cluster locally and experiment with it. Check out the Vagrantfile and the Vagrant guide for more details.

Resources

Setup

Requirements

Directory structure

tree -a hadoop/
hadoop/
├── .data # mounted volume
│   ├── hadoop_rsa
│   ├── hadoop_rsa.pub
│   ├── master
│   │   ├── hadoop
│   │   │   ├── log
│   │   │   │   ├── hadoop
│   │   │   │   ├── mapred
│   │   │   │   └── yarn
│   │   │   ├── namenode
│   │   │   └── secondary
│   │   ├── oozie
│   │   │   ├── data
│   │   │   └── log
│   │   ├── spark
│   │   │   └── log
│   │   └── zeppelin
│   │       ├── log
│   │       └── notebook
│   ├── node-1
│   │   └── hadoop
│   │       ├── datanode
│   │       └── log
│   │           ├── hadoop
│   │           ├── mapred
│   │           └── yarn
│   ├── node-2
│   └── node-3
├── example
│   ├── map-reduce
│   └── spark
├── file
│   ├── hadoop
│   │   ├── config
│   │   │   ├── core-site.xml
│   │   │   ├── fair-scheduler.xml
│   │   │   ├── hdfs-site.xml
│   │   │   ├── mapred-site.xml
│   │   │   ├── masters
│   │   │   ├── slaves
│   │   │   └── yarn-site.xml
│   │   └── profile-hadoop.sh
│   ├── hosts
│   ├── motd
│   ├── oozie
│   │   ├── config
│   │   │   ├── oozie-env.sh
│   │   │   └── oozie-site.xml
│   │   └── profile-oozie.sh
│   ├── spark
│   │   ├── config
│   │   │   ├── log4j.properties
│   │   │   └── spark-env.sh
│   │   └── profile-spark.sh
│   ├── ssh
│   │   └── config
│   └── zeppelin
│       ├── config
│       │   └── zeppelin-env.sh
│       └── profile-zeppelin.sh
├── script
│   ├── bootstrap.sh
│   ├── setup_hadoop.sh
│   ├── setup_oozie.sh
│   ├── setup_spark.sh
│   ├── setup_ubuntu.sh
│   └── setup_zeppelin.sh
├── Vagrantfile
└── vagrant_hadoop.sh

Import the script

source vagrant_hadoop.sh

Create and start a Multi Node Hadoop Cluster

hadoop-start

The first time it might take a while

Access the cluster via ssh (see the /etc/hosts file for the node hostnames and addresses)

vagrant ssh master
ssh hadoop@172.16.0.10 -i .data/hadoop_rsa

# worker nodes: node-1, node-2, node-3
vagrant ssh node-1
ssh hadoop@172.16.0.101 -i .data/hadoop_rsa

Destroy the cluster

hadoop-destroy

For convenience, append the cluster hostnames to /etc/hosts on the host machine

cat hadoop/file/hosts | sudo tee --append /etc/hosts
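Running the append command above more than once duplicates the entries in /etc/hosts. A minimal sketch of an idempotent variant follows. The function name append_missing_lines is hypothetical and is not part of vagrant_hadoop.sh.

```shell
# append_missing_lines SRC DEST (hypothetical helper, not part of this repo)
# Appends to DEST every non-empty line of SRC that DEST does not already
# contain as an exact whole-line match, so repeated runs add nothing new.
append_missing_lines() {
  local src=$1 dest=$2 line
  while IFS= read -r line || [ -n "$line" ]; do
    [ -z "$line" ] && continue
    if ! grep -qxF -- "$line" "$dest" 2>/dev/null; then
      printf '%s\n' "$line" >> "$dest"
    fi
  done < "$src"
}

# usage on the host machine (bash, needs write access to /etc/hosts):
#   sudo bash -c "$(declare -f append_missing_lines); append_missing_lines hadoop/file/hosts /etc/hosts"
```

The match is on whole lines only, so an entry that differs in spacing from an existing one is still appended.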

Web UI links

(*) Not installed by default

HDFS and MapReduce

HDFS is a distributed file system that provides high-throughput access to application data

YARN is a framework for job scheduling and cluster resource management

MapReduce is a YARN-based system for parallel processing of large data sets

Documentation

Admin

HDFS cli

# help
hdfs

# filesystem statistics
hdfs dfsadmin -report

# filesystem check
hdfs fsck /
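The fsck report ends with a summary line stating whether the checked path is HEALTHY or CORRUPT. As a rough sketch, the report can be reduced to a single word for use in scripts. The function name fsck_status is hypothetical.

```shell
# fsck_status (hypothetical helper): reads `hdfs fsck` output on stdin and
# prints HEALTHY, CORRUPT or UNKNOWN depending on the summary line.
fsck_status() {
  local out
  out=$(cat)
  case $out in
    *"is HEALTHY"*) echo HEALTHY ;;
    *"is CORRUPT"*) echo CORRUPT ;;
    *)              echo UNKNOWN ;;
  esac
}

# usage on the master node:
#   hdfs fsck / | fsck_status
```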

YARN cli

# help
yarn

# list yarn applications
yarn application -list

# list nodes
yarn node -list

# view application logs
yarn logs -applicationId APPLICATION_ID

# kill yarn application
yarn application -kill APPLICATION_ID
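The list and kill commands above can be combined when several test applications are stuck. The sketch below assumes application IDs have the usual application_<timestamp>_<sequence> form. Both function names are hypothetical.

```shell
# extract_app_ids (hypothetical helper): reads `yarn application -list`
# output on stdin and prints each application ID once, one per line.
# sort -u removes repeats, since the ID also appears in the tracking URL.
extract_app_ids() {
  grep -oE 'application_[0-9]+_[0-9]+' | sort -u
}

# kill_listed_apps (hypothetical helper): kills every application that
# `yarn application -list` currently reports.
kill_listed_apps() {
  local id
  yarn application -list 2>/dev/null | extract_app_ids |
    while IFS= read -r id; do
      # stdin is redirected so the command cannot consume the ID list
      yarn application -kill "$id" < /dev/null
    done
}

# usage on the master node:
#   yarn application -list | extract_app_ids
#   kill_listed_apps
```

This kills everything that is listed, so it is only meant for a local playground cluster.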

Useful paths

# data and logs
devops/hadoop/.data/master/hadoop # host
/vol/hadoop # guest

# (guest) config
/usr/local/hadoop/etc/hadoop

# (hdfs) map-reduce history
/mr-history/history/done_intermediate/hadoop

# (hdfs) aggregated app logs
/yarn/app/hadoop/logs/application_XXX

MapReduce WordCount Job

# build jar on the host machine
cd devops/hadoop/example/map-reduce
./gradlew clean build

cd devops/hadoop
vagrant ssh master

# create base directory using hdfs
hdfs dfs -mkdir -p /user/ubuntu

# create example directory
hadoop fs -mkdir -p /user/ubuntu/word-count/input

# list directory
hadoop fs -ls -h -R /
hadoop fs -ls -h -R /user/ubuntu

# create sample files
echo "Hello World Bye World" > file01
echo "Hello Hadoop Goodbye Hadoop" > file02

# copy from local to hdfs
hadoop fs -copyFromLocal file01 /user/ubuntu/word-count/input
hadoop fs -put file02 /user/ubuntu/word-count/input

# verify copied files
hadoop fs -ls -h -R /user/ubuntu
hadoop fs -cat /user/ubuntu/word-count/input/file01
hadoop fs -cat /user/ubuntu/word-count/input/file02
hadoop fs -cat /user/ubuntu/word-count/input/*

# run application
hadoop jar /vagrant/example/map-reduce/build/libs/map-reduce.jar \
  /user/ubuntu/word-count/input \
  /user/ubuntu/word-count/output

# check output
hadoop fs -cat /user/ubuntu/word-count/output/part-r-00000

# delete directory to run it again
hadoop fs -rm -R /user/ubuntu/word-count/output

# run sample job in a different queue
hadoop jar \
  $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
  wordcount \
  -Dmapreduce.job.queuename=root.priority_queue \
  /user/ubuntu/word-count/input \
  /user/ubuntu/word-count/output

# well known WARN issue
# https://issues.apache.org/jira/browse/HDFS-10429
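Because the job refuses to start when the output directory already exists, the delete and run steps above are usually repeated together. A minimal sketch of a wrapper follows. The function name run_word_count and the WORD_COUNT_JAR variable are hypothetical; the default jar path is the one used above.

```shell
# run_word_count INPUT OUTPUT (hypothetical wrapper)
# Removes OUTPUT from HDFS if it exists, then submits the word-count jar.
# WORD_COUNT_JAR is an assumed override for the jar location.
run_word_count() {
  local input=$1 output=$2
  local jar=${WORD_COUNT_JAR:-/vagrant/example/map-reduce/build/libs/map-reduce.jar}
  # `hadoop fs -test -d` succeeds only when the path is an existing directory
  if hadoop fs -test -d "$output"; then
    hadoop fs -rm -R "$output"
  fi
  hadoop jar "$jar" "$input" "$output"
}

# usage on the master node:
#   run_word_count /user/ubuntu/word-count/input /user/ubuntu/word-count/output
```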

Benchmarking MapReduce with TeraSort

# generate random data
hadoop jar \
  $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
  teragen 1000 random-data

# run terasort benchmark
hadoop jar \
  $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
  terasort random-data sorted-data

# validate data
hadoop jar \
  $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
  teravalidate sorted-data report

# useful commands
hadoop fs -ls -h -R .
hadoop fs -rm -r random-data
hadoop fs -cat random-data/part-m-00000
hadoop fs -cat sorted-data/part-r-00000
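The teragen argument is a number of rows, not a size in bytes. Each row is 100 bytes, so the 1000 rows above produce only about 100 KB. A small sketch for sizing a larger run follows. The function name teragen_rows is hypothetical.

```shell
# teragen_rows MB (hypothetical helper): prints the number of 100-byte
# TeraGen rows needed for roughly MB mebibytes of data (rounded down).
teragen_rows() {
  echo $(( $1 * 1024 * 1024 / 100 ))
}

# usage on the master node, for roughly 512 MiB of input:
#   hadoop jar \
#     $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
#     teragen "$(teragen_rows 512)" random-data
```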


Spark

Spark is an open-source cluster-computing framework

Resources

Figure: Spark architecture

Spark application on YARN

Figure: Spark job

# start REPL
spark-shell
pyspark

Interactive Analysis example

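# start the REPL locally
spark-shell
# or start the REPL on YARN
spark-shell --master yarn --deploy-mode client

// the following lines are typed inside the Scala REPL

// view all configured parameters
sc.getConf.getAll.foreach(x => println(s"${x._1}: ${x._2}"))

// count the lines of the Spark LICENSE file that mention BSD
val licenceLines = sc.textFile("file:/usr/local/spark/LICENSE")
val lineCount = licenceLines.count
val isBsd = (line: String) => line.contains("BSD")
val bsdLines = licenceLines.filter(isBsd)
bsdLines.count
// collect to the driver first, otherwise on YARN the lines end up in the executor logs
bsdLines.collect.foreach(println)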

Spark Job examples

Example local

# run SparkPi example
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[*] \
  $SPARK_HOME/examples/jars/spark-examples_*.jar 10

# GitHub event documentation
# https://developer.github.com/v3/activity/events/types

# build jar on the host machine
cd devops/hadoop/example/spark
sbt clean package

cd devops/hadoop
vagrant ssh master

# sample dataset
mkdir -p github-archive && \
  cd $_ && \
  wget http://data.githubarchive.org/2018-01-01-{0..10}.json.gz && \
  gunzip -k *
# sample line
head -n 1 2018-01-01-0.json | jq '.'

# run local job
spark-submit \
  --class "com.github.niqdev.App" \
  --master local[*] \
  /vagrant/example/spark/target/scala-2.11/spark-github_2.11-0.1.0-SNAPSHOT.jar

Example cluster

# run job in YARN cluster-deploy mode
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 1g \
  --executor-cores 3 \
  --queue default \
  $SPARK_HOME/examples/jars/spark-examples*.jar \
  10

# --conf "spark.yarn.jars=hdfs://namenode.local:9000/user/spark/share/lib/*.jar"
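The memory flags above are not the full amount YARN has to allocate: Spark adds an overhead to each container on top of the requested heap. The sketch below assumes Spark's default overhead of 10% of the heap with a 384 MB minimum and no explicit memoryOverhead setting. It ignores YARN rounding the request up to its allocation increment. The function name yarn_container_mb is hypothetical.

```shell
# yarn_container_mb HEAP_MB (hypothetical helper)
# Prints the approximate container size in MB that Spark requests from YARN:
# heap + max(384, 10% of heap). Assumes default overhead settings.
yarn_container_mb() {
  local heap=$1 overhead
  overhead=$(( heap / 10 ))
  if [ "$overhead" -lt 384 ]; then
    overhead=384
  fi
  echo $(( heap + overhead ))
}

# for the job above:
#   yarn_container_mb 2048   # driver with --driver-memory 2g
#   yarn_container_mb 1024   # each executor with --executor-memory 1g
```

With these defaults the driver needs about 2432 MB and each executor about 1408 MB, which is worth comparing against the memory available to the node managers on a small local cluster.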


Zeppelin

Zeppelin is a web-based notebook that enables data-driven, interactive data analytics and collaborative documents with SQL, Scala and more

Resources

Setup

Install and start Zeppelin

# access master node
vagrant ssh master

# login as root
sudo su -

# install and init
/vagrant/script/setup_zeppelin.sh

# start manually (first time only)
su --login hadoop /vagrant/script/bootstrap.sh zeppelin

Examples

# markdown interpreter
%md
hello

# shell interpreter
%sh
hadoop fs -ls -h -R /

Cluster issue: if a command fails with an error such as Error: Cannot allocate memory, verify with free -m that the node has enough memory available
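The same check can be scripted before starting Zeppelin. The sketch below reads the MemAvailable field of /proc/meminfo, which assumes a reasonably recent Linux kernel on the guest. Both function names are hypothetical.

```shell
# mem_available_mb [FILE] (hypothetical helper)
# Prints MemAvailable in MiB from a meminfo-style file (default /proc/meminfo).
mem_available_mb() {
  awk '/^MemAvailable:/ { print int($2 / 1024) }' "${1:-/proc/meminfo}"
}

# has_memory_mb REQUIRED_MB [FILE] (hypothetical helper)
# Succeeds when at least REQUIRED_MB MiB are available.
has_memory_mb() {
  local avail
  avail=$(mem_available_mb "${2:-/proc/meminfo}")
  [ -n "$avail" ] && [ "$avail" -ge "$1" ]
}

# usage on the master node (1024 is an arbitrary example threshold):
#   has_memory_mb 1024 || echo "less than 1 GiB available, expect allocation errors"
```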


Oozie

Oozie is a workflow scheduler system to manage Hadoop jobs

Resources

Setup

Optional PostgreSQL configuration: by default, Oozie is configured to use an embedded Derby database

# access master node
vagrant ssh master

# install docker
curl -fsSL get.docker.com -o get-docker.sh && \
  chmod u+x $_ && \
  ./$_ && \
  sudo usermod -aG docker hadoop

# logout and login again to verify docker installation
exit
vagrant ssh master
whoami # hadoop
docker ps -a

# uncomment PostgreSQL configurations
vim devops/hadoop/file/oozie/config/oozie-site.xml # from host
vim /vagrant/file/oozie/config/oozie-site.xml # from guest

# start postgres on the guest machine
docker run \
  --detach \
  --name oozie-postgres \
  -p 5432:5432 \
  -e POSTGRES_DB="oozie-db" \
  -e POSTGRES_USER="postgres" \
  -e POSTGRES_PASSWORD="password" \
  postgres

# permission issue
# https://github.com/docker-library/postgres/issues/116
# --volume /vol/postgres:/var/lib/postgresql/data

# access container
docker exec -it oozie-postgres bash
psql --username=postgres
# list all databases
\list
\connect oozie-db
# list all tables
\dt
# describe table
\d+ wf_jobs
# list workflow
select * from wf_jobs;
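The container may need a few seconds after docker run before it accepts connections, so scripted setups usually wait for it first. A minimal sketch of a generic retry loop follows. The function name retry is hypothetical; pg_isready is the readiness check shipped with PostgreSQL and is assumed to be available inside the postgres image.

```shell
# retry ATTEMPTS DELAY_SECONDS COMMAND... (hypothetical helper)
# Runs COMMAND until it succeeds, at most ATTEMPTS times, sleeping
# DELAY_SECONDS between attempts. Returns 1 if every attempt fails.
retry() {
  local attempts=$1 delay=$2 i=1
  shift 2
  until "$@"; do
    if [ "$i" -ge "$attempts" ]; then
      return 1
    fi
    i=$(( i + 1 ))
    sleep "$delay"
  done
  return 0
}

# usage on the guest machine: wait up to about a minute for postgres
#   retry 30 2 docker exec oozie-postgres pg_isready --username=postgres
```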

Install and start Oozie

# access master node
vagrant ssh master

# login as root
sudo su -

# build, install and init
/vagrant/script/setup_oozie.sh

# start oozie manually (first time only)
su --login hadoop /vagrant/script/bootstrap.sh oozie

It might take a while to build the sources

Useful paths

# data and logs
devops/hadoop/.data/master/oozie # host
/vol/oozie # guest

# (guest) config
/usr/local/oozie/conf

# (hdfs) examples
/user/hadoop/examples

Examples

Run bundled examples within distribution

# examples path
.data/master/oozie/examples # host
/vol/oozie/examples # guest

# access master node as hadoop user
vagrant ssh master

export OOZIE_EXAMPLE_PATH=/vol/oozie/examples
export OOZIE_HDFS_PATH=/user/$(whoami)/examples

# open map-reduce job.properties
vim $OOZIE_EXAMPLE_PATH/apps/map-reduce/job.properties

# edit the following properties (keep comments on their own lines: a trailing "# ..." would become part of the value)
# fs.defaultFS @ core-site.xml
nameNode=hdfs://namenode.local:9000
# yarn.resourcemanager.address @ yarn-site.xml
jobTracker=resource-manager.local:8032
# priority_queue or default @ fair-scheduler.xml
queueName=priority_queue

# upload all the examples
hadoop fs -put $OOZIE_EXAMPLE_PATH $OOZIE_HDFS_PATH

# verify uploaded files
hadoop fs -ls -h -R /user/$(whoami)

# run the map-reduce workflow example
oozie job \
  -oozie http://oozie.local:11000/oozie \
  -config $OOZIE_EXAMPLE_PATH/apps/map-reduce/job.properties \
  -run

# verify status
oozie job -oozie http://oozie.local:11000/oozie -info WORKFLOW_ID

# verify result
hadoop fs -cat $OOZIE_HDFS_PATH/output-data/map-reduce/part-00000

# remove all the examples
hadoop fs -rm -R $OOZIE_HDFS_PATH
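The job.properties edits above can also be applied without an editor, which helps when several examples need the same values. The sketch below assumes plain key=value lines with no spaces around the equals sign. The function name set_property is hypothetical.

```shell
# set_property FILE KEY VALUE (hypothetical helper)
# Replaces the line "KEY=..." in FILE, or appends "KEY=VALUE" if KEY is absent.
# Other lines, including comments, are kept unchanged.
set_property() {
  local file=$1 key=$2 value=$3 tmp
  tmp=$(mktemp)
  awk -F= -v k="$key" -v v="$value" '
    $1 == k { print k "=" v; found = 1; next }
    { print }
    END { if (!found) print k "=" v }
  ' "$file" > "$tmp" &&
    cat "$tmp" > "$file"   # overwrite in place to keep the file permissions
  rm -f "$tmp"
}

# usage on the master node:
#   props=$OOZIE_EXAMPLE_PATH/apps/map-reduce/job.properties
#   set_property "$props" nameNode hdfs://namenode.local:9000
#   set_property "$props" jobTracker resource-manager.local:8032
#   set_property "$props" queueName priority_queue
```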

Useful commands

  • A workflow requires the oozie.wf.application.path property
  • A coordinator requires the oozie.coord.application.path property

# verify oozie status
oozie admin \
  -oozie http://oozie.local:11000/oozie \
  -status

# verify workflow or coordinator status
oozie job \
  -oozie http://oozie.local:11000/oozie \
  -info JOB_ID \
  -verbose

# poll workflow or coordinator status
oozie job \
  -oozie http://oozie.local:11000/oozie \
  -poll JOB_ID \
  -interval 10 \
  -timeout 60 \
  -verbose

# find running coordinator
oozie jobs \
  -oozie http://oozie.local:11000/oozie/ \
  -filter status=RUNNING \
  -jobtype coordinator

# suspend|resume|kill coordinator
oozie job \
  -oozie http://oozie.local:11000/oozie/ \
  [-suspend|-resume|-kill] \
  XXX-C

# re-run coordinator's workflow (action)
oozie job \
  -oozie http://oozie.local:11000/oozie/ \
  -rerun XXX-C \
  -action 1,2,3,N

# kill workflow
oozie job \
  -oozie http://oozie.local:11000/oozie/ \
  -kill \
  XXX-W

# re-run all workflow's actions
oozie job \
  -oozie http://oozie.local:11000/oozie/ \
  -rerun \
  XXX-W \
  -Doozie.wf.rerun.failnodes=false
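The run and poll commands can be chained so that the job ID does not have to be copied by hand. The sketch below relies on the "job: <id>" line that the client prints after a successful submission. The function names are hypothetical; OOZIE_URL is the environment variable the oozie client itself reads as the default for -oozie.

```shell
# extract_job_id (hypothetical helper): reads `oozie job ... -run` output
# on stdin and prints the job ID from the first "job: <id>" line.
extract_job_id() {
  sed -n 's/^job: *//p' | head -n 1
}

# submit_and_poll PROPERTIES_FILE (hypothetical helper)
# Submits the job described by PROPERTIES_FILE, then polls it using the
# same -interval and -timeout values as the poll command above.
submit_and_poll() {
  local url=${OOZIE_URL:-http://oozie.local:11000/oozie} id
  id=$(oozie job -oozie "$url" -config "$1" -run | extract_job_id)
  if [ -z "$id" ]; then
    echo "submission failed" >&2
    return 1
  fi
  echo "submitted $id"
  oozie job -oozie "$url" -poll "$id" -interval 10 -timeout 60 -verbose
}

# usage on the master node:
#   submit_and_poll $OOZIE_EXAMPLE_PATH/apps/map-reduce/job.properties
```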