Introduction to Apache Spark's Core API (Part I)
Hello coders, I hope you are all doing well.
Over the past few months, I've been learning the Spark framework along with other big data topics. Spark is essentially a cluster computing framework, though I know a single phrase can't define the entire framework.
In this post, I am going to discuss the core APIs of Apache Spark using Python as the programming language. I assume you have a basic knowledge of the Spark framework (tuples, RDDs, pair RDDs, and data frames) and its initialization steps.
When we launch a Spark shell, either in Scala or Python (i.e. spark-shell or pyspark), it initializes a SparkContext as `sc` and a SQLContext as `sqlContext`.
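If you are not using one of the shells and are writing a standalone PySpark script instead, you have to create those two objects yourself. Here is a minimal sketch, assuming a Spark 1.x setup; the application name and master URL are placeholder values for illustration, not something from this post:

```
# Minimal sketch: creating SparkContext and SQLContext outside the shell (Spark 1.x APIs).
# 'core-api-demo' and 'local[*]' are placeholder values -- replace them with your own.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName('core-api-demo').setMaster('local[*]')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
```

The examples below assume `sc` already exists, exactly as it does in the shells.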
- Core APIs
- sc.textFile(path)
  * This method reads a text file from HDFS and returns it as an RDD of strings.
    ```
    ordersRDD = sc.textFile('orders')
    ```
- rdd.first()
  * This method returns the first element of the RDD.
    ```
    ordersRDD.first()
    # u'1,2013-07-25 00:00:00.0,11599,CLOSED' -- the first element of ordersRDD
    ```
- rdd.collect()
  * This method returns a list containing all of the elements in the RDD.
    ```
    ordersRDD.collect()
    # [u'68882,2014-07-22 00:00:00.0,10000,ON_HOLD', u'68883,2014-07-23 00:00:00.0,5533,COMPLETE']
    ```
- rdd.filter(f)
  * This method returns a new RDD containing only the elements that satisfy a predicate, i.e. it creates a new RDD with those elements that satisfy the condition given in the argument.
    ```
    filteredRDD = ordersRDD.filter(lambda line: line.split(',')[3] in ['COMPLETE'])
    filteredRDD.first()
    # u'3,2013-07-25 00:00:00.0,12111,COMPLETE'
    ```
- rdd.map(f)
  * This method returns a new RDD by applying a function to each element of this RDD, i.e. it transforms the RDD into a new one by applying the function element by element.
    ```
    mappedRDD = ordersRDD.map(lambda line: tuple(line.split(',')))
    mappedRDD.first()
    # (u'1', u'2013-07-25 00:00:00.0', u'11599', u'CLOSED')
    ```
- rdd.flatMap(f)
  * This method returns a new RDD by first applying a function to each element of this RDD (same as the map method) and then flattening the results.
    ```
    flatMappedRDD = ordersRDD.flatMap(lambda line: line.split(','))
    flatMappedRDD.take(4)
    # [u'1', u'2013-07-25 00:00:00.0', u'11599', u'CLOSED']
    ```
- sc.parallelize(c)
  * This method distributes a local Python collection to form an RDD.
    ```
    lRDD = sc.parallelize(range(1, 10))
    lRDD.first()
    # 1
    lRDD.take(10)
    # [1, 2, 3, 4, 5, 6, 7, 8, 9]
    ```
- rdd.reduce(f)
  * This method reduces the elements of this RDD using the specified commutative and associative binary operator.
    ```
    lRDD.reduce(lambda x, y: x + y)
    # 45 -- the sum of 1 to 9
    ```
- rdd.count()
  * This method returns the number of elements in this RDD.
    ```
    lRDD.count()
    # 9 -- there are 9 elements in lRDD
    ```
- rdd.sortBy(keyfunc)
  * This method sorts this RDD by the given `keyfunc`; by choosing the keyfunc you can sort the RDD in ascending or descending order.
    ```
    lRDD.collect()
    # [1, 2, 3, 4, 5, 6, 7, 8, 9]
    lRDD.sortBy(lambda x: -x).collect()
    # [9, 8, 7, 6, 5, 4, 3, 2, 1] -- sorted in descending order
    ```
- rdd.top(num)
  * This method gets the top num elements from the RDD. It returns the list sorted in descending order.
    ```
    lRDD.top(3)
    # [9, 8, 7]
    ```
- rdd.take(num)
  * This method takes the first num elements of the RDD.
    ```
    lRDD.take(7)
    # [1, 2, 3, 4, 5, 6, 7]
    ```
- rdd.union(otherRDD)
  * This method returns the union of this RDD and another one.
    ```
    l1 = sc.parallelize(range(1, 5))
    l1.collect()
    # [1, 2, 3, 4]
    l2 = sc.parallelize(range(3, 8))
    l2.collect()
    # [3, 4, 5, 6, 7]
    lUnion = l1.union(l2)
    lUnion.collect()
    # [1, 2, 3, 4, 3, 4, 5, 6, 7]
    ```
- rdd.distinct()
  * This method returns a new RDD containing the distinct elements of this RDD.
    ```
    lDistinct = lUnion.distinct()
    lDistinct.collect()
    # [2, 4, 6, 1, 3, 5, 7]
    ```
- rdd.intersection(otherRDD)
  * This method returns the intersection of this RDD and another one, i.e. the output will not contain any duplicate elements, even if the input RDDs did.
    ```
    lIntersection = l1.intersection(l2)
    lIntersection.collect()
    # [4, 3]
    ```
- rdd.subtract(otherRDD)
  * This method returns each value in this RDD that is not contained in the other one.
    ```
    lSubtract = l1.subtract(l2)
    lSubtract.collect()
    # [2, 1]
    ```
- rdd.saveAsTextFile(path, compressionCodecClass)
  * This method saves this RDD as a text file, optionally compressed with the given codec.
    ```
    lRDD.saveAsTextFile('lRDD_only')
    # saves lRDD uncompressed under the lRDD_only folder in the HDFS home directory

    lUnion.saveAsTextFile('lRDD_union', 'org.apache.hadoop.io.compress.GzipCodec')
    # saves lUnion compressed with the Gzip codec under the lRDD_union folder in the HDFS home directory

    lSubtract.saveAsTextFile('lRDD_subtract', 'org.apache.hadoop.io.compress.SnappyCodec')
    # saves lSubtract compressed with the Snappy codec under the lRDD_subtract folder in the HDFS home directory
    ```
- rdd.keyBy(f)
  * This method creates tuples (a pair RDD) of the elements in this RDD by applying the given function to generate the key.
    ```
    ordersPairRDD = ordersRDD.keyBy(lambda line: int(line.split(',')[0]))
    ordersPairRDD.first()
    # (1, u'1,2013-07-25 00:00:00.0,11599,CLOSED')
    # This is how we can create a pair RDD.
    ```
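To wrap up, here is a small sketch that chains a few of the methods covered above on the same orders dataset: it filters the completed orders, counts them, and previews a few of their order IDs. The exact numbers will of course depend on your copy of the data.

```
# Chain filter, count, map, and take on the orders RDD from the examples above.
completedOrders = ordersRDD.filter(lambda line: line.split(',')[3] == 'COMPLETE')
completedOrders.count()                                             # number of COMPLETE orders
completedOrders.map(lambda line: int(line.split(',')[0])).take(5)   # first few order IDs
```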
For now, these are all the functions or methods for plain RDDs, i.e. RDDs without keys. In my next post, I will explain the functions and methods for pair RDDs, with multiple example snippets.
Thanks for reading and happy coding!