
Machine Learning with Clojure and Spark using Flambo

In this short tutorial I’m going to show you how to train a logistic regression classifier in a scalable manner with Apache Spark and Clojure using Flambo.

Assumptions: basic familiarity with Clojure, plus Leiningen 2 and a JDK installed.

The goal of the tutorial is to help you familiarize yourself with Flambo – a Clojure DSL for Apache Spark. Even though Flambo is far from being complete, it already does a decent job of wrapping basic Spark APIs into idiomatic Clojure.

During the course of the tutorial, we are going to train a classifier capable of predicting whether a wine would taste good given certain objective chemical characteristics.

Step 1. Create new project

Run these commands:

$ lein new app t01spark
$ cd t01spark

Here, t01spark is the name of the project. You can give it any name you like. Don’t forget to change the current directory to the project you’ve just created.

Step 2. Update project.clj

Open project.clj in a text editor and update the dependency section so it looks like this:

:dependencies
    [[org.clojure/clojure "1.6.0"]
     [yieldbot/flambo "0.6.0"]
     [org.apache.spark/spark-mllib_2.10 "1.3.0"]]

Please note that although listing Spark jars in this manner is perfectly fine for exploratory projects, it is not suitable for production use. For that you will need to list them as “provided” dependencies in the profiles section, but let’s keep things simple for now.
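
For the curious, such a setup might look roughly like this. This is only a sketch, not something we need for this tutorial: Leiningen treats dependencies in the :provided profile as available at compile time but leaves them out of the uberjar.

:dependencies
    [[org.clojure/clojure "1.6.0"]
     [yieldbot/flambo "0.6.0"]]

;; Spark jars come from the cluster at runtime,
;; so they don't get packaged into the uberjar
:profiles
    {:provided
     {:dependencies
      [[org.apache.spark/spark-mllib_2.10 "1.3.0"]]}}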

Make sure that AOT is enabled; otherwise you will see strange ClassNotFound errors. Add this to the project file:

:aot :all

It may also make sense to give Spark some extra memory:

:jvm-opts ^:replace ["-server" "-Xmx2g"]

Step 3. Download dataset

In this tutorial we are going to use the Wine Quality Dataset. Download it and save it next to the project.clj file:

$ wget http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv

Step 4. Start REPL

The simplest way is running Leiningen with the repl command:

$ lein repl
Clojure 1.6.0
Java HotSpot(TM) 64-Bit Server VM 1.8.0_xxx
...
user=>

Of course, nothing prevents you from running the REPL in Emacs with CIDER, IntelliJ IDEA, or any other Clojure-aware IDE.

Step 5. Require modules and import classes

user=> (require '[flambo.api :as f]
                '[flambo.conf :as cf]
                '[flambo.tuple :as ft]
                '[clojure.string :as s])

user=> (import '[org.apache.spark.mllib.linalg Vectors]
               '[org.apache.spark.mllib.regression LabeledPoint]
               '[org.apache.spark.mllib.classification LogisticRegressionWithLBFGS]
               '[org.apache.spark.mllib.evaluation BinaryClassificationMetrics])

Step 6. Create Spark context

user=> (def spark
         (let [cfg (-> (cf/spark-conf)
                       (cf/master "local[2]")
                       (cf/app-name "t01spark")
                       (cf/set "spark.akka.timeout" "300"))]
           (f/spark-context cfg)))

We’ve just created a Spark context bound to a local, in-process Spark server. You should see lots of INFO log messages in the terminal. That’s normal. Again, creating a Spark context like this will work for tutorial purposes, although in real life you’d probably want to wrap this expression into a memoizing function and call it whenever you need a context.
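
For example, a minimal sketch of such a helper might look like this. The make-spark-context name and the use of memoize are just one possible approach, not something Flambo provides:

;; Builds the context on the first call and returns the same
;; instance on every subsequent call
(def make-spark-context
  (memoize
   (fn []
     (-> (cf/spark-conf)
         (cf/master "local[2]")
         (cf/app-name "t01spark")
         (cf/set "spark.akka.timeout" "300")
         (f/spark-context)))))

;; Then call (make-spark-context) wherever a context is needed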

Step 7. Load and parse data

The data is stored in a CSV file with a header. We don’t need that header. To get rid of it, let’s enumerate rows and retain only those with indexes greater than zero. Then we split each row by the semicolon character and convert each element to float:

user=> (def data
         ;; Read lines from file
         (-> (f/text-file spark "winequality-red.csv")
             ;; Enumerate lines.
             ;; This function is missing from Flambo,
             ;; so we call the method directly
             (.zipWithIndex)
             ;; This is here purely for convenience:
             ;; it transforms Spark tuples into Clojure vectors
             (f/map f/untuple)
             ;; Get rid of the header
             (f/filter (f/fn [[line idx]] (< 0 idx)))
             ;; Split lines and transform values
             (f/map (f/fn [[line _]]
                      (->> (s/split line #";")
                           (map #(Float/parseFloat %)))))))

Let’s verify what’s in the RDD:

user=> (f/take data 3)
[(7.4 0.7 0.0 1.9 0.076 11.0 34.0 0.9978 3.51 0.56 9.4 5.0)
 (7.8 0.88 0.0 2.6 0.098 25.0 67.0 0.9968 3.2 0.68 9.8 5.0)
 (7.8 0.76 0.04 2.3 0.092 15.0 54.0 0.997 3.26 0.65 9.8 5.0)]

Looks legit.

Step 8. Transform data

The subjective wine quality information is contained in the Quality variable, which takes values in the [0..10] range. Let’s turn it into a binary variable by splitting at the median: wines with Quality below 6 will be considered “not good”, and wines with Quality of 6 and above “good”.

I explored this dataset in R and found that the most interesting variables are Citric Acid, Total Sulfur Dioxide and Alcohol. I encourage you to experiment with adding other variables to the model. Also, using logarithms of those variables instead of raw values might be a good idea. Please refer to the Wine Quality Dataset for the full variable list.

 user=> (def dataset
          (f/map data
                 (f/fn [[_ _ citric-acid _ _ _
                         total-sulfur-dioxide _ _ _
                         alcohol quality]]
                   ;; Label 1.0 marks wines below the median ("not good"),
                   ;; label 0.0 marks "good" ones (quality 6 and above)
                   (let [good (if (<= 6 quality) 0.0 1.0)
                         ;; these will be our predictors
                         pred (double-array [citric-acid
                                             total-sulfur-dioxide
                                             alcohol])]
                     ;; Spark requires samples to be packed into LabeledPoints
                     (LabeledPoint. good (Vectors/dense pred))))))

 user=> (f/take dataset 3)
 [#<LabeledPoint (1.0,[0.0,34.0,9.399999618530273])>
  #<LabeledPoint (1.0,[0.0,67.0,9.800000190734863])>
  #<LabeledPoint (1.0,[0.03999999910593033,54.0,9.800000190734863])>]

There is no order guarantee in derived RDDs, so you might get a different result.
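
If you want to try the log-transform idea mentioned earlier, a hypothetical variant of the mapping could look like this (dataset-log is a made-up name; Math/log1p is used so that zero values of Citric Acid don’t blow up):

(def dataset-log
  (f/map data
         (f/fn [[_ _ citric-acid _ _ _
                 total-sulfur-dioxide _ _ _
                 alcohol quality]]
           (let [good (if (<= 6 quality) 0.0 1.0)
                 ;; log1p(x) = log(1 + x), safe for zero values
                 pred (double-array [(Math/log1p citric-acid)
                                     (Math/log1p total-sulfur-dioxide)
                                     (Math/log alcohol)])]
             (LabeledPoint. good (Vectors/dense pred))))))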

Step 9. Prepare training and validation datasets

 user=> (f/cache dataset) ; Temporarily cache the source dataset
                          ; BTW, caching is a side effect

 user=> (def training
          (-> (f/sample dataset false 0.8 1234)
              (f/cache)))

 user=> (def validation
          (-> (.subtract dataset training)
              (f/cache)))

 user=> (map f/count [training validation]) ; Check the counts
 (1291 235)

 user=> (.unpersist dataset) ; no need to cache it anymore

Caching is crucial for MLlib performance. In fact, Spark MLlib algorithms will complain if you feed them uncached datasets.

Step 10. Train classifier

MLlib-related parts are completely missing from Flambo, but that’s hopefully coming soon. For now, let’s use the Java API directly.

 user=> (def classifier
          (doto (LogisticRegressionWithLBFGS.)
            ;; Fit an intercept term; otherwise we'd have to add a constant feature ourselves
            (.setIntercept true)))

 user=> (def model
          (doto (.run classifier (.rdd training))
            ;; We need the "raw" probability predictions
            (.clearThreshold)))

 user=> [(.intercept model) (.weights model)]
 [9.805476268219566
  #<DenseVector [-1.6766504448212323,0.011619041367225583,-0.9683045663615859]>]

Step 11. Assess predictive power

First, let’s create a function to compute the area under the precision-recall curve and the area under the receiver operating characteristic (ROC) curve. These are two of the most important indicators of the predictive power of a trained classification model.

user=> (defn metrics [ds model]
         ;; Here we construct an RDD containing [prediction, label]
         ;; tuples and compute classification metrics.
         (let [pl (f/map ds (f/fn [point]
                              (let [y (.label point)
                                    x (.features point)]
                                (ft/tuple (.predict model x) y))))
               metrics (BinaryClassificationMetrics. (.rdd pl))]
           [(.areaUnderROC metrics)
            (.areaUnderPR metrics)]))

Obtain metrics for the training dataset:

 user=> (metrics training model)
 [0.7800174890996763 0.7471259498290513]

And then for the validation dataset:

 user=> (metrics validation model)
 [0.7785138248847928 0.7160113864756078]

OK, let’s turn L2 regularization on and rebuild the model:

 user=> (doto (.optimizer classifier)
          (.setRegParam 0.0001))

 user=> (def model
          (doto (.run classifier (.rdd training))
            (.clearThreshold)))

 user=> (metrics training model)
 [0.7794660966515655 0.748073583460006]

 user=> (metrics validation model)
 [0.7807459677419355 0.7200550175610565]

Looks good? I’m sure you can do better.
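
One simple way to experiment is a small grid search over the regularization parameter, reusing the classifier, the datasets and the metrics function defined above. This is just a sketch; the candidate values are arbitrary:

(doseq [reg [0.0 1e-4 1e-3 1e-2 1e-1]]
  ;; Set the regularization strength and retrain
  (-> (.optimizer classifier)
      (.setRegParam reg))
  (let [m (doto (.run classifier (.rdd training))
            (.clearThreshold))]
    ;; Print [areaUnderROC areaUnderPR] for both datasets
    (println reg (metrics training m) (metrics validation m))))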

Step 12. Build predictor

As a final step, let’s define a function that we could use for predicting wine quality:

 user=> (defn is-good? [model citric-acid
                        total-sulfur-dioxide alcohol]
          (let [point (-> (double-array [citric-acid
                                         total-sulfur-dioxide
                                         alcohol])
                          (Vectors/dense))
                ;; With the threshold cleared, .predict returns the
                ;; probability of label 1.0, i.e. of a "not good" wine
                prob (.predict model point)]
            (> 0.5 prob)))

 user=> (is-good? model 0.0 34.0 9.399999618530273)
 false

These are the predictors of the very first wine in the dataset. Its quality is 5, so the model rightly considers it not good.

Conclusion

We have built a simple logistic regression classifier in Clojure on Apache Spark using Flambo. Some parts of the Flambo API are still missing, but it’s definitely usable. It was not terribly difficult to get it working and I hope you had fun.