Spark on Hadoop

Overview

Data engineers are expected to have a functional understanding of a broad range of systems and tools that operate somewhere in the space between source systems and end users. These tools and systems typically store, transform or visualize (or some combination thereof) the information as it makes its way to the consumers. One such system has been surrounded by sufficient mystique that people have ostensibly been hired purely by virtue of claiming some unquantified degree of knowledge of it. This system is Apache Hadoop, the open source 800 pound gorilla that, in addition to providing fodder for oohs and aahs at certain cocktail parties, enables distributed storage and processing of data that doesn’t need to have inherent structure.

While many great packaged distributions of Hadoop, along with truly awe-inspiring enhancements, are readily available, often in Community-style editions, there is benefit in spending a little time with the base components on their own before rushing to complex integrated solutions. Doing so develops an intuition and feel for how they work in isolation. The purpose of this post is to outline the steps performed to, first and foremost, get a working Hadoop system. Once this foundation is in place, we’ll add an in-memory processing engine to the mix: Apache Spark.

Arguably, the most sincere effort to understand the software we’re using would require us to work with the source, with building it as an absolute minimum. However, in this post we won’t be working with the source, as that is not congruent with our goal of getting something up and running quickly. After all, this is about being able to use Spark on Hadoop, not setting up build environments.

Environment

Speaking of environment: we’ll be setting up Spark 1.4.1 on Hadoop 2.7.1 on a CentOS 7 guest image running in VMware Fusion 7 Professional on a MacBook Pro running Yosemite.

A degree of care needs to be taken before rushing off to download the software. On the Spark Download Page, be sure to select “Pre-built for Hadoop 2.6 and later”. As for Hadoop, simply pick the appropriate binary package from the Hadoop Release Page.

Depending on the objectives of the installation, it may or may not be acceptable or desirable to install under $HOME. Since we want to eliminate as many complexities as possible here, we will disregard that we can sudo and proceed with a user-local installation, which consequently also runs as the current user. Later, we’ll explore what it takes to promote the installation to /usr/local.

As we’ll be performing these activities in a guest virtual image, it’s beneficial to have performed the SSH key exchange with the host to make file copy operations from the host simple. The guests on my setup can reach the host through the host name hostos. That means I can access files on the host system without entering a password, as long as the usernames on host and guest match.

guest$ scp hostos:Downloads/hadoop-2.7.1.tar.gz .

Or, perhaps even better, if you have mounted a networked directory, e.g. on a NAS which is the case on our network:

guest$ scp hostos:/Volumes/Server/Common/hadoop-2.7.1.tar.gz .

Getting started

Untar the binary distribution to a location of your choice:

guest$ tar -xf hadoop-2.7.1.tar.gz

Before you’ll be able to run bin/hadoop, you need to tell it which JVM to use. In CentOS 7, the latest JVM can be found at /usr/java/latest, and you can either point your JAVA_HOME environment variable to that location or edit the corresponding entry in your hadoop-local etc/hadoop/hadoop-env.sh configuration file. One reason to prefer the latter is that stable configuration such as the JVM location stays within the Hadoop file structure, which removes moving parts when making other changes, such as which user the server should run as.

We’ll be configuring a so-called Pseudo-Distributed Cluster. The steps to configure it are taken from the Hadoop Quickstart Guide and are summarized below. All console operations assume that the working directory is the hadoop-2.7.1 root.

Define site properties in the following two files as per:

etc/hadoop/core-site.xml:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

etc/hadoop/hdfs-site.xml:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
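
A quick way to double-check what ended up in these files is to read the properties back. The following is only a minimal sketch using Python's standard library; the helper name read_site_properties is made up for illustration and is not part of Hadoop.

```python
import xml.etree.ElementTree as ET


def read_site_properties(xml_text):
    """Return {name: value} for every <property> in a Hadoop *-site.xml document."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.findall("property")}


# Same content as etc/hadoop/core-site.xml above.
CORE_SITE = """
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
"""

print(read_site_properties(CORE_SITE))  # {'fs.defaultFS': 'hdfs://localhost:9000'}
```

To inspect the real files, pass in the text read from etc/hadoop/core-site.xml or etc/hadoop/hdfs-site.xml instead of the inline string.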

In order for the distributed file system operations to work, ssh to localhost needs to work without prompting for a password. Some information about that has been provided as part of an earlier post. Once that’s set up, we can proceed with formatting the file system.

guest$ bin/hdfs namenode -format

With a formatted file system, we’re ready to start the file system daemon:

guest$ sbin/start-dfs.sh

After the namenode and datanode have been started, the server is listening on several new TCP ports: 9000, which is where worker requests are received, and a whole bunch in the 50,000 range that serve various forms of access. The two most interesting ports from a networking point of view at this stage are 50070 (administrative webapp) and 50075 (HTTP file access).
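
To confirm from inside the guest that the daemons are actually accepting connections, a small probe along the following lines can help. It is a sketch under the assumption that the default ports mentioned above are in use; the function name is_listening is hypothetical.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        # connect_ex returns 0 on success and an error code otherwise.
        return sock.connect_ex((host, port)) == 0
    finally:
        sock.close()


if __name__ == "__main__":
    # Ports taken from the text above; adjust if your configuration differs.
    for port in (9000, 50070, 50075):
        state = "open" if is_listening("127.0.0.1", port) else "closed"
        print("%d: %s" % (port, state))
```

Note that this only proves that something is listening on the port, not that it is Hadoop or that the port is reachable from the host.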

When the CentOS 7 guest image is headless, it’s very helpful to get access to those ports from the host. The steps for enabling that with VMware Fusion 7 and NAT’d CentOS 7 guests are outlined in a previous post and briefly summarized again:

Open the TCP port 50070 on the guest OS’ firewall:

guest$ sudo firewall-cmd --zone=public --add-port=50070/tcp --permanent

Because --permanent only updates the stored configuration, reload the firewall for the rule to take effect in the running system:

guest$ sudo firewall-cmd --reload

Add the port mapping in your host’s /Library/Preferences/VMware Fusion/vmnet8/nat.conf and restart VMware Fusion. Yes, this means stopping the guest and, once it’s restarted, starting the DFS again as per above.

Validating DFS

At this point we should have a completely functioning Hadoop single-node pseudo cluster. Again referencing the quick start guide, this is the time to confirm that map-reduce jobs can be run.

The general approach is to use the bin/hdfs command with the dfs argument to create directories or put files. E.g.:

guest$ bin/hdfs dfs -mkdir /user
guest$ bin/hdfs dfs -mkdir /user/<your user name>
guest$ bin/hdfs dfs -put etc/hadoop input

This sequence creates a three-level folder hierarchy and puts all Hadoop configuration files in the input folder.

As for the actual tests themselves, Hadoop ships with example jars, including jars containing their source, which is really handy. Let’s then run a word count example, the bread and butter of Hadoop:

guest$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar wordcount input output

This command starts a Hadoop job that is defined in a specific example jar. The first argument to the main function of said jar, wordcount, indicates which class to direct the remaining arguments to; these are the folder names input and output. When Hadoop runs these jars, the working directory is /user/<your user name>. This is why it can resolve input as the name of the folder that resides on this level. The last argument, output, is the name of the folder that the class behind wordcount will create; the job fails if the folder is already there. To re-run the example without creating a growing number of uniquely named output folders, delete the output folder, e.g. like so:

guest$ bin/hdfs dfs -rm -R output
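
For intuition about what the word count job computes, the two phases can be mimicked in a few lines of plain Python. This is a single-process sketch of the idea, not the code in the example jar; the function names and the sample lines are made up for illustration, and tokens are assumed to be split on whitespace.

```python
from collections import defaultdict


def map_phase(lines):
    """Emit a (word, 1) pair for every whitespace-separated token."""
    for line in lines:
        for word in line.split():
            yield word, 1


def reduce_phase(pairs):
    """Sum the counts emitted for each distinct word."""
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


# Made-up sample input standing in for the files in the input folder.
lines = [
    "export JAVA_HOME=/usr/java/latest",
    "export HADOOP_CONF_DIR=/etc/hadoop",
]
counts = reduce_phase(map_phase(lines))
for word in sorted(counts):
    print("%s\t%d" % (word, counts[word]))
```

In the real job the map and reduce steps run as separate tasks and the pairs are grouped by key between them; here a dictionary plays that role.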

Run the jar through bin/hadoop without any additional arguments to get a list of all possible command maps that have been defined in the jar (in file ExampleDriver.java).

Back to the example though: if TCP port 50075 has not been opened in the firewall and NAT’d in the virtualization engine, simply download the output folder to the local file system with the DFS -get command:

guest$ bin/hdfs dfs -get output

Pop into the local output folder and the word count results will be in a file named part-r-00000.
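
The result file holds one word and its count per line, separated by a tab. The sketch below lists the most frequent words, assuming that format and the default file name output/part-r-00000; the helper names are hypothetical.

```python
import os


def parse_wordcount_line(line):
    """Split one 'word<TAB>count' line into (word, count)."""
    word, count = line.rstrip("\n").rsplit("\t", 1)
    return word, int(count)


def top_words(lines, n=10):
    """Return the n (word, count) pairs with the highest counts."""
    pairs = [parse_wordcount_line(line) for line in lines if line.strip()]
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)[:n]


if __name__ == "__main__":
    path = os.path.join("output", "part-r-00000")
    if os.path.exists(path):
        with open(path) as result_file:
            for word, count in top_words(result_file):
                print("%6d  %s" % (count, word))
```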

Adding Spark

Because we’re interested in running Spark jobs that consume content residing in a Hadoop cluster (or pseudo-cluster, as is the case above), it’s imperative that the Spark binaries are built with the proper Hadoop version in mind. For Hadoop 2.7.1, we’ll get Spark 1.4.1 built for Hadoop 2.6 and newer.

We untar it right next to our Hadoop install, but before we do anything else, we need to tell Spark about Hadoop and we need to reduce the verbosity of Hadoop’s logging.

For the first one, we define environment variable HADOOP_CONF_DIR to point to Hadoop’s configuration directory. We put it in our ~/.bash_profile for future sessions:

export HADOOP_CONF_DIR=$HOME/hadoop-2.7.1/etc/hadoop

Second, change Hadoop’s root log level from INFO to ERROR by updating the corresponding entry in $HADOOP_CONF_DIR/log4j.properties:

hadoop.root.logger=ERROR,console
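
If Spark later fails to find the cluster, a first thing to rule out is that HADOOP_CONF_DIR is unset or points at the wrong folder. The following sketch performs that sanity check; the function name check_hadoop_conf is made up, and the list of expected files is simply the set of files touched in this post.

```python
import os


def check_hadoop_conf(environ):
    """Return a list of problems found; an empty list means the setup looks sane."""
    conf_dir = environ.get("HADOOP_CONF_DIR")
    if not conf_dir:
        return ["HADOOP_CONF_DIR is not set"]
    problems = []
    # Files edited earlier in this walkthrough.
    for name in ("core-site.xml", "hdfs-site.xml", "log4j.properties"):
        if not os.path.isfile(os.path.join(conf_dir, name)):
            problems.append("missing %s in %s" % (name, conf_dir))
    return problems


if __name__ == "__main__":
    for message in check_hadoop_conf(os.environ) or ["configuration directory looks fine"]:
        print(message)
```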

We’re now ready to go into our Spark root and run the Python Spark shell, pyspark:

guest$ bin/pyspark

Perform a very basic hdfs:// connectivity test by verifying that Spark can read one of the files that were uploaded earlier into the input folder. Note though that the full path needs to be provided:

>>> testRDD = sc.textFile("hdfs:///user/testuser/input/hadoop-env.sh")
>>> testRDD.count()
99
>>>

While the number (99) may differ between deployments, it should be possible to run the above commands (substituting the testuser as required) without getting any errors.

The shell, be it Python or Scala (or even a slimmed down version of R as of Spark 1.4), is useful for performing simple tests. In order to run more complex logic, use bin/spark-submit to launch the program.
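
As an illustration of what such a program might look like, here is a sketch of a standalone word count script. The file name wordcount.py and the HDFS path in the usage note are assumptions for this example; the script expects to be launched through bin/spark-submit, which makes the pyspark package available.

```python
import sys


def split_words(line):
    """Split a line into whitespace-separated tokens."""
    return line.split()


def main(path):
    # Imported here because pyspark is only on the path under spark-submit.
    from pyspark import SparkContext

    sc = SparkContext(appName="WordCount")
    counts = (sc.textFile(path)
                .flatMap(split_words)
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b))
    # Print the ten most frequent words, highest count first.
    for word, count in counts.takeOrdered(10, key=lambda pair: -pair[1]):
        print("%s\t%d" % (word, count))
    sc.stop()


if __name__ == "__main__" and len(sys.argv) > 1:
    main(sys.argv[1])
```

Saved as wordcount.py in the Spark root, it could be launched with something like bin/spark-submit wordcount.py hdfs:///user/testuser/input/hadoop-env.sh, substituting testuser as before.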

This concludes the basic setup of a single-node Hadoop pseudo-cluster and standalone Spark. From a functional point of view, much can be done with this simple environment, such as testing genuine Big Data applications, albeit against very modest data sets. Said applications, however, can then be deployed unchanged onto production grade clusters.

What about YARN?

Production cluster deployments of Hadoop and Spark will include YARN, the Hadoop version 2 resource manager. In a nutshell, Spark is, in addition to a standalone entity, also a YARN application. YARN is so complex it’s bordering on the fantastical and will be discussed in a future post.