Pig Usage (original) (raw)

Usage with Pig

Installation

  1. Obtain the MongoDB Hadoop Connector. You can either build it or download the jars. The releases page also includes instructions for use with Maven and Gradle. For Pig, you'll need the "core" jar and the "pig" jar.
  2. Get a JAR for the MongoDB Java Driver. The connector requires at least version 3.0.0 of the driver "uber" jar (called "mongo-java-driver.jar").
  3. Use REGISTER statements in your Pig script to include these JARs (core, pig, and the Java driver), e.g., REGISTER /path-to/mongo-hadoop-pig-<version>.jar;.

Reading into Pig

From a MongoDB collection

To load records from MongoDB database to use in a Pig script, a class called MongoLoader is provided. To use it, first register the dependency jars in your script and then specify the Mongo URI to load with the MongoLoader class.

-- First, register jar dependencies
REGISTER mongo-java-driver.jar  
REGISTER mongo-hadoop-core.jar
REGISTER mongo-hadoop-pig.jar

raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in' USING com.mongodb.hadoop.pig.MongoLoader;

MongoLoader can be used in two ways - Dynamic Schema and Fixed Schema mode. By creating an instance of the class without specifying any field names in the constructor (as in the previous snippet) each record will appear to Pig as a tuple containing a single Map that corresponds to the document from the collection, for example:

([bc2Year#7.87,bc3Year#7.9,bc1Month#,bc5Year#7.87,_id#631238400000,bc10Year#7.94,bc20Year#,bc7Year#7.98,bc6Month#7.89,bc3Month#7.83,dayOfWeek#TUESDAY,bc30Year#8,bc1Year#7.81])

However, by creating a MongoLoader instance with a specific list of field names, you can map fields in the document to fields in a Pig named tuple datatype. When used this way, MongoLoader takes two arguments:

schema - a schema (list of fields/datatypes) that will map fields in the document to fields in the Pig records. See section below on Datatype Mapping for details.

idAlias - an alias to use for the _id field in documents retrieved from the collection. The string "_id" is not a legal pig variable name, so the contents of the field in _id will be mapped to a value in Pig accordingly by providing a value here.

Example:

-- Load two fields from the documents in the collection specified by this URI
-- map the "_id" field in the documents to the "id" field in pig
> raw = LOAD 'mongodb://localhost:27017/demo.yield_historical.in' using com.mongodb.hadoop.pig.MongoLoader('id, bc10Year', 'id');
> raw_limited = LIMIT raw 3;
> dump raw_limited; 
(631238400000,7.94)
(631324800000,7.99)
(631411200000,7.98)

Note: Pig 0.9 and earlier have issues with non-named tuples. You may need to unpack and name the tuples explicitly, for example: The tuple (1,2,3) can not be transformed into a MongoDB document. But,FLATTEN((1,2,3)) as v1, v2, v3 can successfully be stored as {'v1': 1, 'v2': 2, 'v3': 3}Pig 0.10 and later handles both cases correctly, so avoiding Pig 0.9 or earlier is recommended.

From a .BSON file

You can load records directly into a Pig relation from a BSON file using the BSONLoader class, for example:

raw = LOAD 'file:///tmp/dump/yield_historical.in.bson' USING com.mongodb.hadoop.pig.BSONLoader;

As with MongoLoader you can also supply an optional idAlias argument to map the _id field to a named Pig field, along with aschema to select fields/types to extract from the documents.

Datatype Mapping

In the second optional argument to the BSONLoader and MongoLoader class constructors, you can explicitly provide a datatype for each element of the schema by using the Pig schema syntax, for example name:chararray, age:int. If the types aren't provided, the output type will be inferred based on the values in the documents. Data mappings used for these inferred types are as follows:

Note: older versions of Pig may not be able to generate mappings when tuples are unnamed, due toPIG-2509. If you get errors, try making sure that all top-level fields in the relation being stored have names assigned to them or try using a newer version of Pig.

Writing output from Pig

Static BSON file output

To store output from Pig in a .BSON file (which can then be imported into a MongoDB instance using mongorestore) use the BSONStorage class. Example:

STORE raw_out INTO 'file:///tmp/whatever.bson' USING com.mongodb.hadoop.pig.BSONStorage;

If you want to supply a custom value for the _id field in the documents written out by BSONStorage you can give it an optionalidAlias field which maps a value in the Pig record to the _id field in the output document, for example:

STORE raw_out INTO 'file:///tmp/whatever.bson' USING com.mongodb.hadoop.pig.BSONStorage('id');

The output URI for BSONStorage can be any accessible file system including hdfs:// and s3n://. However, when using S3 for an output file, you will also need to set fs.s3.awsAccessKeyId and fs.s3.awsSecretAccessKey for your AWS account accordingly.

Inserting directly into a MongoDB collection

To make each output record be used as an insert into a MongoDB collection, use the MongoInsertStorage class supplying the output URI. For example:

STORE dates_averages INTO 'mongodb://localhost:27017/demo.yield_aggregated' USING com.mongodb.hadoop.pig.MongoInsertStorage('', '' );

The MongoInsertStorage class also takes one argument: an idAlias as described above. If idAlias is left blank, an ObjectId will be generated for the value of the _id field in each output document.

Updating a MongoDB collection

If instead of inserting documents you need to update, upsert, or replace documents, you can use com.mongodb.hadoop.pig.MongoUpdateStorage. Here's an overview:

STORE <aliasname> INTO 'mongodb://localhost:27017/<db>.<collection>'
                  USING com.mongodb.hadoop.pig.MongoUpdateStorage(
                        '<query>',
                        '<update>',
                        '<schema>',
                        '<fieldtoignore>',
                        '<updateOptions>');

where

Consider the following examples:

Assume we have an alias data that is a bag of tuples.

data =
{
 ("Bab", "Alabi", "male", 19, {("a"), ("b"), ("c")}),
 ("Dad", "Alabi", "male", 21, {("d"), ("e"))}),
 ("Tins", "Dada", "female", 50, {})
}

with schema f:chararray, l:chararray, g:chararray, age:int, cars:{t:(car:chararray)}.Note: Every pig data structure in a pig schema has to be named.

To insert the gender, first and last names of each person in data into a test.persons_info collection, making sure that we update any existing documents with the same first and last fields, use

STORE data INTO 'mongodb://localhost:27017/test.persons_info'
           USING com.mongodb.hadoop.pig.MongoUpdateStorage(
                 '{first:"\$f", last:"\$l"}',
                 '{\$set:{gender:"\$g"}}',
                 'f:chararray, l:chararray, g:chararray, age:int, cars:{t:(car:chararray)}'
           );

The resulting collection looks like this:

{ "_id" : ObjectId("..."), "first":"Bab", "last":"Alabi", "gender":"male"},
{ "_id" : ObjectId("..."), "first":"Dad", "last":"Alabi", "gender":"male"},
{ "_id" : ObjectId("..."), "first":"Tins", "last":"Dada", "gender":"female"}

Next, let's say, we want to include the age and cars for each person into the collection, use:

STORE data INTO 'mongodb://localhost:27017/test.persons_info'
           USING com.mongodb.hadoop.pig.MongoUpdateStorage(
                 '{first:"\$f", last:"\$l"}',
                 '{\$set:{age:"\$age"}, \$pushAll:{cars:"\$cars"}}',
                 'f:chararray, l:chararray, g:chararray, age:int, cars:{t:(car:chararray)}'
           );

The resulting collection looks like this:

{ "_id" : ObjectId("..."), "gender":"male", "age" : 19, "cars" : [{"car": "a"}, {"car":"b"}, {"car":"c"}], "first" : "Daniel", "last" : "Alabi" }
{ "_id" : ObjectId("..."), "gender":"male", "age" : 21, "cars" : [{"car":"d"}, {"car":"e"}], "first" : "Tolu", "last" : "Alabi" }
{ "_id" : ObjectId("..."), "gender":"female", "age" : 50, "cars" : [], "first" : "Tinuke", "last" : "Dada" }

Notice that every element in cars is a named map with one key car. In most cases, such update is unwanted/unnecessary. To instead makecars an array of strings, we can use:

STORE data INTO 'mongodb://localhost:27017/test.persons_info'
           USING com.mongodb.hadoop.pig.MongoUpdateStorage(
                 '{first:"\$f", last:"\$l"}',
                 '{\$set:{age:"\$age"}, \$pushAll:{cars:"\$cars"}}',
                 'f:chararray, l:chararray, age:int, cars:{t:(car:chararray)}'
                 'car'
           );

specifying what field to ignore in the schema while inserting pig objects. The resulting collection looks like this:

{ "_id" : ObjectId("..."), "gender":"male", "age" : 19, "cars" : ["a", "b", "c"], "first" : "Daniel", "last" : "Alabi" }
{ "_id" : ObjectId("..."), "gender":"male", "age" : 21, "cars" : ["d", "e"], "first" : "Tolu", "last" : "Alabi" }
{ "_id" : ObjectId("..."), "gender":"female", "age" : 50, "cars" : [], "first" : "Tinuke", "last" : "Dada" }