Parkour is a Clojure library for writing Hadoop MapReduce programs. The most recent release of Parkour1 includes new features designed to integrate Parkour and Hadoop with the typical REPL-based Clojure workflow. Parkour can now lend a Clojure REPL most of the interactive Hadoop features of the Pig or Hive shells, but backed by the full power of the Clojure programming language.

This post will walk through setting up a Clojure REPL connected to a live Hadoop cluster (running via Amazon’s EMR) and interactively developing a MapReduce program to run on the cluster.

Problem

We need a problem to solve with MapReduce. Treading new data-analysis ground isn’t the goal here, so for this post we’re just going to write a Parkour version of a program to identify decade-wise trending terms in the Google Books n-grams corpus. To make things a bit interesting, we will make an attempt at improving on the original algorithm to include terms which first appear in a given decade.

Preliminaries

This post assumes you either already have fully-configured access to a local Hadoop cluster or know how to launch a cluster on Amazon EMR.

If launching a cluster on EMR, you’ll need to run your REPL process such that the local system Hadoop version matches the EMR cluster Hadoop version and has network access to the cluster services. The easiest way to do this is to run everything on the EMR cluster master node. This is less convenient for actual development than other approaches (e.g. configuring Hadoop to use SOCKS), but involves less setup, and so is what we’ll assume for this post.

Parkour supports the current stable (plus EMR-supported) versions of both Hadoop 1 and Hadoop 2. For this post’s EMR cluster we’ll be using Amazon Hadoop 2.2.0, but Amazon Hadoop 1.0.2 should work just as well.

Create a project

First, install Leiningen2 and create a new project:

$ lein new app trending-terms
Generating a project called trending-terms based on the 'app' template.

Then update the project file to include Parkour and our version of Hadoop:

(defproject trending-terms "0.1.0-SNAPSHOT"
  :description "Decade-wise trending terms in the Google Books n-grams corpus."
  :url "http://github.com/llasram/trending-terms"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :global-vars {*warn-on-reflection* true}
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [com.damballa/parkour "0.5.4"]
                 [org.apache.avro/avro "1.7.5"]
                 [org.apache.avro/avro-mapred "1.7.5"
                  :classifier "hadoop2"]
                 [transduce/transduce "0.1.1"]]
  :exclusions [org.apache.hadoop/hadoop-core
               org.apache.hadoop/hadoop-common
               org.apache.hadoop/hadoop-hdfs
               org.slf4j/slf4j-api org.slf4j/slf4j-log4j12 log4j
               org.apache.avro/avro
               org.apache.avro/avro-mapred
               org.apache.avro/avro-ipc]
  :profiles {:provided
             {:dependencies [[org.apache.hadoop/hadoop-client "2.2.0"]
                             [org.apache.hadoop/hadoop-common "2.2.0"]
                             [org.slf4j/slf4j-api "1.6.1"]
                             [org.slf4j/slf4j-log4j12 "1.6.1"]
                             [log4j "1.2.17"]]}
             :test {:resource-paths ["test-resources"]}
             :aot {:aot :all, :compile-path "target/aot/classes"}
             :uberjar [:aot]
             :jobjar [:aot]})

There’s currently some incidental complexity to the project file (the :exclusions and logging-related dependencies), but just roll with it for now.

Launch a REPL

In order to launch a cluster-connected REPL, we’ll want to use the lein-hadoop-cluster Leiningen plugin. We’ll also want the Alembic library for some of Parkour’s REPL support functionality. We could add these directly to the project file, but they’re pretty orthogonal to any given individual project, so we’ll just add them to the :user profile in our ~/.lein/profiles.clj:

{:user
 {:plugins [[lein-hadoop-cluster "0.1.2"]]
  :dependencies [[alembic "0.2.1"]]}}

With those changes made, we can actually launch the REPL from within the project directory:

$ lein hadoop-repl

Then (optionally, but suggested) connect to the REPL process from our editor.

Examine the data

Let’s get started writing code, in src/trending_terms/core.clj. First, the namespace preliminaries:

(ns trending-terms.core
  (:require [clojure.string :as str]
            [clojure.core.reducers :as r]
            [transduce.reducers :as tr]
            [abracad.avro :as avro]
            [parkour (conf :as conf) (fs :as fs) (mapreduce :as mr)
             ,       (graph :as pg) (reducers :as pr)]
            [parkour.io (seqf :as seqf) (avro :as mra) (dux :as dux)
             ,          (sample :as sample)]))

Then we can define some functions to access the n-gram corpus:

(def ngram-base-url
  "Base URL for Google Books n-gram dataset."
  "s3://datasets.elasticmapreduce/ngrams/books/20090715")

(defn ngram-url
  "URL for Google Books `n`-gram dataset."
  [n] (fs/uri ngram-base-url "eng-us-all" (str n "gram") "data"))

(defn ngram-dseq
  "Distributed sequence for Google Books `n`-grams."
  [n] (seqf/dseq (ngram-url n)))

With these functions in hand, we can start exploring the data:

trending-terms.core> (->> (ngram-dseq 1) (r/take 2) (into []))
[[1 "#\t1584\t6\t6\t1"] [2 "#\t1596\t1\t1\t1"]]
trending-terms.core> (->> (ngram-dseq 1) (sample/dseq)
                          (r/take 2) (into []))
[[265721405 "APPROPRIATE\t1999\t492\t470\t325"]
 [265721406 "APPROPRIATE\t2000\t793\t723\t375"]]
trending-terms.core> (->> (ngram-dseq 1) (sample/dseq {:seed 2})
                          (r/take 2) (into []))
[[141968715 "poseen\t2007\t10\t10\t8"]
 [141968716 "poseen\t2008\t13\t13\t10"]]

The cluster-connected REPL gives us direct live access to the actual data we want to analyze. The sample/dseq function allows us to select small samples from that data, random within the limits of what Hadoop allows to be efficient.

Write a function

With direct access to the raw records our jobs get from Hadoop, we can easily begin writing functions to operate on those records:

(defn parse-record
  "Parse text-line 1-gram record `rec` into ((gram, decade), n) tuple."
  [rec]
  (let [[gram year n] (str/split rec #"\t")
        gram (str/lower-case gram)
        year (Long/parseLong year)
        n (Long/parseLong n)
        decade (-> year (quot 10) (* 10))]
    [[gram decade] n]))

(defn select-record?
  "True iff argument record should be included in analysis."
  [[[gram year]]]
  (and (<= 1890 year)
       (re-matches #"^[a-z+'-]+$" gram)))

And doing some quick REPL-testing:

trending-terms.core> (->> (ngram-dseq 1) (sample/dseq {:seed 1}) (r/map second)
                          (r/map parse-record)
                          (r/take 2) (into []))
[[["appropriate" 1990] 492] [["appropriate" 2000] 793]]
trending-terms.core> (->> (ngram-dseq 1) (r/map second)
                          (r/map parse-record)
                          (r/filter select-record?)
                          (r/take 2) (into []))
[[["a+" 1890] 1] [["a+" 1890] 2]]

We can glue these functions together into the first map task function we’ll need for our jobs:

(defn normalized-m
  "Parse, normalize, and filter 1-gram records."
  {::mr/source-as :vals, ::mr/sink-as :keyvals}
  [records]
  (->> records
       (r/map parse-record)
       (r/filter select-record?)
       (pr/reduce-by first (pr/mjuxt pr/arg1 +) [nil 0])))

Because Parkour task functions are just functions, we can test that in the REPL as well:

trending-terms.core> (->> (ngram-dseq 1) (sample/dseq {:seed 1}) (r/map second)
                          normalized-m (r/take 3) (into []))
[[["appropriate" 1990] 492]
 [["appropriate" 2000] 7242]
 [["appropriated" 1890] 59]]

As we develop the functions composing our program, we can iterate rapidly by immediately seeing the effect of calling our in-development functions on real data.

Write some jobs

Writing the rest of the jobs is a simple matter of programming. We’ll largely follow the original Hive version, but we will attempt to add the ability for entirely new terms to trend by applying Laplace smoothing to the occurrence counts.

Parkour allows us to optimize the entire process down to just two jobs (and no reduce-side joins). This is something which is equally possible via the base Java Hadoop interfaces, but which you’d rarely bother to do, because it’d be too fiddly in the raw APIs and completely impossible in most higher-level frameworks. In Clojure with Parkour however, it’s relatively straightforward.

But! – today’s main, REPL-based excitement comes after. So, without further commentary, the rest of the code:

(defn nth0-p
  "Partitioning function for first item of key tuple."
  ^long [k v ^long n] (-> k (nth 0) hash (mod n)))

(defn normalized-r
  "Collect: tuples of 1-gram and occurrence counts by decade; total 1-gram
occurrence counts by decade; and counter of total number of 1-grams."
  {::mr/source-as :keyvalgroups,
   ::mr/sink-as (dux/named-keyvals :totals)}
  [input]
  (let [nwords (.getCounter mr/*context* "normalized" "nwords")
        fnil+ (fnil + 0)]
    (->> input
         (r/map (fn [[[gram decade] ns]]
                  [gram [decade (reduce + 0 ns)]]))
         (pr/reduce-by first (pr/mjuxt pr/arg1 conj) [nil {}])
         (reduce (fn [totals [gram counts]]
                   (dux/write mr/*context* :counts gram (seq counts))
                   (merge-with + totals counts))
                 {})
         (seq))))

(defn normalized-j
  "Run job accumulating maps of decade-wise raw occurrence counts per 1-gram
and map of total decade-wise Laplace-smoothed occurrence counts."
  [conf workdir ngrams]
  (let [counts-path (fs/path workdir "counts")
        totals-path (fs/path workdir "totals")
        pkey-as (avro/tuple-schema ['string 'long])
        counts-as {:type 'array, :items (avro/tuple-schema ['long 'long])}
        [counts totals]
        , (-> (pg/input ngrams)
              (pg/map #'normalized-m)
              (pg/partition (mra/shuffle pkey-as 'long) #'nth0-p)
              (pg/reduce #'normalized-r)
              (pg/output :counts (mra/dsink ['string counts-as] counts-path)
                         :totals (mra/dsink ['long 'long] totals-path))
              (pg/execute conf "normalized"))
        gramsc (-> (mr/counters-map totals)
                   (get-in ["normalized" "nwords"])
                   double)
        fnil+ (fnil + gramsc)
        totals (reduce (fn [m [d n]]
                         (update-in m [d] fnil+ n))
                       {} totals)]
    [counts totals]))

(defn trending-m
  "Transform decade-wise 1-gram occurrence counts into negated ratios of
occurrence frequencies in adjacent decades."
  {::mr/source-as :keyvals, ::mr/sink-as :keyvals}
  [totals input]
  (r/mapcat (fn [[gram counts]]
              (let [counts (into {} counts)
                    ratios (reduce-kv (fn [m dy nd]
                                        (let [ng (inc (counts dy 0))]
                                          (assoc m dy (/ ng nd))))
                                      {} totals)]
                (->> (seq ratios)
                     (r/map (fn [[dy r]]
                              (let [r' (ratios (- dy 10))]
                                (if (and r' (< 0.000001 r))
                                  [[dy (- (/ r r'))] gram]))))
                     (r/remove nil?))))
            input))

(defn trending-r
  "Select top `n` 1-grams per decade by negated occurrence frequency ratios."
  {::mr/source-as :keyvalgroups, ::mr/sink-as :keyvals}
  [n input]
  (r/map (fn [[[decade] grams]]
           [decade (into [] (r/take n grams))])
         input))

(defn trending-j
  "Run job selecting trending terms per decade."
  [conf workdir topn counts totals]
  (let [ratio-as (avro/tuple-schema ['long 'double])
        ratio+g-as (avro/grouping-schema 1 ratio-as)
        grams-array {:type 'array, :items 'string}
        trending-path (fs/path workdir "trending")
        [trending]
        , (-> (pg/input counts)
              (pg/map #'trending-m totals)
              (pg/partition (mra/shuffle ratio-as 'string ratio+g-as)
                            #'nth0-p)
              (pg/reduce #'trending-r topn)
              (pg/output (mra/dsink ['long grams-array] trending-path))
              (pg/execute conf "trending"))]
    (into (sorted-map) trending)))

(defn trending-terms
  "Calculate the top `topn` trending 1-grams per decade from Google Books 1-gram
corpus dseq `ngrams`.  Writes job outputs under `workdir` and configure jobs
using Hadoop configuration `conf`.  Returns map of initial decade years to
vectors of trending terms."
  [conf workdir topn ngrams]
  (let [[counts totals] (normalized-j conf workdir ngrams)]
    (trending-j conf workdir topn counts totals)))

Once we start writing our job, we can use Parkour’s “mixed mode” job execution to iterate. Mixed mode allows us to run the job in the REPL process, but on the same live sampled data we were examining before:

trending-terms.core> (->> (ngram-dseq 1) (sample/dseq {:seed 1, :splits 20})
                          (trending-terms (conf/local-mr!) "file:tmp/tt/0" 5))
{1900 ["deglet" "warroad" "delicado" "erostrato" "warad"],
 1910 ["esbly" "wallonie" "wallstein" "dehmel's" "dellion"],
 1920 ["ernestino" "walska" "wandis" "wandke" "watasenia"],
 1930 ["delacorte" "phytosociological" "priuatly" "delber" "escapism"],
 1940 ["phthalates" "phylic" "espiner" "degrease" "wallonie"],
 1950 ["demokos" "wanotan" "ersine" "dekatron" "physicalistically"],
 1960 ["warain" "propeking" "warschaw" "demecarium" "pikiran"],
 1970 ["prioritize" "walshok" "waterboard" "demogrants" "delisted"],
 1980 ["warsl" "watasi" "proarrhythmic" "walonick" "procurved"],
 1990 ["wanglie" "procedores" "printlnc" "dejanews" "eslamboli"],
 2000 ["wardriving" "erlestoke" "deleterole" "deleteq" "erius"]}

This is the moral equivalent of Pig’s ILLUSTRATE command. Parkour lacks the rigid execution model which allows Pig’s ILLUSTRATE to e.g. synthesize data for joins, but the simplicity of the approach means any combination of “remote” jobs and local processing just works, without surprises.

As with sampling input for individual functions, mixed mode job execution supports rapid development iteration on real data.

Write a test

REPL iteration gets results quickly, but once code works, a real test allows us to have confidence that it will continue to work, and the results are what we actually expect. So let’s write a test for our job graph:

(ns trending-terms.core-test
  (:require [clojure.test :refer :all]
            [trending-terms.core :as tt]
            [parkour (fs :as fs) (conf :as conf)]
            [parkour.io (dsink :as dsink) (seqf :as seqf)]
            [parkour.test-helpers :as th])
  (:import [org.apache.hadoop.io Text LongWritable]))

(def n1grams-records
  ;; omitted from blog post for clarity
  )

(deftest test-basic
  (th/with-config
    (let [workdir (doto "tmp/work/basic" fs/path-delete)
          inpath (fs/path workdir "ngrams")
          ngrams (dsink/with-dseq (seqf/dsink [LongWritable Text] inpath)
                   n1grams-records)
          trending (tt/trending-terms (th/config) workdir ngrams 1)]
      (is (= {1950 ["legal"],
              1960 ["assembly"],
              1970 ["astoria"],
              2000 ["prostate"]}
             trending)))))

The Parkour th/config function and th/with-config macro allow us to run code using a purely local-mode Hadoop configuration, even in a process where the default configuration points to a live cluster. Just like we were able to REPL-test jobs using mixed mode, we can now run our actual tests in-REPL in full local mode:

trending-terms.core-test> (run-tests)

Testing trending-terms.core-test

Ran 1 tests containing 1 assertions.
0 failures, 0 errors.
{:type :summary, :pass 1, :test 1, :error 0, :fail 0}

Success!

Launch a job

Once our program is developed and tested, it’s time to run it on the full dataset. Normally this would involve leaving the REPL to build a job JAR and deploy it somehow, but Parkour allows us to do this directly from the REPL too:

trending-terms.core> (require '[parkour.repl :refer [launch!]])
trending-terms.core> (def *results
                       (future (->> (ngram-dseq 1)
                                    (launch! {"mapred.reduce.tasks" 8}
                                             trending-terms "tt/0" 5))))
#<Var@3a23a4ec: #<Future@55f5a074: :pending>>
trending-terms.core> (realized? *results)
false

(We run the jobs in a future to place it in a background thread, and thus not tie up the REPL while the jobs are running.)

Parkour uses hugoduncan’s Alembic library to load and interact with a full in-process (but isolated) instance of Leiningen. Using this Leiningen instance, Parkour just builds your job JAR and assembles your dependencies exactly as specified by your Leiningen project file.

Once the job finishes, time for some results:

trending-terms.core> (realized? *results)
true
trending-terms.core> @*results
{1900 ["strether" "fluidextract" "thutmose" "adrenalin" "lekythoi"],
 1910 ["orthotype" "britling" "salvarsan" "pacifist" "boches"],
 1920 ["liliom" "bacteriophage" "prohack" "vanzetti" "erlend"],
 1930 ["vridar" "samghin" "mulan" "nazis" "goebbels"],
 1940 ["psia" "plutonium" "luftwaffe" "darlan" "beachhead"],
 1950 ["lopatkin" "rooscvelt" "fluoridation" "jacy" "desegregation"],
 1960 ["vietcong" "synanon" "tshombe" "lumumba" "psychedelic"],
 1970 ["mdhr" "sexist" "sexism" "biofeedback" "counterculture"],
 1980 ["affit" "autocad" "dbase" "neob" "garion"],
 1990 ["activex" "photoshop" "javascript" "netscape" "toolbars"],
 2000 ["cengage" "eventargs" "itunes" "podcast" "wsdl"]}

How trendy! It looks like our smoothing function has added more noise from rare terms3, but the basics are there for the tweaking.

The complete example trending-terms project is on Github, if you want to give a try at experimenting with it (in a live REPL!) yourself.

Parting thoughts

Thanks to rfarrjr for awesome discussion around these features, and to ztellman and ahobson for prompting their value and for specific suggestions. I was honestly skeptical at first that this sort of REPL integration could be made useful, from past experience trying to make a live-cluster Cascalog REPL work. But now that these features exist, I’m not sure how I wrote MapReduce programs without them.

So head over to the Parkour project and get started!

1 Version 0.5.4, at the time of writing.

2 On the EMR Hadoop cluster master node.

3 Especially OCR errors.

blog comments powered by Disqus