In my previous blog, I made the case for running Hadoop on FlashBlade. This second blog focuses on 1) how to configure Hadoop to run MapReduce using NFS in place of HDFS, and 2) performance comparisons between FlashBlade and HDFS-on-SSDs.
The following description assumes you already know how to set up a Hadoop cluster and just describes the differences needed to use NFS instead of HDFS. If you are new to Hadoop, I found this tutorial particularly useful. I used the core Hadoop distribution here, but everything should apply to most vendor-specific distributions of Hadoop as well.
On the NFS side, the only requirement is a single filesystem that is mounted at the same location on all Hadoop nodes.
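# Create the mount point and mount the shared filesystem on every worker node.
for node in $(cat "$HADOOP_HOME/etc/hadoop/slaves"); do
  ssh "$node" sudo mkdir -p /mnt/nfs_mount
  ssh "$node" sudo mount "$NFS_IP:/$FS_NAME" /mnt/nfs_mount
done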
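Before launching jobs, it can be worth confirming that every node really has the NFS filesystem at the expected path. The following is a minimal sketch, not part of the original setup: the helper name has_nfs_mount is hypothetical, and it assumes the mount table is supplied in the /proc/mounts format (device, mount point, filesystem type, options).

```shell
# Return 0 if the mount table on stdin lists an NFS filesystem
# (type nfs or nfs4) at the given mount point, non-zero otherwise.
has_nfs_mount() {
    awk -v mp="$1" '$2 == mp && $3 ~ /^nfs/ { found = 1 } END { exit !found }'
}
```

On a live cluster this could be fed per node with something like `ssh "$node" cat /proc/mounts | has_nfs_mount /mnt/nfs_mount`, reporting any node where the check fails.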
Running MapReduce jobs with input or output destinations on a mounted filesystem is straightforward: prepend “file://” to any path. To run a multi-node MapReduce job, ensure that all nodes have mounted a networked filesystem at the same location and use that as the path. As an example:
hadoop jar hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar \
  wordcount file:///mnt/nfs_mount/input_dir file:///mnt/nfs_mount/output_dir
Assuming all nodes in the cluster have /mnt/nfs_mount/ mounted at the same location, then these paths will be used for input and output for the job. This is effectively combining the “local filesystem” mode and a mounted filesystem to use a shared storage location.
To make the “file://” prefix the default, I recommend setting the property <fs.defaultFS> = <file:///> in core-site.xml. This makes it easier to avoid accidentally trying to read from or write to an HDFS cluster that does not exist.
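For readers who prefer to see the literal XML, here is a small sketch that prints the corresponding property element. The helper name hadoop_property is hypothetical and only formats text; the output still has to be pasted inside the <configuration> element of core-site.xml by hand.

```shell
# Print one Hadoop <property> element for the given name and value.
# This only writes to stdout; it does not modify any config file.
hadoop_property() {
    printf '  <property>\n    <name>%s</name>\n    <value>%s</value>\n  </property>\n' "$1" "$2"
}

# Make the local (mounted) filesystem the default instead of HDFS.
hadoop_property fs.defaultFS file:///
```

The same helper can be reused for any of the other name/value pairs mentioned in this post.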
The only strict requirement to run MapReduce over NFS is that all nodes in the cluster have the same mount point mounted at the same directory path.
Other important properties for tuning MapReduce and Yarn:
- Read performance of large input files is heavily dependent on the minimum splitsize; this controls the chunks that are created as input to each map task. The property mapreduce.input.fileinputformat.split.minsize controls this and the value I found that worked best was 512MB.
- Yarn’s default capacity scheduler results in poor utilization, at least when running one job at a time. Switching to the fair scheduler resulted in much better utilization. Set this property via <yarn.resourcemanager.scheduler.class> = <org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler>.
In my experience, Hadoop and/or Java do not manage memory usage well, and I had to set the property <mapreduce.reduce.shuffle.input.buffer.percent> = <0.2>. In addition, Java 8 does not use virtual memory as fastidiously as Java 7, so I set <yarn.nodemanager.vmem-check-enabled> = <false>.
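The two job-level settings above can also be tried per job before committing them to the config files. The sketch below assumes the job accepts Hadoop's generic -D options (the bundled examples do, to my knowledge) and reuses the jar and directory names from the earlier example. The two Yarn properties are daemon settings and belong in yarn-site.xml rather than on the job command line.

```shell
# 512MB expressed in bytes; the property takes a plain byte count.
split_bytes=$((512 * 1024 * 1024))

# Per-job overrides, passed as generic -D options ahead of the job arguments.
overrides="-D mapreduce.input.fileinputformat.split.minsize=$split_bytes"
overrides="$overrides -D mapreduce.reduce.shuffle.input.buffer.percent=0.2"

# Print the launch command instead of running it, so it can be reviewed first.
echo "hadoop jar hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar wordcount $overrides file:///mnt/nfs_mount/input_dir file:///mnt/nfs_mount/output_dir"
```

Once the values look right for a workload, moving them into mapred-site.xml avoids having to repeat them on every launch.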
Some Hadoop/MapReduce programs have been written assuming an unfortunately tight coupling between HDFS and MapReduce. In particular, MapReduce jobs from third-parties may assume too much about locations of intermediate/temporary files.
The root issue is that when “file://” is the default file system, the current working directory is not, by default, shared by all nodes. In other words, even if you set the fs.defaultFS property to “file:///mnt/”, the working directory is still the directory in which the job was launched. This requires careful launching of a job if it assumes the current working directory is shared storage.
For example, the terasort benchmark program has a known issue where it reads from the distributed cache incorrectly, which works only by happenstance on HDFS.
Another example is the Grep program packaged amongst the sample programs, which actually is two MapReduce phases packaged together. This program assumes the current working directory of the job controller is a shared location. This means that in order for the job to succeed, it must be run from the shared mount point so that it inherits a valid, shared location for the intermediate output.
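One way to make that launch requirement hard to forget is a small wrapper that always changes into the shared mount before starting the job. This is only a sketch: the function name run_from_shared is hypothetical, and the mount path is the one assumed throughout this post.

```shell
# Run a command with the working directory set to the shared mount.
# The subshell keeps the caller's own working directory unchanged.
run_from_shared() {
    shared_dir=$1
    shift
    ( cd "$shared_dir" && "$@" )
}
```

A Grep launch would then look like `run_from_shared /mnt/nfs_mount hadoop jar hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep file:///mnt/nfs_mount/input_dir file:///mnt/nfs_mount/output_dir 'pattern'`, so the intermediate output inherits a location that every node can see.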
Experiments were run on a cluster of 25 physical nodes, each with 192GB total RAM and Intel DC S3510 120GB SSDs, running CentOS Linux release 7.3.1611. The CPUs are dual-socket Xeon E5-2660v3 @ 2.6GHz with 2 hyperthreads per core, for a total of 40 logical cores per node.
For fair performance comparisons, these nodes are using SSDs for HDFS and FlashBlade uses all-flash storage. These HDFS data points represent a higher-performance and higher-cost solution than is commonly deployed, so these experiments focus on performance and not lowest-cost.
Using iozone, the local SSDs have measured write throughput of 140MB/s and read throughput of 445MB/s, where the drive specs indicate 135MB/s and 475MB/s respectively.
For HDFS tests, the nodes are configured both as HDFS and Yarn nodes, with the default 3x replication. For NFS, each node is only a Yarn node and all nodes mount the same shared filesystem via NFSv3 from a 15-blade FlashBlade. For each scenario, the result is presented as the average of three runs.
The first experiment is to compare the completion time of a non-trivial MapReduce. The job chosen is WordCount which creates a histogram of all character strings found in the input data. This particular job was chosen because it produces a significant amount of data in the intermediate shuffle step and output of the reduce step. Further, WordCount is not an “embarrassingly parallel” application and does rely on a high-performance shuffle step.
The input used is a 300GB uncompressed Wikipedia archive dataset. The output from the map collectors (after combiner stage) is 55GB and the final output is 11GB.
The MapReduce job running on FlashBlade completed in 4m14s on average, compared to 5m34s on average for HDFS. This is a 24% shorter runtime.
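The 24% figure is the runtime reduction relative to the HDFS run. The arithmetic can be reproduced with a few lines of shell, using only the two averages quoted above:

```shell
# Average runtimes from the text, converted to seconds.
flashblade_s=$((4 * 60 + 14))
hdfs_s=$((5 * 60 + 34))

# Runtime reduction relative to HDFS, rounded to a whole percent.
pct=$(awk -v a="$flashblade_s" -v b="$hdfs_s" 'BEGIN { printf "%.0f", (b - a) / b * 100 }')
echo "FlashBlade ${flashblade_s}s vs HDFS ${hdfs_s}s: ${pct}% shorter runtime"
```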
Write Throughput Test:
To test the write throughput of both storage tiers, I used the MapReduce program Teragen. This job is designed to create the input for TeraSort by generating pseudo-random data to be later sorted. In other words, it is writing out data as fast as the pseudo-random number generator can work and is parallelized to all nodes in the cluster. Hence, it mostly stresses the write bandwidth of the underlying storage tier.
The graph contrasts a shared NFS mount on FlashBlade against two HDFS policies: the standard lazy persistence of 3 replicas and synchronous writes of all 3 copies.
For context, the default behavior of HDFS is to not force a sync to disk when acknowledging a write. This buys write performance at the cost of an undefined-length window of unprotected data in case of a power-loss event. The property dfs.datanode.sync.behind.writes was set to true for the fsync version.
For small output sizes, the write throughput of HDFS is approximately 30% higher than NFS. This reflects the fact that the aggregate write throughput is more correlated to the speed of transferring data to the datanode and not the write speed of the underlying disk as the writes are not actually persisted when acknowledged. As the amount of data to be written grows larger, the need to persist in-memory data at the datanode slows down the writes.
The FlashBlade NFS service has significantly more scale so larger tests could only be run on that setup. Below is a screenshot of the sustained write throughput of ~4.5GB/s while creating a 5TB teragen dataset.
Read Throughput Test:
To test the read throughput of the storage tier, I used the Grep MapReduce example to search for a random non-occurring string in the input text. The effective result is that the entire input is read, quickly scanned, and discarded from memory, making this job entirely dependent on read IO performance.
The completion time of the NFS version was 40% faster than the HDFS version.
SplitSize’s Impact on Efficiency:
Before the map tasks start, the job must split the input file or files into tasks. The way the splits occur is normally handled by the FileInputFormat class. For a large file, the map inputs are chunks of the file, where the size is controlled by the property mapreduce.input.fileinputformat.split.minsize.
The default split size, 32MB, results in a significant fraction of time being spent starting Java tasks. Larger splits do more work per map task, but more work must be redone after a node failure.
The graph below shows the impact on overall completion time running a WordCount MapReduce on a text dump of Wikipedia using 4 compute nodes and NFS. The input is a single 57GB file, which is then split into map tasks based on the minimum split size property.
The overall completion time is significantly faster with larger effective splits, i.e., more work per map task. The fastest completions were measured with 512MB splits and 1GB showed no improvement in runtime.
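To get a feel for why the split size matters so much, it helps to count the map tasks each setting produces for the 57GB input. The sketch below is an approximation: it assumes 1GB means 1024MB and uses a plain ceiling division, whereas Hadoop's actual split logic may fold a small trailing remainder into the last split. The function name map_tasks is hypothetical.

```shell
# Approximate number of map tasks: ceiling of file size over split size.
# Both arguments are in MB.
map_tasks() {
    echo $(( ($1 + $2 - 1) / $2 ))
}

file_mb=$((57 * 1024))
for split_mb in 32 512 1024; do
    echo "split=${split_mb}MB map_tasks=$(map_tasks "$file_mb" "$split_mb")"
done
```

Under these assumptions, going from 32MB to 512MB splits cuts the number of task startups from roughly 1,800 to a little over 100.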
The Hadoop standard distribution comes with the easy-to-use distcp tool for migrating data from one storage location to another. This is a simple MapReduce program that is effectively a parallelized copy operation.
It may be necessary to set the number of mappers to a higher number than default using the “-m” option to distcp. In one experiment copying 300GB from HDFS to NFS, the automatically chosen number of mappers was 21. Increasing the number of mappers to 200 resulted in the job completing in 68 seconds instead of a baseline 216 seconds.
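As a rough sketch of what such a copy looks like, the helper below assembles a distcp command line with an explicit mapper count. The helper name distcp_cmd, the namenode address, and the dataset paths are all placeholders for illustration; the function prints the command rather than running it.

```shell
# Print a distcp command with an explicit maximum number of map tasks.
# Arguments: mapper count, source URI, destination URI.
distcp_cmd() {
    printf 'hadoop distcp -m %s %s %s\n' "$1" "$2" "$3"
}

# Hypothetical source and destination; substitute the real cluster paths.
distcp_cmd 200 hdfs://namenode:8020/dataset file:///mnt/nfs_mount/dataset
```

The destination uses the same “file://” convention as the MapReduce examples, so the mount must be present on every node that runs a copy task.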
The preceding blog post in this series discussed the advantages of disaggregating the compute and storage resources in a Hadoop cluster: simplicity, flexibility, scalability, and performance. This post walked through the steps necessary to run MapReduce on FlashBlade using NFS. No science-project code plugins are required; the setup relies instead on the incredibly well-tested Java local filesystem calls and the Linux NFS client. In the scenarios tested, MapReduce jobs on the FlashBlade outperform even an SSD-backed HDFS cluster. The FlashBlade platform tested here answers the why and how of using shared storage effectively due to its simplicity, scale-out architecture, and all-flash performance.