HADOOP
Introduction
Hadoop is a library for processing very large datasets in a distributed fashion across clusters of computers. It was originally designed for shared-nothing systems, i.e., clusters in which each node has its own CPU, memory, and disk. To run Hadoop jobs on HPC clusters that use a shared/parallel file system such as Panasas, you must use myHadoop, which lets you submit a Hadoop job to the scheduler as a regular batch job.
Running Hadoop on the HPC Cluster
In the following, step 1 needs to be done only once, whereas steps 2, 3, and 4 should be wrapped in your job submission script.
Step 1: Install Apache hadoop-1.2.1 and myhadoop-0.30 in your home directory.
First, obtain the source code for Hadoop:
$ cd ~
$ mkdir hadoop-stack
$ cp /gpfs/research/software/hadoop/hadoop-1.2.1-bin.tar.gz ./hadoop-stack/
$ cp /gpfs/research/software/hadoop/v0.30.tar.gz ./hadoop-stack/
$ cd hadoop-stack
$ tar xvfz hadoop-1.2.1-bin.tar.gz
$ tar xvfz v0.30.tar.gz
Next, apply the patch provided by myHadoop to a few Hadoop configuration files:
$ cd hadoop-1.2.1/conf
$ patch < ../../myhadoop-0.30/myhadoop-1.2.1.patch
This will produce the following output:
patching file core-site.xml
patching file hdfs-site.xml
patching file mapred-site.xml
Add umask 0022 to the ~/.bashrc file in your home directory:
echo -e "\n#For hadoop\numask 0022" >> ~/.bashrc && source ~/.bashrc
Note. Currently there is a copy of hadoop-1.2.1 installed at the following directory:
/gpfs/research/software/hadoop
However, since Hadoop and myHadoop are easy to install, we suggest installing your own copy in your home directory to avoid possible interference with other users.
Step 2. Start up a Hadoop Cluster
The following code snippets should all become part of your submit script. Refer to Example 1 below for a complete example.
myHadoop bridges the gap between Hadoop and batch scheduling systems (e.g., our scheduler, Slurm). When you submit a job using a Slurm script, the scheduler allocates resources automatically, and myHadoop configures the Hadoop Distributed File System (HDFS) dynamically according to the computing resources allocated to you. This means a new HDFS is created each time you submit a job. Your Hadoop file system will be created and maintained in your Panasas volume, so any Hadoop work your job performs must respect your Panasas quota.
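Because the persistent HDFS data lives in your home volume, it counts against your quota. A quick way to check how much space it is using (a minimal sketch; the path below assumes the $HOME/hadoop/hdfs layout used in the rest of this guide):
$ du -sh $HOME/hadoop/hdfs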
Define a few environment variables (DO NOT change their names):
$ export JAVA_HOME=/usr
$ export HADOOP_HOME=$HOME/hadoop-stack/hadoop-1.2.1
$ export MY_HADOOP_HOME=$HOME/hadoop-stack/myhadoop-0.30
$ export PATH=$HADOOP_HOME/bin:$MY_HADOOP_HOME/bin:$PATH
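As an optional sanity check, verify that the hadoop executable on your PATH is the copy you installed in Step 1:
$ which hadoop
This should print the path to $HOME/hadoop-stack/hadoop-1.2.1/bin/hadoop.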
Create a directory to store configuration files; every time you start a Hadoop cluster, a set of per-job configuration files is generated there.
$ mkdir -p $HOME/hadoop/config
$ export HADOOP_CONF_DIR=$HOME/hadoop/config/conf-$SLURM_JOBID
Also create a directory to store your HDFS (Hadoop Distributed File System) data:
$ mkdir -p $HOME/hadoop/hdfs
$ export MH_PERSIST_DIR=$HOME/hadoop/hdfs
MH_SCRATCH_DIR is the directory for node-local scratch space:
$ export MH_SCRATCH_DIR=/tmp
(This variable must be set even though the HPC cluster has no node-local disk space.)
Run myhadoop-configure.sh (it must be run in persistent mode; see the -p option):
$ myhadoop-configure.sh -s $MH_SCRATCH_DIR -p $MH_PERSIST_DIR
Start up the Hadoop cluster:
$ $HADOOP_HOME/bin/start-all.sh
To check whether your Hadoop cluster is up, run the following Hadoop administration command:
$ hadoop dfsadmin -report
This prints a brief status report with basic information about the overall health of the HDFS cluster, as well as some per-datanode metrics.
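If you want your submit script to check this automatically, you can grep the report for the number of live datanodes (a minimal sketch; the "Datanodes available" string is an assumption about the Hadoop 1.x report format, so adjust the pattern if your report differs):
$ hadoop dfsadmin -report | grep -i "datanodes available"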
Step 3. Run your Hadoop job
Refer to Example 1 below.
Step 4. Stop and Clean Up
$ $HADOOP_HOME/bin/stop-all.sh
$ myhadoop-cleanup.sh
Note. Since the Hadoop cluster is dynamically configured through the job submit script and myHadoop, it is not advisable to run Hadoop commands directly (outside your Hadoop job submit script).
Example 1: Running the word-counting example on the HPC cluster.
To run Hadoop on the HPC cluster, your Hadoop job must be submitted as a batch job. Below is a sample job submission script (hadoop_sub.sh) for the well-known word-counting example (i.e., for one or multiple books, count the number of times each word occurs).
#!/bin/bash
#####################################################################################
# name: hadoop_sub.sh
# A sample slurm submit script that illustrates how to spin up a Hadoop cluster on the HPC cluster at FSU
######################################################################################
#SBATCH -J "test_hadoop"
#SBATCH -N 2
#SBATCH --ntasks-per-node=4
#SBATCH -p genacc_q
#SBATCH --mail-type=ALL
#SBATCH -t 00:30:00
# define a few environmental variables needed:
export JAVA_HOME=/usr
export HADOOP_HOME=$HOME/hadoop-stack/hadoop-1.2.1
export MY_HADOOP_HOME=$HOME/hadoop-stack/myhadoop-0.30
export PATH=$HADOOP_HOME/bin:$MY_HADOOP_HOME/bin:$PATH
if [ ! -d $HOME/hadoop ]; then
    mkdir $HOME/hadoop
    mkdir $HOME/hadoop/config
    mkdir $HOME/hadoop/hdfs
fi
if [ -d $HOME/hadoop/hdfs ]; then
    rm -rf $HOME/hadoop/hdfs/*
fi
# HADOOP_CONF_DIR is where your hadoop cluster's configuration will be located.
export HADOOP_CONF_DIR=$HOME/hadoop/config/conf-$SLURM_JOBID
# MH_SCRATCH_DIR is the directory for node-local scratch space
export MH_SCRATCH_DIR=/tmp
# We have to work under persistent mode for HPC at FSU
export MH_PERSIST_DIR=$HOME/hadoop/hdfs
echo "Running myhadoop-configure.sh under persistent mode: "
# both -s and -p options have to be specified for persistent mode
myhadoop-configure.sh -s $MH_SCRATCH_DIR -p $MH_PERSIST_DIR
echo "Starting all Hadoop Daemons: "
$HADOOP_HOME/bin/start-all.sh
echo
echo "check if hadoop is deployed (in trouble if you see a lot or zeros) "
hadoop dfsadmin -report
# need pg2701.txt, an E-book, for the word counting example
if [ ! -f ./pg2701.txt ]; then
cp /gpfs/research/software/hadoop/Doc/pg2701.txt .
fi
# wordcount-output is the directory in your local file system storing the results
if [ -d ./wordcount-output ]; then
rm -rf ./wordcount-output
fi
if [ -d ./wordcount-output2 ]; then
rm -rf ./wordcount-output2
fi
echo "Running a test word-counting example: "
# create a directory "data" in your HDFS file system
hadoop dfs -mkdir data
# copy the book "pg2701.txt" from your LOCAL file system to the HDFS file system
hadoop dfs -put ./pg2701.txt data/
# check if the book is copied successfully
hadoop dfs -ls data
# run the Hadoop word count example
# note: "data" and "wordcount-output" are the input/output directory in the HDFS system
hadoop jar $HADOOP_HOME/hadoop-examples-*.jar wordcount data wordcount-output
hadoop dfs -ls wordcount-output
# copy results from the HDFS file system to LOCAL file system
hadoop dfs -get wordcount-output ./
echo
echo "Next, re-run the word count example but with 100 copies of pg2701.txt"
echo " it will take a few minutes for the map-reduce to be finished "
hadoop dfs -mkdir data2
for i in {1..100}
do
    hadoop dfs -cp data/pg2701.txt data2/pg2701_copy.$i.txt
done
hadoop dfs -dus data2
hadoop jar $HADOOP_HOME/hadoop-examples-*.jar wordcount data2 wordcount-output2
hadoop dfs -get wordcount-output2 ./
echo "results should be in the directory 'wordcount-output2' in your local file system "
echo "Please check/compare the results in two output directories."
echo "Stopping all Hadoop daemons:"
$HADOOP_HOME/bin/stop-all.sh
echo
echo "cleanning up working directories: "
myhadoop-cleanup.sh
echo
If the above example runs successfully, you will see three entries in the wordcount-output directory:
$ ls wordcount-output/
_logs part-r-00000 _SUCCESS
The file part-r-00000 stores the results of the map/reduce job:
$ head part-r-00000
"'A 3
"'Also 1
"'Are 1
"'Aye, 2
"'Aye? 1
"'Best 1
"'Better 1
"'Bout 1
"'But 2
"'Canallers!' 1
......
The wordcount-output2 directory contains similar results, with each count multiplied by 100 (since the input was 100 copies of the same book):
$ head part-r-00000
"'A 300
"'Also 100
"'Are 100
"'Aye, 200
"'Aye? 100
"'Best 100
"'Better 100
"'Bout 100
"'But 200
"'Canallers!' 100
......
Note. In the above submit script, the Hadoop file system commands hadoop dfs -mkdir and hadoop dfs -ls are very similar to the Unix commands mkdir and ls, respectively (dfs is the file system shell module of HDFS). Since HDFS cannot be probed directly with traditional Unix commands, the Hadoop commands -put and -get are used to copy files between HDFS and the local file system:
hadoop dfs -put local_file hadoop_file
hadoop dfs -get hadoop_file local_file
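A few other HDFS shell commands follow the same pattern and are handy for inspecting job input/output (all part of the Hadoop 1.x dfs shell):
$ hadoop dfs -cat wordcount-output/part-r-00000    (print a file stored in HDFS)
$ hadoop dfs -dus data                             (total size of an HDFS directory, also used in the submit script above)
$ hadoop dfs -rmr wordcount-output                 (recursively remove an HDFS directory)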
Note. The source code WordCount.java is provided below (there are several versions of the Java code for this word-counting example; do not worry about the details if you are new to Hadoop, Map/Reduce, or Java):
// WordCount.java
package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
Persistency of HDFS data on the HPC cluster
You can persist data in HDFS between Hadoop restarts and job submissions. To achieve this, you must run your jobs in persistent mode and use the $MH_PERSIST_DIR variable.
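In practice this means the two persistence-related pieces below appear in every submit script on this page (a minimal sketch repeating the settings introduced in Step 2):
export MH_PERSIST_DIR=$HOME/hadoop/hdfs
myhadoop-configure.sh -s $MH_SCRATCH_DIR -p $MH_PERSIST_DIR
As long as MH_PERSIST_DIR points to the same directory from one job to the next (and nothing deletes its contents between jobs), the HDFS data written by one job is visible to the next.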
Example 2: Retrieve data from a previous Hadoop job
Suppose you have run Example 1 above successfully. Now, start up a new Hadoop cluster via the following script:
#!/bin/bash
#####################################################################################
# name: hadoop_persistency_test.sh
# A sample slurm submit script testing persistency of Hadoop data.
######################################################################################
#SBATCH -J "test_hadoop_persistency"
#SBATCH -N 4
#SBATCH --ntasks-per-node=4
#SBATCH -p genacc_q
#SBATCH --mail-type=ALL
#SBATCH -t 01:00:00
export JAVA_HOME=/usr
export HADOOP_HOME=$HOME/hadoop-stack/hadoop-1.2.1
export MY_HADOOP_HOME=$HOME/hadoop-stack/myhadoop-0.30
export PATH=$HADOOP_HOME/bin:$MY_HADOOP_HOME/bin:$PATH
export HADOOP_CONF_DIR=$HOME/hadoop/config/conf-$SLURM_JOBID
export MH_SCRATCH_DIR=/tmp
# to use previous HDFS data, use the same $MH_PERSIST_DIR as the previous job
export MH_PERSIST_DIR=$HOME/hadoop/hdfs
myhadoop-configure.sh -s $MH_SCRATCH_DIR -p $MH_PERSIST_DIR
$HADOOP_HOME/bin/start-all.sh
echo "Check if there is any thing in my HDFS system: "
echo "Should be something since you were/are running under persistent mode"
hadoop dfs -ls
hadoop dfs -ls data
echo
echo "Stopping all Hadoop daemons and cleaning working directories:"
$HADOOP_HOME/bin/stop-all.sh
myhadoop-cleanup.sh
You will see your HDFS content, similar to the following, in the Slurm output file of the job (by default slurm-JOBID.out, where JOBID is the Slurm job ID):
$ hadoop dfs -ls
Found 2 items
drwxr-xr-x   - $USER supergroup          0 2014-05-12 14:11 /user/$USER/data
drwxr-xr-x   - $USER supergroup          0 2014-05-12 14:12 /user/$USER/wordcount-output
$ hadoop dfs -ls data
Found 1 items
-rw-r--r-- 3 $USER supergroup 1257275 2014-05-12 14:11 /user/$USER/data/pg2701.txt
Example 3: Interacting with HDFS programmatically on the HPC cluster
The following Java code, HDFSHelloWorld.java, creates a file hello.txt in the HDFS file system, writes a hello message to it, reads it back, and prints it to standard output.
//HDFSHelloWorld.java
import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

public class HDFSHelloWorld {

    public static final String theFilename = "hello.txt";
    public static final String message = "Hello, world!\n";

    public static void main (String [] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path filenamePath = new Path(theFilename);

        try {
            if (fs.exists(filenamePath)) {
                // remove the file first
                fs.delete(filenamePath);
            }

            FSDataOutputStream out = fs.create(filenamePath);
            out.writeUTF(message);
            out.close();

            FSDataInputStream in = fs.open(filenamePath);
            String messageIn = in.readUTF();
            System.out.print(messageIn);
            in.close();
        }
        catch (IOException ioe) {
            System.err.println("IOException during operation: " + ioe.toString());
            System.exit(1);
        }
    }
}
To run the above Java code on the HPC cluster, follow these steps:
Compile HDFSHelloWorld.java:
javac -classpath /path/to/your/hadoop-1.2.1/hadoop-core-1.2.1.jar HDFSHelloWorld.java
Create a .jar file, say, hello.jar:
jar cfm hello.jar manifest.txt HDFSHelloWorld.class
...where manifest.txt contains the headers to append to the META-INF/MANIFEST.MF file:
$ cat manifest.txt
Main-Class: HDFSHelloWorld
Class-Path: $HOME/hadoop-stack/hadoop-1.2.1/hadoop-core-1.2.1.jar
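Optionally, you can confirm that the class file and manifest entries made it into the archive using standard JDK and unzip tools:
$ jar tf hello.jar
$ unzip -p hello.jar META-INF/MANIFEST.MF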
Run hello.jar within a Hadoop cluster on the HPC cluster. Again, Hadoop jobs must be run within a job submit script (please refer to the examples above):
# ... configure and start up your Hadoop cluster as in Example 1 ...
......
echo "Running java code hello.jar: "
echo "will create hello.txt within hdfs, write Hello, world! to it, "
echo "and read it back from HDFS, and print it to stdout: "
hadoop jar hello.jar
echo "check if you have the hello.txt created in the hdfs"
hadoop dfs -ls
.......
# ... stop your Hadoop cluster and clean up as in Example 1 ...
Upon successful execution of your code, hadoop dfs -ls will show an entry similar to the following:
-rw-r--r-- 3 $USER supergroup 16 2014-05-07 11:32 /user/$USER/hello.txt
Example 4: Hadoop Word-Counting Using Python
Even though the Hadoop framework is written in Java, Map/Reduce programs can be developed in other languages such as Python or C++. This example demonstrates how to run the simple word-count example again, but with Mapper/Reducer developed in Python.
Here is the source code of mapper.py:
#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
Here is the Python code for reducer.py:
#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
To run this example, first copy mapper.py and reducer.py to your home directory and make them executable (chmod +x mapper.py reducer.py). Next, create a submit script from the hadoop_sub.sh script of Example 1 above by replacing the first word-count line, i.e.,
hadoop jar $HADOOP_HOME/hadoop-examples-*.jar wordcount data wordcount-output
...with...
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar \
-file mapper.py -mapper mapper.py \
-file reducer.py -reducer reducer.py \
-input data -output wordcount-output
...and the second word-count line, i.e.,
hadoop jar $HADOOP_HOME/hadoop-examples-*.jar wordcount data2 wordcount-output2
...with...
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar \
-file mapper.py -mapper mapper.py \
-file reducer.py -reducer reducer.py \
-input data2 -output wordcount-output2
This Python example will generate the same output as the Java example above.
Note. The Hadoop Streaming API passes data between your Map and Reduce code via STDIN (standard input) and STDOUT (standard output).
Note. This version has no intermediate combining stage before the reduce stage, so it is somewhat slower than the Java word-count example (which uses its Reduce class as a combiner).
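Because Streaming simply wires the scripts together through STDIN/STDOUT, you can sanity-check mapper.py and reducer.py locally before submitting a job, with sort standing in for Hadoop's shuffle phase (assuming the scripts are executable, as above, and pg2701.txt from Example 1 is in the current directory):
$ cat pg2701.txt | ./mapper.py | sort -k1,1 | ./reducer.py | head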
Further Reading
- Webpage for myHadoop
- Webpage for Apache Hadoop
- A nice introduction to Hadoop from Yahoo
- Example 4 (running a Hadoop job using Python) is adapted from Michael G. Noll
- Apache Hadoop documentation for Hadoop Streaming
- Hadoop Programming with Arbitrary Languages