RDD Joins in Core Spark

Apache Spark

Apache Spark is an open source parallel processing framework for running large-scale data analytics applications across clustered computers.
Apache Spark can process data from a variety of data repositories, including the Hadoop Distributed File System (HDFS), NoSQL databases, and relational data stores such as Apache Hive. Spark supports in-memory processing to boost the performance of big data analytics applications, but it can also fall back to conventional disk-based processing when datasets are too large to fit into the available system memory.

RDD

A Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark: an immutable, distributed collection of objects. Each RDD is divided into logical partitions, which may be computed on different nodes of the cluster.
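
To make the idea of partitions concrete, here is a minimal sketch (assuming a running spark-shell, where sc is the SparkContext); the numbers are purely illustrative.

// create an RDD from a local collection, split into 4 logical partitions
val numbers = sc.parallelize(1 to 100, 4)

// report how many partitions the RDD has (4 in this case)
numbers.getNumPartitions

// each partition can be processed in parallel on a different node
numbers.map(_ * 2).reduce(_ + _)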

Joins in Core Spark

Here we will look at the various RDD joins. In general, a join in Apache Spark is expensive, because it requires the keys from the different RDDs to be located on the same partition so that they can be combined locally. If the RDDs do not have a known partitioner, a shuffle is needed to bring matching keys onto the same partition. Once the RDDs are joined, we can perform various transformations as well as actions on the joined RDD.
In order to join the data, Spark needs it to be present on the same partition. The default join implementation for RDDs in Apache Spark is the shuffled hash join, which ensures that data on each partition has the same keys by partitioning the second dataset with the same partitioner as the first.
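
If both RDDs already share the same partitioner, Spark can skip this shuffle at join time. The following is a minimal sketch (with hypothetical pair RDDs and a hand-picked partition count, separate from the example below) of pre-partitioning two RDDs before joining them:

import org.apache.spark.HashPartitioner

// two small key-value RDDs; keys must end up on the same partition to be joined locally
val left = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
val right = sc.parallelize(Seq((1, "x"), (3, "y")))

// co-partition both RDDs with the same partitioner and cache them,
// so the subsequent join can combine records partition by partition without another shuffle
val partitioner = new HashPartitioner(4)
val leftPart = left.partitionBy(partitioner).cache()
val rightPart = right.partitionBy(partitioner).cache()

val joined = leftPart.join(rightPart)
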
Now, let us perform these joins on Apache Spark RDDs.

Types of Joins

Inner Join
Left Outer Join
Right Outer Join
Cartesian
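
Before applying these to the datasets below, here is a toy sketch (with made-up key-value pairs) showing how each join type behaves on pair RDDs:

// two small pair RDDs keyed by Int (illustrative data only)
val a = sc.parallelize(Seq((1, "A1"), (2, "A2"), (3, "A3")))
val b = sc.parallelize(Seq((2, "B2"), (3, "B3"), (4, "B4")))

a.join(b).collect            // inner join: only keys 2 and 3 appear
a.leftOuterJoin(b).collect   // all keys of a; the missing right side becomes None
a.rightOuterJoin(b).collect  // all keys of b; the missing left side becomes None
a.cartesian(b).collect       // every element of a paired with every element of b (3 x 3 = 9 pairs)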

Joins in Spark RDD

We have two different datasets: Customer_data and Department_data.
Description of Customer_data
id:Int,
first_name:String,
last_name:String,
company_name:String,
phone:String.
Description of Department_data
id:Int,
first_name:String,
address:String,
city:String,
county:String,
state:String,
zip:Int.
// defining a case class "Customer" with fields according to the Customer_data file. We have mentioned the data types as well.

scala> case class Customer(id:Int, first_name:String, last_name:String, company_name:String, phone:String)
defined class Customer

// defining a case class "Department" with fields according to the Department_data file.

scala> case class Department(id:Int, first_name:String, address:String, city:String, county:String, state:String, zip:Int)
defined class Department

// loading the Customer_data and splitting it based on the delimiter '\t'

scala> val input1 = sc.textFile("/home/geouser/Documents/Dataset/Customer_data").map(_.split("\t"))
input1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[55] at map at <console>:27

// loading the Department_data and splitting it based on the delimiter '\t'

scala> val input2 = sc.textFile("/home/geouser/Documents/Dataset/Department_data").map(_.split("\t"))
input2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[58] at map at <console>:27

/* creating a tuple containing 2 fields.
Field 1 is an integer and will be treated as the KEY (1st column from Customer_data).
Field 2 is a 'Customer' object, which takes 5 parameters. */

scala> val cust_record = input1.map(x => (x(0).toInt, Customer(x(0).toInt, x(1).toString, x(2).toString, x(3).toString, x(4).toString)))
cust_record: org.apache.spark.rdd.RDD[(Int, Customer)] = MapPartitionsRDD[59] at map at <console>:31

scala> cust_record.collect
res18: Array[(Int, Customer)] = Array((1,Customer(1,James,Butt,Benton, John B Jr,504-621-8927)), (2,Customer(2,Josephine,Darakjy,Chanay, Jeffrey A Esq,810-292-9388)), (3,Customer(3,Art,Venere,Chemel, James L Cpa,856-636-8749)), (4,Customer(4,Lenna,Paprocki,Feltz Printing Service,907-385-4412)), (5,Customer(5,Donette,Foller,Printing Dimensions,513-570-1893)), (6,Customer(6,Simona,Morasca,Chapman, Ross E Esq,419-503-2484)), (7,Customer(7,Mitsue,Tollner,Morlong Associates,773-573-6914)), (8,Customer(8,Leota,Dilliard,Commercial Press,408-752-3500)), (9,Customer(9,Sage,Wieser,Truhlar And Truhlar Attys,605-414-2147)), (10,Customer(10,Kris,Marrier,King, Christopher A Esq,410-655-8723)), (11,Customer(11,Minna,Amigon,Dorl, James J Esq,215-874-1229)), (12,Customer(12,Abel,Maclead,Rangoni Of Florence,631-335-3414)), (13,Cu…

/* creating a tuple containing 2 fields.
Field 1 is an integer and will be treated as the KEY (1st column from Department_data).
Field 2 is a 'Department' object, which takes 7 parameters. */

scala> val dept_record = input2.map(x => (x(0).toInt, Department(x(0).toInt, x(1).toString, x(2).toString, x(3).toString, x(4).toString, x(5).toString, x(6).toInt)))
dept_record: org.apache.spark.rdd.RDD[(Int, Department)] = MapPartitionsRDD[60] at map at <console>:31

scala> dept_record.collect
res19: Array[(Int, Department)] = Array((1,Department(1,James,6649 N Blue Gum St,New Orleans,Orleans,LA,70116)), (2,Department(2,Josephine,4 B Blue Ridge Blvd,Brighton,Livingston,MI,48116)), (3,Department(3,Art,8 W Cerritos Ave #54,Bridgeport,Gloucester,NJ,8014)), (4,Department(4,Lenna,639 Main St,Anchorage,Anchorage,AK,99501)), (5,Department(5,Donette,34 Center St,Hamilton,Butler,OH,45011)), (6,Department(6,Simona,3 Mcauley Dr,Ashland,Ashland,OH,44805)), (7,Department(7,Mitsue,7 Eads St,Chicago,Cook,IL,60632)), (8,Department(8,Leota,7 W Jackson Blvd,San Jose,Santa Clara,CA,95111)), (9,Department(9,Sage,5 Boston Ave #88,Sioux Falls,Minnehaha,SD,57105)), (10,Department(10,Kris,228 Runamuck Pl #2808,Baltimore,Baltimore City,MD,21224)), (11,Department(11,Minna,2371 Jerrold Ave,Kulpsville,Montgomery,PA,19443)), (12,Department(12,Abel,37275 S…

// performing the join operations.

Inner Join

scala> val joined_rdd = cust_record.join(dept_record)
joined_rdd: org.apache.spark.rdd.RDD[(Int, (Customer, Department))] = MapPartitionsRDD[63] at join at <console>:39

scala> joined_rdd.collect
res20: Array[(Int, (Customer, Department))] = Array((4,(Customer(4,Lenna,Paprocki,Feltz Printing Service,907-385-4412),Department(4,Lenna,639 Main St,Anchorage,Anchorage,AK,99501))), (16,(Customer(16,Mattie,Poquette,Century Communications,602-277-4385),Department(16,Mattie,73 State Road 434 E,Phoenix,Maricopa,AZ,85013))), (22,(Customer(22,Veronika,Inouye,C 4 Network Inc,408-540-1785),Department(22,Veronika,6 Greenleaf Ave,San Jose,Santa Clara,CA,95111))), (28,(Customer(28,Ezekiel,Chui,Sider, Donald C Esq,410-669-1642),Department(28,Ezekiel,2 Cedar Ave #84,Easton,Talbot,MD,21601))), (30,(Customer(30,Bernardo,Figeroa,Clark, Richard Cpa,936-336-3951),Department(30,Bernardo,386 9th Ave N,Conroe,Montgomery,TX,77301))), (14,(Customer(14,Graciela,Ruta,Buckley Miller & Wright,440-780-8425),Department(14,Graciela,98 Connecticut Ave Nw,C…

Left Outer Join

scala> val leftjoin_rdd = cust_record.leftOuterJoin(dept_record)
leftjoin_rdd: org.apache.spark.rdd.RDD[(Int, (Customer, Option[Department]))] = MapPartitionsRDD[66] at leftOuterJoin at <console>:39

scala> leftjoin_rdd.collect
res21: Array[(Int, (Customer, Option[Department]))] = Array((4,(Customer(4,Lenna,Paprocki,Feltz Printing Service,907-385-4412),Some(Department(4,Lenna,639 Main St,Anchorage,Anchorage,AK,99501)))), (16,(Customer(16,Mattie,Poquette,Century Communications,602-277-4385),Some(Department(16,Mattie,73 State Road 434 E,Phoenix,Maricopa,AZ,85013)))), (22,(Customer(22,Veronika,Inouye,C 4 Network Inc,408-540-1785),Some(Department(22,Veronika,6 Greenleaf Ave,San Jose,Santa Clara,CA,95111)))), (28,(Customer(28,Ezekiel,Chui,Sider, Donald C Esq,410-669-1642),Some(Department(28,Ezekiel,2 Cedar Ave #84,Easton,Talbot,MD,21601)))), (30,(Customer(30,Bernardo,Figeroa,Clark, Richard Cpa,936-336-3951),Some(Department(30,Bernardo,386 9th Ave N,Conroe,Montgomery,TX,77301)))), (14,(Customer(14,Graciela,Ruta,Buckley Miller & Wright,440-780-8425),Some…

Right Outer Join

scala> val rightjoin_rdd = cust_record.rightOuterJoin(dept_record)
rightjoin_rdd: org.apache.spark.rdd.RDD[(Int, (Option[Customer], Department))] = MapPartitionsRDD[69] at rightOuterJoin at <console>:39

scala> rightjoin_rdd.collect
res22: Array[(Int, (Option[Customer], Department))] = Array((4,(Some(Customer(4,Lenna,Paprocki,Feltz Printing Service,907-385-4412)),Department(4,Lenna,639 Main St,Anchorage,Anchorage,AK,99501))), (16,(Some(Customer(16,Mattie,Poquette,Century Communications,602-277-4385)),Department(16,Mattie,73 State Road 434 E,Phoenix,Maricopa,AZ,85013))), (22,(Some(Customer(22,Veronika,Inouye,C 4 Network Inc,408-540-1785)),Department(22,Veronika,6 Greenleaf Ave,San Jose,Santa Clara,CA,95111))), (28,(Some(Customer(28,Ezekiel,Chui,Sider, Donald C Esq,410-669-1642)),Department(28,Ezekiel,2 Cedar Ave #84,Easton,Talbot,MD,21601))), (30,(Some(Customer(30,Bernardo,Figeroa,Clark, Richard Cpa,936-336-3951)),Department(30,Bernardo,386 9th Ave N,Conroe,Montgomery,TX,77301))), (14,(Some(Customer(14,Graciela,Ruta,Buckley Miller & Wright,440-780-8425)…
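
Full Outer Join

In addition to the joins listed above, the RDD API also provides fullOuterJoin, which keeps the keys from both sides and wraps both values in Option. A minimal sketch on the same records (not run against the sample output shown here):

// full outer join: unmatched keys from either side appear with None on the missing side
val fulljoin_rdd = cust_record.fullOuterJoin(dept_record)
// fulljoin_rdd: org.apache.spark.rdd.RDD[(Int, (Option[Customer], Option[Department]))]
fulljoin_rdd.collect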

Cartesian

scala> val cartesian_rdd = cust_record.cartesian(dept_record)
cartesian_rdd: org.apache.spark.rdd.RDD[((Int, Customer), (Int, Department))] = CartesianRDD[70] at cartesian at <console>:39

scala> cartesian_rdd.collect
res23: Array[((Int, Customer), (Int, Department))] = Array(((1,Customer(1,James,Butt,Benton, John B Jr,504-621-8927)),(1,Department(1,James,6649 N Blue Gum St,New Orleans,Orleans,LA,70116))), ((1,Customer(1,James,Butt,Benton, John B Jr,504-621-8927)),(2,Department(2,Josephine,4 B Blue Ridge Blvd,Brighton,Livingston,MI,48116))), ((1,Customer(1,James,Butt,Benton, John B Jr,504-621-8927)),(3,Department(3,Art,8 W Cerritos Ave #54,Bridgeport,Gloucester,NJ,8014))), ((1,Customer(1,James,Butt,Benton, John B Jr,504-621-8927)),(4,Department(4,Lenna,639 Main St,Anchorage,Anchorage,AK,99501))), ((1,Customer(1,James,Butt,Benton, John B Jr,504-621-8927)),(5,Department(5,Donette,34 Center St,Hamilton,Butler,OH,45011))), ((1,Customer(1,James,Butt,Benton, John B Jr,504-621-8927)),(6,Department(6,Simona,3 Mcauley Dr,Ashland,Ashland,OH,44805))), …
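
Note that cartesian pairs every record of the first RDD with every record of the second, so the size of the result is the product of the two input sizes; it should be used with care on large datasets. As a quick sanity check (a sketch, not executed above):

// the cartesian product has cust_record.count * dept_record.count elements
val expected = cust_record.count * dept_record.count
cartesian_rdd.count == expected   // should evaluate to true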