Using Spark with MongoDB (original) (raw)
I recently started investigating Apache Spark as a framework for data mining. Spark builds upon Apache Hadoop, and allows a multitude of operations more than map-reduce. It also supports streaming data with iterative algorithms.
Since Spark builds upon Hadoop and HDFS, it is compatible with any HDFS data source. Our server uses MongoDB, so we naturally turned to the mongo-hadoop connector, which allows reading and writing directly from a Mongo database.
However, it was far from obvious (at least for a beginner with Spark) how to use and configure mongo-hadoop together with Spark. After a lot of experimentation, frustration, and a few emails to the Spark user mailing list, I got it working in both Java and Scala. I wrote this tutorial to save others the exasperation.
Read below for details. The impatient can just grab the example application code.
Versions and APIs
The Hadoop ecosystem is fraught with different library versions and incompatible APIs. The major API change was in Hadoop 0.20, where the old org.apache.hadoop.mapred
API was changed to org.apache.hadoop.mapreduce
. The API change reflects also to other libraries: mongo-hadoop has the packages com.mongodb.hadoop.mapred
(old) and com.mongodb.hadoop
(new), while SparkContext
contains the methods hadoopRDD
and newAPIHadoopRDD
.
You need to take care to use the correct versions of each API. This is made even more confusing since in most cases the class names of the two APIs are the same, and only the package differs. If you get mysterious errors, double-check that you’re using the APIs consistently.
The example application uses Hadoop 2.2.0 and the new API.
Library dependencies
Apache Spark depends on a multitude of supporting libraries ranging from Apache Commons and Hadoop to slf4j and Jetty. Don’t even try to manage the library dependencies yourself — use Maven, Ivy, SBT or something similar.
The example application uses SBT with the Akka Maven repositories. The Maven repositories contain the mongo-hadoop connector for several different Hadoop versions, but not for 2.2.0. Therefore the mongo-hadoop connector is included as an unmanaged library.
Using mongo-hadoop from Spark
The mongo-hadoop configuration parameters are passed using a Configuration
object (from the Hadoop packages). The most important parameters are mongo.input.uri
and mongo.output.uri
, which provide the Mongo host, port, authentication, database and collection names. You can also provide other configuration options, for example a Mongo query to limit the data.
Each Mongo collection is loaded as a separate RDD using the SparkContext
:
JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(config, MongoInputFormat.class, Object.class, BSONObject.class);
This uses the new API, and MongoInputFormat
must be imported from com.mongodb.hadoop
. For the old API you would use the hadoopRDD
method and com.mongodb.hadoop.mapred.MongoInputFormat
.
The type returned is RDD<Object, BSONObject>
. The first parameter is an ObjectId
instance, which is the Mongo object ID of the document. The second parameter contains the BSON document.
Saving an RDD back to Mongo uses the saveAsNewAPIHadoopFile
method:
rdd.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);
Only the two last parameters seem to be relevant (though the first argument must be a valid HDFS URI). The RDD is again of type RDD<Object, BSONObject>
. However, due to a bug, the first argument cannot be an ObjectId
. If you want to provide the object ID explicitly, use a String
. If you want the Mongo driver to generate the ID automatically, set it to null
(as the example application does).
The example app
The example application contains a simple word count algorithm in both Java and Scala. They read from a MongoDB running on localhost from the database beowulf
and collection input
. The documents contain just a text
field, on which the word count is run.
The results are stored in the same database in the collection output
, with the document containing the fields word
and count
.
The example requires MongoDB to be running on localhost and Scala 2.10 and SBT to be installed. Afterwards, you can import the example data, run the programs and print out the results with the following commands:
mongoimport -d beowulf -c input beowulf.json sbt 'run-main JavaWordCount' sbt 'run-main ScalaWordCount' mongo beowulf --eval 'printjson(db.output.find().toArray())' | less