RDDs – Resilient Distributed Datasets:

It is the fundamental unit of data in Spark: a distributed collection of elements spread across the nodes of a cluster, on which operations can be performed in parallel.

RDDs are immutable, but a new RDD can be generated by transforming an existing one.

Two types of Operations:

Transformation:

Transformations build a new RDD (Resilient Distributed Dataset) from an existing one using operations such as filter, map, and flatMap. Transformations are lazy: they are not executed immediately, but only once an action is called. A transformation is a function that takes an RDD as input and produces one or more new output RDDs.

The resulting RDD is always distinct from its parent and may be smaller, larger, or the same size. To improve the performance of computations, Spark pipelines transformations as an optimization: consecutive narrow transformations are fused and executed in a single pass over the data, avoiding materialization of intermediate results.
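As a quick illustration of this laziness (a minimal sketch, assuming a running spark-shell session with hypothetical data), the map call below only records the transformation; no computation happens until the action is invoked:

```scala
// Nothing is computed here: map only adds a step to the RDD's lineage.
scala> val nums = sc.parallelize(1 to 5)
scala> val doubled = nums.map(_ * 2)

// The action triggers execution of the whole pipeline.
scala> doubled.collect
res0: Array[Int] = Array(2, 4, 6, 8, 10)
```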

Creation of RDD:

scala> val textFile = sc.textFile("/home/geouser/Documents/Bigdata/Spark/sales.csv")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at textFile at <console>:27

scala> textFile.collect
res1: Array[String] = Array(carolina,Product1,1200,Basildon,England, Betina,Product1,1200,Parkville ,MO, Federica e Andrea,Product1,1200,Astoria ,OR, Gouya,Product1,1200,Echuca,Victoria, Gerd W ,Product2,3600,Cahaba Heights ,AL, carolina,Product1,1200,Mickleton ,NJ, Fleur,Product1,1200,Peoria ,IL, adam,Product1,1200,Martin ,TN, Renee Elisabeth,Product1,1200,Tel Aviv,Tel Aviv, Aidan,Product1,1200,Chatou,Ile-de-France)

Some of the functionalities are listed below:

Map

It passes each element of the RDD through a user-defined function and returns a new dataset containing the results. The function is applied to each row/item of the RDD, so the input and output are always the same size.

scala> val counts = textFile.map(line => line.split(","))
counts: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[10] at map at <console>:29

scala> counts.collect
res3: Array[Array[String]] = Array(Array(carolina, Product1, 1200, Basildon, England), Array(Betina, Product1, 1200, "Parkville ", MO), Array(Federica e Andrea, Product1, 1200, "Astoria ", OR), Array(Gouya, Product1, 1200, Echuca, Victoria), Array("Gerd W ", Product2, 3600, "Cahaba Heights ", AL), Array(carolina, Product1, 1200, "Mickleton ", NJ), Array(Fleur, Product1, 1200, "Peoria ", IL), Array(adam, Product1, 1200, "Martin ", TN), Array(Renee Elisabeth, Product1, 1200, Tel Aviv, Tel Aviv), Array(Aidan, Product1, 1200, Chatou, Ile-de-France))

FlatMap
It is the same as the map function, except that flatMap may return zero or more elements for each input element, and the output is flattened into a single collection. The function passed to flatMap returns a list, array, or other sequence of elements.

scala> val countnew = textFile.flatMap(line => line.split(","))
countnew: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at flatMap at <console>:29

scala> countnew.collect
res4: Array[String] = Array(carolina, Product1, 1200, Basildon, England, Betina, Product1, 1200, "Parkville ", MO, Federica e Andrea, Product1, 1200, "Astoria ", OR, Gouya, Product1, 1200, Echuca, Victoria, "Gerd W ", Product2, 3600, "Cahaba Heights ", AL, carolina, Product1, 1200, "Mickleton ", NJ, Fleur, Product1, 1200, "Peoria ", IL, adam, Product1, 1200, "Martin ", TN, Renee Elisabeth, Product1, 1200, Tel Aviv, Tel Aviv, Aidan, Product1, 1200, Chatou, Ile-de-France)

Filter

It returns a new dataset formed by selecting those elements of the source for which the function returns true. The function is a predicate: it accepts an element and returns a Boolean. Elements that satisfy the condition are kept and the rest are filtered out, so the new RDD contains only the elements for which the predicate returns true.

scala> val countfilter = textFile.filter(line => line.contains("Product2"))
countfilter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at filter at <console>:29
scala> countfilter.collect
res5: Array[String] = Array(Gerd W ,Product2,3600,Cahaba Heights ,AL)

scala> val countfilter = textFile.filter(line => line.contains("Product1"))
countfilter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at filter at <console>:29
scala> countfilter.collect
res6: Array[String] = Array(carolina,Product1,1200,Basildon,England, Betina,Product1,1200,Parkville ,MO, Federica e Andrea,Product1,1200,Astoria ,OR, Gouya,Product1,1200,Echuca,Victoria, carolina,Product1,1200,Mickleton ,NJ, Fleur,Product1,1200,Peoria ,IL, adam,Product1,1200,Martin ,TN, Renee Elisabeth,Product1,1200,Tel Aviv,Tel Aviv, Aidan,Product1,1200,Chatou,Ile-de-France)

MapPartitions

It runs once per partition (block) of the RDD rather than once per element, so the supplied function must have the type Iterator[T] => Iterator[U]. It can improve performance by reducing the object-creation overhead of invoking a function for every element, as map does.

scala> countfilter.mapPartitions(x => List(x.next).iterator).collect
res8: Array[String] = Array(carolina,Product1,1200,Basildon,England, Fleur,Product1,1200,Peoria ,IL)
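To make the per-partition behaviour easier to see, here is a small sketch (hypothetical data, assuming a spark-shell session) that sums each partition separately:

```scala
// Two partitions: (1, 2, 3) and (4, 5, 6).
scala> val nums = sc.parallelize(1 to 6, 2)

// The function receives one Iterator per partition, not one element at a time.
scala> nums.mapPartitions(it => Iterator(it.sum)).collect
res0: Array[Int] = Array(6, 15)
```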

MapPartitionsWithIndex

It is similar to mapPartitions, with one difference: the function takes two parameters, where the first is the index of the partition and the second is an iterator over all items within that partition ((Int, Iterator[T])).

scala> countfilter.mapPartitionsWithIndex( (index: Int, it: Iterator[String]) => it.toList.map(x => index + " " + x).iterator).collect
res26: Array[String] = Array(0 carolina,Product1,1200,Basildon,England, 0 Betina,Product1,1200,Parkville ,MO, 0 Federica e Andrea,Product1,1200,Astoria ,OR, 0 Gouya,Product1,1200,Echuca,Victoria, 0 carolina,Product1,1200,Mickleton ,NJ, 1 Fleur,Product1,1200,Peoria ,IL, 1 adam,Product1,1200,Martin ,TN, 1 Renee Elisabeth,Product1,1200,Tel Aviv,Tel Aviv, 1 Aidan,Product1,1200,Chatou,Ile-de-France)

Union

It performs the standard set-union operation and is equivalent to the ++ operator: it returns a new RDD containing the elements of both RDDs. Note that, unlike a mathematical set union, duplicates are not removed.

scala> val textFile = sc.textFile("/home/geouser/Documents/Bigdata/Spark/sales.csv")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[43] at textFile at <console>:27

scala> val textFile1 = sc.textFile("/home/geouser/Documents/Bigdata/Spark/salesnew.csv")
textFile1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[45] at textFile at <console>:27

scala> textFile.union(textFile1).collect
res47: Array[String] = Array(carolina,Product1,1200,Basildon,England, Betina,Product1,1200,Parkville ,MO, Federica e Andrea,Product1,1200,Astoria ,OR, Gouya,Product1,1200,Echuca,Victoria, Gerd W ,Product2,3600,Cahaba Heights ,AL, carolina,Product1,1200,Mickleton ,NJ, Fleur,Product1,1200,Peoria ,IL, adam,Product1,1200,Martin ,TN, Renee Elisabeth,Product1,1200,Tel Aviv,Tel Aviv, Aidan,Product1,1200,Chatou,Ile-de-France, carolina,504-845-1427,jbutt@gmail.com, Betina,810-374-9840,josephine_darakjy@darakjy.org, Federica e Andrea,856-264-4130,art@venere.org, Gouya,907-921-2010,lpaprocki@hotmail.com, Gerd W ,513-549-4561,donette.foller@cox.net, carolina,419-800-6759,simona@morasca.com…

Distinct

It returns a new dataset containing only the unique elements of the RDD, with duplicates removed.

scala> textFile.union(textFile1).distinct.collect
res48: Array[String] = Array(carolina,Product1,1200,Mickleton ,NJ, Federica e Andrea,Product1,1200,Astoria ,OR, adam,408-813-1105,leota@hotmail.com, Anandh,679-087-5677,anadhgeo@gmail.com, Gerd W ,513-549-4561,donette.foller@cox.net, Betina,Product1,1200,Parkville ,MO, adam,Product1,1200,Martin ,TN, Renee Elisabeth,Product1,1200,Tel Aviv,Tel Aviv, Federica e Andrea,856-264-4130,art@venere.org, Gerd W ,Product2,3600,Cahaba Heights ,AL, Aidan,Product1,1200,Chatou,Ile-de-France, Gouya,Product1,1200,Echuca,Victoria, carolina,419-800-6759,simona@morasca.com, Fleur,Product1,1200,Peoria ,IL, Gouya,907-921-2010,lpaprocki@hotmail.com, Renee Elisabeth,605-794-4895,sage_wieser@cox.net, ca…

Intersection

It returns the elements that appear in both RDDs, with duplicates removed. In the example below the two files have no lines in common, so the result is empty.

scala> textFile.intersection(textFile1).collect
res45: Array[String] = Array()
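A minimal sketch with overlapping data (hypothetical values, assuming a spark-shell session) shows intersection returning the common elements:

```scala
scala> val a = sc.parallelize(List("Product1", "Product2", "Product3"))
scala> val b = sc.parallelize(List("Product2", "Product3", "Product4"))

// Only elements present in both RDDs survive, de-duplicated.
scala> a.intersection(b).collect
res0: Array[String] = Array(Product2, Product3)   // order may vary
```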

The Keys:

The group of transformation functions groupByKey, reduceByKey, aggregateByKey, sortByKey, and join all act on key-value (pair) RDDs. So, this section will be known as "The Keys".

GroupByKey

It works on key-value pairs and returns a new dataset of grouped items: a new RDD made up of each key (the group) and the list of items belonging to that group. The order of elements within a group is not guaranteed and may differ each time the same operation runs on the same RDD. It is a wide operation, since it shuffles data across multiple partitions to create the new RDD.

scala> val textFile = sc.textFile("/home/geouser/Documents/Bigdata/Spark/sales.csv")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at textFile at <console>:27

scala> val b = textFile.keyBy(_.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[14] at keyBy at <console>:29
scala> b.groupByKey.collect
res5: Array[(Int, Iterable[String])] = Array((52,CompactBuffer(Betina,Product1,1200,Parkville ,MO, Betina,Product1,1200,Parkville ,MO)), (54,CompactBuffer(carolina,Product1,1200,Mickleton ,NJ)), (50,CompactBuffer(adam,Product1,1200,Martin ,TN)), (40,CompactBuffer(Aidan,Product1,1200,Chatou,Ile-de-France)), (39,CompactBuffer(carolina,Product1,1200,Basildon,England, carolina,Product1,1200,Basildon,England)), (47,CompactBuffer(Renee Elisabeth,Product1,1200,Tel Aviv,Tel Aviv)), (53,CompactBuffer(Gerd W ,Product2,3600,Cahaba Heights ,AL)), (35,CompactBuffer(Gouya,Product1,1200,Echuca,Victoria)), (51,CompactBuffer(Fleur,Product1,1200,Peoria ,IL)), (63,CompactBuffer(Federica e Andrea,Pr…
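A smaller sketch (hypothetical pairs, assuming a spark-shell session) makes the grouping easier to read:

```scala
scala> val sales = sc.parallelize(List(("Product1", 1200), ("Product2", 3600), ("Product1", 1200)))

// One entry per key; all values for that key are gathered into one collection.
scala> sales.groupByKey.mapValues(_.toList).collect
res0: Array[(String, List[Int])] = Array((Product1,List(1200, 1200)), (Product2,List(3600)))  // order may vary
```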

ReduceByKey

It merges the values of each key using an associative reduce function, and can be used only on RDDs of key-value pairs. It is a wide operation that shuffles data across multiple partitions to create a new RDD, but it first merges values locally within each partition using the same function, which optimizes the data shuffling. The result of each combination (e.g. a sum) must have the same type as the values, and combining partial results from different partitions uses the same operation as combining values inside a partition.

scala> val b = textFile.map(x => (x.length,x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[18] at map at <console>:29

scala> b.reduceByKey(_+_).collect
res7: Array[(Int, String)] = Array((52,Betina,Product1,1200,Parkville ,MOBetina,Product1,1200,Parkville ,MO), (54,carolina,Product1,1200,Mickleton ,NJ), (50,adam,Product1,1200,Martin ,TN), (40,Aidan,Product1,1200,Chatou,Ile-de-France), (39,carolina,Product1,1200,Basildon,Englandcarolina,Product1,1200,Basildon,England), (47,Renee Elisabeth,Product1,1200,Tel Aviv,Tel Aviv), (53,Gerd W ,Product2,3600,Cahaba Heights ,AL), (35,Gouya,Product1,1200,Echuca,Victoria), (51,Fleur,Product1,1200,Peoria ,IL), (63,Federica e Andrea,Product1,1200,Astoria ,ORFederica e Andrea,Product1,1200,Astoria ,OR))
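Because the string concatenation above is hard to read, here is a numeric sketch (hypothetical pairs, assuming a spark-shell session) where reduceByKey sums the values for each key:

```scala
scala> val sales = sc.parallelize(List(("Product1", 1200), ("Product2", 3600), ("Product1", 1200)))

// _ + _ is applied first within each partition, then across partitions.
scala> sales.reduceByKey(_ + _).collect
res0: Array[(String, Int)] = Array((Product1,2400), (Product2,3600))  // order may vary
```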

Action:

An action returns a final result to the driver program or writes it to an external data store.

Some of the functionalities are listed below:

Count()

It returns the number of elements in the RDD, i.e. it counts the items present in the dataset and returns the total as a Long.

scala> val textFile = sc.textFile("/home/geouser/Documents/Bigdata/Spark/sales.csv").map(_.split(","))
textFile: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:27
scala> textFile.count()
res0: Long = 10

Collect()

It returns all the elements of the RDD to the driver as an array. It is useful for printing values back to the console when debugging programs, but it should be used with care on large datasets, since the entire RDD must fit in the driver's memory.

scala> val textFile = sc.textFile("/home/geouser/Documents/Bigdata/Spark/sales.csv").map(_.split(","))
scala> textFile.collect
res1: Array[Array[String]] = Array(Array(1, carolina, Product1, 1200, Basildon, England), Array(2, Betina, Product1, 1200, "Parkville ", MO), Array(3, Federica e Andrea, Product1, 1200, "Astoria ", OR), Array(4, Gouya, Product1, 1200, Echuca, Victoria), Array(5, "Gerd W ", Product2, 3600, "Cahaba Heights ", AL), Array(6, carolina, Product1, 1200, "Mickleton ", NJ), Array(7, Fleur, Product1, 1200, "Peoria ", IL), Array(8, adam, Product1, 1200, "Martin ", TN), Array(9, Renee Elisabeth, Product1, 1200, Tel Aviv, Tel Aviv), Array(10, Aidan, Product1, 1200, Chatou, Ile-de-France))

Take(n)

It returns the first n elements of the RDD as an array.

scala> val textFile = sc.textFile("/home/geouser/Documents/Bigdata/Spark/sales.csv").map(_.split(","))
scala> textFile.take(3)
res5: Array[Array[String]] = Array(Array(1, carolina, Product1, 1200, Basildon, England), Array(2, Betina, Product1, 1200, "Parkville ", MO), Array(3, Federica e Andrea, Product1, 1200, "Astoria ", OR))

First()

It retrieves the first element of the RDD. It is similar to take(1), except that it returns the element itself rather than a one-element array.

scala> val textFile = sc.textFile("/home/geouser/Documents/Bigdata/Spark/sales.csv").map(_.split(","))
textFile: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[11] at map at <console>:27
scala> textFile.first
res4: Array[String] = Array(1, carolina, Product1, 1200, Basildon, England)

TakeSample()

It is an action that returns a fixed-size random sample of the RDD as an array. It takes a Boolean flag specifying sampling with or without replacement, the sample size, and an optional random-generator seed. The order of the returned elements is randomized.

scala> val textFile = sc.textFile("/home/geouser/Documents/Bigdata/Spark/sales.csv").map(_.split(","))
scala> textFile.takeSample(true,3)
res6: Array[Array[String]] = Array(Array(6, carolina, Product1, 1200, "Mickleton ", NJ), Array(8, adam, Product1, 1200, "Martin ", TN), Array(5, "Gerd W ", Product2, 3600, "Cahaba Heights ", AL))
scala> textFile.takeSample(false,3)
res7: Array[Array[String]] = Array(Array(6, carolina, Product1, 1200, "Mickleton ", NJ), Array(8, adam, Product1, 1200, "Martin ", TN), Array(7, Fleur, Product1, 1200, "Peoria ", IL))

CountByKey()

It works on RDDs consisting of two-component (key, value) tuples. It counts the number of elements for each distinct key and returns the result to the driver as a Map of (key, count) pairs.

scala> val textFile = sc.textFile("/home/geouser/Documents/Bigdata/Spark/sales.csv").map(_.split(",")).map(n => (n(1), n(2)))
scala> textFile.countByKey
res9: scala.collection.Map[String,Long] = Map(Gouya -> 1, Betina -> 1, carolina -> 2, adam -> 1, Federica e Andrea -> 1, "Gerd W " -> 1, Aidan -> 1, Fleur -> 1, Renee Elisabeth -> 1)

SaveAsTextfile()

It saves the RDD as a text file at the given path, writing the string representation of each element. Note that the path is created as a directory containing one output part file per partition.

scala> val textFile = sc.textFile("/home/geouser/Documents/Bigdata/Spark/sales.csv").map(_.split(",")).map(n => (n(1), n(4)))
textFile: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[23] at map at <console>:27
scala> textFile.saveAsTextFile("result.csv")
