Graph-backed Web Application

Overview

While the focus of several previous posts has been slanted towards the back end of graph-based information storage, in this post we will connect with the user, albeit in a crude and proof-of-concept-ish manner. We’ll show a particular back-end web application infrastructure choice as well as one for the front end, but will stop short of providing a complete sample application.

In order to keep this post concise and focused on the key concepts, many shortcuts have been taken that unquestionably disqualify the code herein from being used verbatim in a production setting. The complete absence of security, of even rudimentary error handling, and of even basic DevOps practices should make that more than clear.

Architecture

With the disclaimer out of the way, what are we trying to accomplish then? The idea is that we have a graph store containing information and knowledge that can be inferred from reasoning over it. Suppose we need a zero-client means of interacting with said graph for the purposes of traversal, property augmentation or even structural change.

Server-side web applications that spit HTML at the browser, while lean on the client side, are ruled out for this task owing to the flickering nature of wholesale page refreshes. AJAX is a more logical option: it retains client state and excels at in-place page updates. Specifically, we have chosen to write the application in AngularJS, backed by RESTful services running in Node.js using the koa.js framework.

The simplified schematic below outlines the web application components involved:

[Figure: Graph web application architecture]

From the client’s perspective, AngularJS is loaded via CDN and the rest of the application is served as static files (.js, .html and .css) from the Node.js server. The very same Node.js server (this detail is quite important) exposes the APIs through RESTful web services that the AngularJS-based application calls.

As this architecture is based on koa.js, common logic is factored out as middleware. The system relies on existing middleware such as koa-router to route the RESTful requests and koa-static to serve static content, i.e. the client side of the application that executes in the browser. The APIs also rely on our own custom middleware, specifically code that facilitates communication with the Gremlin server. More on that next.

Graph middleware

Koa.js (from the authors of Express.js) is an exceedingly clever, very contemporary framework that promotes cohesive, modular design and takes advantage of the powerful features of ES6+ JavaScript.

The code below is the complete definition of the Gremlin middleware for parameterized queries.

var gremlin = require('gremlin');

module.exports = function (opts) {
    "use strict";
    opts = opts || {};
    var port = opts.port || 8182;
    var host = opts.host || 'localhost';

    return function *(next) {
        // Koa v1 middleware: a generator function that yields control downstream
        var client = gremlin.createClient(port, host);

        // Expose a thin Gremlin wrapper on the koa context as this.gr
        this.gr = {
            call: function (query, params) {
                return new Promise(function (resolve, reject) {

                    // Perform the call to the Gremlin Server
                    client.execute(query, params, function (err, result) {
                        if (!err)
                            resolve(result);
                        else {
                            console.log('Execute error: ' + err.message + '\nQuery: ' + query);
                            reject(err.message);
                        }
                    });
                });
            }
        };

        try {
            yield next;
        }
        catch (err) {
            console.log('Returning error to the caller: ' + err);
            this.status = 500;
            this.body = err;
        }
    };
};

The important parts here are that the middleware returns a generator that must yield to its ‘next’ object in order to play nicely with other middleware providers, and that the ‘gr’ object exposes a call function returning a Promise of a response from the Gremlin server. To let the Gremlin server get the most out of query compilation, the call wrapper supports parameterized queries.

Server logic

The server definition is rather plain, thanks to its declarative approach.

var app = require('koa')();
var gremlin = require('./mw-gremlin');
var router = require('koa-router')();
var serve = require('koa-static');

app.use(gremlin({port: 8183, host: '10.22.66.1'}))
    // API routes
    .use(router.routes())
    .use(router.allowedMethods())
    // AngularJS is served as static files
    .use(serve('./public'))
    .listen(9000, 'localhost');

It’s crude and it’s hard-coded but it shows the broad strokes of server declaration:

  • Dependencies
  • Middleware configuration
  • Server network configuration

RESTful API implementation

With a sensibly designed middleware-based application architecture, the RESTful API implementations are focused on the logic that justifies their existence and not on repetitive overhead.

/// GET a node's neighbors
router.get('/api/neighbors/:p0', function *() {
    "use strict";
    let m = this.req.url.match('[0-9]+$');
    if (m !== null) {
        let p0 = Number(m[0]);
        this.body = yield this.gr.call("g.V().hasId(p0).union(outE().as('e').inV(), __.inE().as('e').outV()).as('node').select('e', 'node')", { p0: p0 });
    }
});

After some basic URL parsing (which could, and probably should, be broken out into a function), we simply yield on the Promise returned by the Gremlin middleware’s call method. This gives the API a synchronous calling style, which makes for easier-to-read code and more robust error handling.
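As an aside, koa-router parses path parameters for us, so the manual URL matching above could arguably be replaced with something along these lines (a sketch only; this.params is standard koa-router, but this variant has not been wired into the sample):

/// GET a node's neighbors, relying on koa-router's parameter parsing
router.get('/api/neighbors/:p0', function *() {
    "use strict";
    let p0 = Number(this.params.p0);
    if (!Number.isNaN(p0)) {
        this.body = yield this.gr.call("g.V().hasId(p0).union(outE().as('e').inV(), __.inE().as('e').outV()).as('node').select('e', 'node')", { p0: p0 });
    }
});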

AngularJS Service implementation

Without going into too much AngularJS detail, suffice it to say that AngularJS applications communicate with web services through AngularJS services, which are defined using the factory method. These services can then, through the magic of dependency injection, be made available to controllers, directives or, for that matter, other services.

Below is the definition of a graph service that facilitates traversal through neighbor nodes. The injected ‘traverseState’ dependency controls the type of traversal, a concept that is specific to the implementation on which this post is based and is not material to the overall gist.

    /// Service communicating with underlying Gremlin graph
    ///
    .factory('graph', ['$resource', 'traverseState', function ($resource, traverseState) {
        "use strict";
        return {
            getNode: function (actorId, f) {
                return $resource('/api/node/:nodeId', { nodeId: actorId }).query()
                    .$promise.then(response => f(response));
            },

            getResponsibilities: function (actorId, f) {
                return $resource('/api/responsibilities/:nodeId', { nodeId: actorId }).query()
                    .$promise.then(response => f(response));
            },

            // Neighbors can be restricted to traverse outcome-related edges only
            getNeighbors: function (nodeId, f) {
                let apiName = '/api/' + (traverseState.isOutcomeTraversal() ? 'outcomes' : 'neighbors') + '/:nodeId';
                return $resource(apiName, { nodeId: nodeId }).query()
                    .$promise.then(response => f(response));
            }
        }
    }])

Conclusion

We’ve stepped top-down through a contemporary web application design that leverages highly relevant modern framework and language features to create a robust foundation connecting our end users with a Gremlin-interfaced graph database.

Graph Database Cloud Deployment

Overview

In this post, we’ll cover the first few steps towards deploying a graph database in the cloud to facilitate centralized persistence and collaboration. To keep costs down (zero, in fact, if you haven’t yet exhausted your AWS Free Tier), we’ll deploy a rather scaled-down version of the Titan Graph DB that can run on a t2.micro instance. Just because it can run on such an instance, however, does not mean that running a production Titan server in this manner is recommended. Proper production sizing should prevail, and this post should be seen as a rough template that can be scaled to suit bigger needs.

The key takeaways for the deployment are:

  1. Scalable storage and computation
  2. Secure, with end-user processes that are meaningful and can be enforced

Architecture

The server deployment outlined herein is a very classic one in the sense that it relies on a full-fledged Linux installation as an EC2 node and as such does not leverage containers or lambdas or any other light-weight mechanisms to achieve elastic scalability.

[Figure: Titan deployment]

Storage

Currently, AWS is less than stellar in that a launch profile cannot directly attach an instance to an existing block device. This would be desirable because, with a detached EBS volume backing our database, we could very easily take advantage of Spot instances: with an AMI-based boot snapshot and a standalone EBS volume, our database backing store would survive instance termination, ready to be mounted the next time we launched a Spot instance. Even so, to prepare for this eventuality, we’ll create an EBS volume that we’ll attach to the instance.

In our case, on a RHEL 7 image, an EBS device name of /dev/sdf showed up in Linux as /dev/xvdf. While completely optional, we’ll create a single LVM partition here, which will give us a great deal of flexibility later on when deciding how to divvy up the storage space.

Use fdisk to create a single primary partition that spans the device and has partition type 8e (Linux LVM). Optionally, you can create a swap partition (type 82) as well, since your image won’t have one by default. If you do, that partition of course needs to be formatted with mkswap, enabled with swapon and added to /etc/fstab, as sketched after the fdisk invocation below.

# fdisk /dev/xvdf
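For completeness, the optional swap follow-up could look like this, assuming the swap partition ended up as /dev/xvdf2 (adjust the device name to your layout):

# mkswap /dev/xvdf2
# swapon /dev/xvdf2
# echo '/dev/xvdf2 swap swap defaults 0 0' >> /etc/fstab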

In our LVM setup, we want one really small (e.g. 64MB) Logical Volume (LV) for the Titan data directory, as that’s where we’ll keep our credentials graph. The second LV is the bigger one (e.g. 2GB) where the database files will be stored. The reason these are so small, even though the free tier covers up to 30GB, is that the free tier eventually runs out and you don’t want to allocate more than you need; LVM makes it easy to grow these later. The LV creation follows these steps, presuming /dev/xvdf1 is your LVM partition:

# pvcreate /dev/xvdf1
# vgcreate vg /dev/xvdf1
# lvcreate -L 64M -n tdata vg
# lvcreate -L 2G -n dbdata vg
# mkfs.xfs /dev/mapper/vg-tdata
# mkfs.xfs /dev/mapper/vg-dbdata
# lsblk

The last command lists the device hierarchy.

OpenVPN

This post is not an in-depth elaboration of how to set up an OpenVPN server; there are maintained references for that. Instead, we’ll cover the specific choices made when setting it up.

Specifically, we installed OpenVPN from the official RPM, put keys under /etc/openvpn/keys, i.e. the server’s key and certificate, the CA’s certificate and the DH .pem file. The configuration file is /etc/openvpn/server.conf in which we kept the 1194/udp port and protocol. For its simplicity and performance, we use a routed configuration (as opposed to bridged).

Choose a particular subnet and netmask for the server directive, e.g.:

server 10.33.44.0 255.255.255.0

Your OpenVPN server will assign itself the IP address 10.33.44.1, which is the address your client applications, i.e. those connecting to Titan once the VPN tunnel is up and running, will be using.

From a security point of view, TLS-Auth and running under an unprivileged account are highly recommended, as we need to enable world-wide (0.0.0.0/0) access to the VPN server in our AWS Security Group and as such need to arm ourselves against DoS attacks.
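Pulling those choices together, the server.conf ends up along the following lines. Treat it as a sketch rather than a complete, hardened configuration; the key file names are our own:

port 1194
proto udp
dev tun
ca /etc/openvpn/keys/ca.crt
cert /etc/openvpn/keys/server.crt
key /etc/openvpn/keys/server.key
dh /etc/openvpn/keys/dh2048.pem
tls-auth /etc/openvpn/keys/ta.key 0
server 10.33.44.0 255.255.255.0
keepalive 10 120
user nobody
group nobody
persist-key
persist-tun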

Open the requisite firewall port:

# firewall-cmd --zone=public --add-port=1194/udp
# firewall-cmd --zone=public --add-port=1194/udp --permanent

Generate client certificates either directly (including the .key file that then must be transferred over a secure line) or from certificate requests (.csr files).

Once you’ve confirmed that your setup is working, the final piece is to enable the systemd service file that the OpenVPN RPM provides. The provided .service file is a template unit, so when you enable it, make sure that the instance name matches your configuration file, in our case server.conf. Hence:

# systemctl enable openvpn@server.service
# systemctl start openvpn@server.service

Titan

We have covered Titan deployments in previous posts and won’t be going into the same level of detail here. In short, we want Titan installed under /var/lib/titan, running under a dedicated account (also named titan). We will also be using the titan account to run the console when necessary, so it’s important that it is given a login shell (bash) and a home directory; the Gremlin Console insists on writing user preferences and will throw an exception if the home directory isn’t there. Alternatively, create the titan user without a shell and home directory and add the titan group to the user account you’ll be running the Gremlin Console with.

Next, we’ll make titan use the LVs we have created. Basically:

# cd /var/lib/titan
# mkdir db
# mount /dev/mapper/vg-dbdata db
# mkdir db/berkeley
# mkdir db/es
# mv data data.old
# mkdir data
# mount /dev/mapper/vg-tdata data
# mv data.old/* data/
# rmdir data.old

Now the key Titan directories are using the EBS partitions. Update /etc/fstab to mount these on boot as e.g.

/dev/mapper/vg-dbdata /var/lib/titan/db xfs defaults,noexec 0 2
/dev/mapper/vg-tdata /var/lib/titan/data xfs defaults,noexec 0 2

Next, we’ll copy and modify the following files so that Titan will use the BerkeleyDB location we specified.

  1. Copy conf/remote.yaml and change the IP to the one Titan listens on.
  2. Copy conf/titan-berkeleyje-es.properties and change the paths to /var/lib/titan/db/berkeley and /var/lib/titan/db/es (a sketch of the resulting file follows this list). Also add the following entry at the bottom:
    gremlin.graph=com.thinkaurelius.titan.core.TitanFactory
  3. Copy conf/gremlin-server/gremlin-server.yaml and change the IP address and the path to the .properties file from the previous step.
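For orientation, the copied properties file might end up looking roughly like this. The exact set of entries and key names depend on your Titan distribution, so treat it as a sketch rather than a verbatim configuration:

storage.backend=berkeleyje
storage.directory=/var/lib/titan/db/berkeley
index.search.backend=elasticsearch
index.search.directory=/var/lib/titan/db/es
index.search.elasticsearch.client-only=false
index.search.elasticsearch.local-mode=true
gremlin.graph=com.thinkaurelius.titan.core.TitanFactory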

With the file configuration done, make the titan account own the installation.

# cd /var/lib
# chown -R titan:titan titan

At this point it makes sense to ensure that Titan starts without errors and that, as user titan, you can launch the Gremlin Console and connect to the Titan server.
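A quick smoke test from the Gremlin Console might look like the following, pointing :remote at the remote.yaml copy edited earlier (substitute whatever name you gave that copy):

gremlin> :remote connect tinkerpop.server conf/remote.yaml
gremlin> :> g.V().count()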

Once that’s complete, it’s time for the final piece: creating a .service file so that systemd launches Titan on system boot. Start by changing to the system directory for systemd:

# cd /usr/lib/systemd/system

In there, create a file, named e.g. titan.service and tailor the contents to match the name of the server configuration file created above. E.g.

# /usr/lib/systemd/system/titan.service

[Unit]
Description=Titan
After=openvpn@server.service

[Service]
User=titan
Group=titan
WorkingDirectory=/var/lib/titan
ExecStart=/var/lib/titan/bin/gremlin-server.sh conf/gremlin-server/my-server.yaml
Restart=always

[Install]
WantedBy=multi-user.target

Note the dependence on the OpenVPN service launched earlier. This is required because the IP address the Titan server listens on belongs to the tun0 network device created by OpenVPN, and it would not do to start Titan before the OpenVPN service has completely started. Enable and start the Titan service:

# systemctl enable titan.service
# systemctl start titan.service

As we’re also running firewalld, open up the port to allow our clients access to the Titan server:

# firewall-cmd --zone=public --add-port=8182/tcp
# firewall-cmd --zone=public --add-port=8182/tcp --permanent

Process

Our server is now self-sustaining in the sense that its storage is organized and its services are set up. You can reboot it, stop and start it, and even create AMIs from it that can be launched, and it will come up properly.

In order to manage new users we need a few simple processes:

  1. Sign certificates from certificate requests. This is simple to do securely, as it does not involve the transfer of private keys. However, if you have opted for TLS-Auth (and it’s recommended that you do), that pre-shared key needs to be distributed to each client in a secure fashion.
  2. Distribute a pre-configured client.conf (or client.ovpn) configuration file to facilitate a hassle-free connection experience.
  3. Once the clients’ VPN tunnels are running, they should connect to Titan via the OpenVPN-assigned address, which in our case was 10.33.44.1 (a client-side remote.yaml sketch follows this list).
  4. If your Titan server requires authentication, define these credentials in the credentials graph and restart Titan.
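Tying step 3 together, a client-side remote.yaml pointing at the VPN address might look roughly as follows; the serializer line mirrors what ships with the Titan distribution and may need adjusting to your install:

hosts: [10.33.44.1]
port: 8182
serializer: { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { serializeResultToString: true }}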


Spark on Hadoop

Overview

Data engineers are required to have a functional understanding of a broad range of systems and tools that operate somewhere in the space between source systems and end users. Said tools and systems typically store, transform, visualize – or some combination thereof – the information as it makes its way to the consumers. One such system has been surrounded by sufficient mystique that people ostensibly have been hired purely by virtue of claiming some unquantified degree of knowledge thereof. This system is Apache Hadoop: the open-source 800-pound gorilla whose mastery, in addition to providing fodder for oohs and aahs at certain cocktail parties, enables distributed storage and processing of data that need not have any inherent structure.

While many great packaged distributions of Hadoop, along with truly awe-inspiring enhancements, are readily available, often in Community-style editions, there is benefit in spending a little time with the base components on their own before rushing to complex integrated solutions, in order to develop an intuition and a feel for how they work in isolation. The purpose of this post is to outline the steps performed to, first and foremost, get a working Hadoop system. Once this foundation is in place, we’ll add an in-memory processing engine to the mix: Apache Spark.

Arguably, the most sincere effort to understand the software we’re using would have us work with the source, with building it being the absolute minimum. However, in this post we won’t be working with the source, as that’s not congruent with our goal of getting something up and running quickly. After all, this is about being able to use Spark on Hadoop, not about setting up build environments.

Environment

Speaking of environment: we’ll be setting up Spark 1.4.1 on Hadoop 2.7.1 on a CentOS 7 guest image running in VMware Fusion 7 Professional on a MacBook Pro running Yosemite.

A degree of care needs to be taken before rushing off to download the software. On the Spark Download Page, be sure to select “Pre-built for Hadoop 2.6 and later”. As for Hadoop, simply pick the appropriate binary package from the Hadoop Release Page.

Depending on the objectives of the installation, it may or may not be acceptable or desirable to install under $HOME. As we will in this case be eliminating as many complexities as possible, we will disregard the fact that we can sudo and will thus proceed with a user-local installation, which consequently also runs under the current user. Later, we’ll explore what it takes to promote the installation to /usr/local.

As we’ll be performing these activities in a guest virtual machine, it’s beneficial to have performed the SSH key exchange with the host to facilitate simple host access for file copy operations. The guests in my setup reach the host through the host name hostos. That means I can access files on the host system very easily, without needing to provide a password, provided that the usernames on host and guest match.

guest$ scp hostos:Downloads/hadoop-2.7.1.tar.gz .

Or, perhaps even better, if you have mounted a networked directory, e.g. on a NAS which is the case on our network:

guest$ scp hostos:/Volumes/Server/Common/hadoop-2.7.1.tar.gz .

Getting started

Untar the binary distribution to a location of your choice:

guest$ tar -xf hadoop-2.7.1.tar.gz

Before you’ll be able to run bin/hadoop, you need to tell it which JVM to use. On CentOS 7, the latest JVM can be found at /usr/java/latest, and you can either point your JAVA_HOME environment variable to that location or edit the corresponding line in the Hadoop-local etc/hadoop/hadoop-env.sh configuration file. One reason to consider the latter alternative is that non-volatile configuration, such as the JVM location, stays within the Hadoop file structure, which removes moving parts when making other changes such as which user the server should run under.
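Assuming the CentOS JVM path above, the hadoop-env.sh approach amounts to a one-line change:

# in etc/hadoop/hadoop-env.sh
export JAVA_HOME=/usr/java/latest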

We’ll be configuring a so-called Pseudo-Distributed Cluster. The steps to configure it, taken from the Hadoop Quickstart Guide, are summarized below. All console operations assume the working directory is the hadoop-2.7.1 root.

Define site properties in the following two files as per:

etc/hadoop/core-site.xml:

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

etc/hadoop/hdfs-site.xml:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

In order for the distributed file system operations to work, ssh to localhost needs to work without prompting for a password. Some information about that was provided in an earlier post.
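If passwordless ssh isn’t already in place, the usual recipe (also found in the quickstart guide) is along these lines:

guest$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
guest$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
guest$ chmod 0600 ~/.ssh/authorized_keys

Once that’s set up, we can proceed with formatting the file system.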

guest$ bin/hdfs namenode -format

With a formatted file system, we’re ready to start the file system daemon:

guest$ sbin/start-dfs.sh

After the namenode and datanode have been started, the server is listening on a number of new TCP ports: 9000, where worker requests are received, and a whole bunch in the 50000 range used for various forms of access. The two most interesting ports from a networking point of view at this stage are 50070 (the administrative web app) and 50075 (HTTP file access).

When the CentOS 7 guest image is headless, it’s very helpful to have access to those ports from the host. The steps for enabling that with VMware Fusion 7 and NAT’d CentOS 7 guests are outlined in a previous post and briefly summarized again:

Open the TCP port 50070 on the guest OS’ firewall:

guest$ sudo firewall-cmd --zone=public --add-port=50070/tcp --permanent

Add the port mapping in your host’s /Library/Preferences/VMware Fusion/vmnet8/nat.conf and restart VMware Fusion. Yes, this means stopping the guest, and once it has restarted, starting the DFS as per above.
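The mapping itself goes in the [incomingtcp] section of nat.conf; the guest IP below is an assumption specific to our NAT subnet, so substitute your own:

[incomingtcp]
# forward host port 50070 to the administrative web app on the guest
50070 = 192.168.120.128:50070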

Validating DFS

At this point we should have a completely functioning Hadoop single-node pseudo cluster. Again referencing the quick start guide, this is the time to confirm that map-reduce jobs can be run.

The general approach is to use the bin/hdfs command with the dfs argument to create directories or put files. E.g.:

guest$ bin/hdfs dfs -mkdir /user
guest$ bin/hdfs dfs -mkdir /user/<your user name>
guest$ bin/hdfs dfs -put etc/hadoop input

This sequence creates a three-level folder hierarchy (/user/<your user name>/input) and puts all the Hadoop configuration files in the input folder.

As for the actual tests themselves, Hadoop ships with example jars, including jars containing their source, which is really handy. Let’s run a word count example, the bread and butter of Hadoop:

guest$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar wordcount input output

This command starts a Hadoop job defined in a specific example jar. The first argument passed to said jar’s main function indicates which class to direct the remaining arguments to; here, the folder names input and output. When Hadoop runs these jars, the working directory is /user/<your user name>, which is why it can resolve input as the name of the folder residing at that level. The last argument, output, is the name of the folder that the class behind wordcount will create, and the job will fail if it already exists. To re-run the example without accumulating uniquely named output folders, delete the output folder, e.g. like so:

guest$ bin/hdfs dfs -rm  -R output

Run the jar through bin/hadoop without any additional arguments to get a list of all the command mappings that have been defined in the jar (in the file ExampleDriver.java).

Back to the example though: if TCP port 50075 has not been opened in the firewall and NAT’d in the virtualization engine, simply download the output folder to the local file system with the dfs -get command:

guest$ bin/hdfs dfs -get output

Pop into the local output folder and the word count results will be in a file named part-r-00000.

Adding Spark

Because we’re interested in running Spark jobs that consume content residing in a Hadoop cluster (or pseudo-cluster, as is the case above), it’s imperative that the Spark binaries are built with the proper Hadoop version in mind. For our 2.7.1 Hadoop, we’ll get Spark 1.4.1 built for Hadoop 2.6 and newer.

We untar it right next to our Hadoop install, but before we do anything else, we need to tell Spark about Hadoop and we need to reduce the verbosity of Hadoop’s logging.

For the first one, we define environment variable HADOOP_CONF_DIR to point to Hadoop’s configuration directory. We put it in our ~/.bash_profile for future sessions:

export HADOOP_CONF_DIR=$HOME/hadoop-2.7.1/etc/hadoop

Second, change Hadoop’s root log level from INFO to ERROR by updating the corresponding entry in $HADOOP_CONF_DIR/log4j.properties:

hadoop.root.logger=ERROR,console

We’re now ready to go into our Spark root and run the Python Spark shell, pyspark:

guest$ bin/pyspark

Perform a very basic hdfs:// connectivity test by verifying that Spark can read one of the files that were uploaded earlier into the input folder. Note though that the full path needs to be provided:

>>> testRDD = sc.textFile("hdfs:///user/testuser/input/hadoop-env.sh")
>>> testRDD.count()
99
>>>

While the number (99) may differ between deployments, it should be possible to run the above commands (substituting the testuser as required) without getting any errors.

The shell, be it Python or Scala (or even a slimmed down version of R as of Spark 1.4), is useful for performing simple tests. In order to run more complex logic, use bin/spark-submit to launch the program.
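As a minimal sketch of what such a program could look like (the file name and HDFS path below are assumptions mirroring the shell test above), a standalone word count might be written as follows:

# wordcount.py - count words in the file uploaded to HDFS earlier
from pyspark import SparkContext

sc = SparkContext(appName="WordCount")

lines = sc.textFile("hdfs:///user/testuser/input/hadoop-env.sh")
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))

# Print the ten most frequent words rather than writing back to HDFS
for word, count in counts.takeOrdered(10, key=lambda kv: -kv[1]):
    print("%s %d" % (word, count))

sc.stop()

It would then be launched from the Spark root with:

guest$ bin/spark-submit wordcount.py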

This concludes the very simple, basic setup of a single-node Hadoop pseudo-cluster and standalone Spark. Much can be done with this simple environment from a functional point of view, such as testing genuine Big Data applications, albeit against very modest data sets. Said applications can then be deployed unchanged onto production-grade clusters.

What about YARN?

Production cluster deployments of Hadoop and Spark will include YARN, the Hadoop version 2 resource manager. In a nutshell, Spark is, in addition to being a standalone entity, also a YARN application. YARN is so complex it borders on the fantastical, and it will be discussed in a future post.