Various Functions in PySpark

PySpark:

    PySpark is the Python binding for the Spark platform and API, and is not much different from the Java/Scala versions. Because Python is dynamically typed, RDDs can hold objects of multiple types. PySpark does not yet support a few API calls, such as lookup and non-text input files, though these will be added in future releases.

Python:

    Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built-in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together.

Spark:

    Spark is an open-source big data processing framework built around speed, ease of use, and sophisticated analytics. Spark can be up to 100x faster than Hadoop for large-scale data processing by exploiting in-memory computing and other optimizations. It has easy-to-use APIs for operating on large datasets, and it comes packaged with higher-level libraries, including support for SQL queries, streaming data, machine learning and graph processing.

The following walks through several common operations in PySpark (Python and Spark):

Here we are going to create tables in MySQL and import them into HDFS using Sqoop.

Step 1: Create a database and tables in MySQL

mysql> create database geo;


mysql> show databases;

Databases
default
geo
mysql
Patient

mysql> use geo;

mysql> create table employeedetails(employee_id int,employee_name varchar(50),dateofjoining date,salary bigint);

mysql> insert into employeedetails values(1,'Vasanth','2016-08-15',15000);

mysql> insert into employeedetails values(2,'Anitha','2016-08-11',20000);

mysql> insert into employeedetails values(3,'Sujin','2016-08-18',10000);

mysql> select * from employeedetails;

employee_id  employee_name  dateofjoining  salary
1            Vasanth        2016-08-15     15000
2            Anitha         2016-08-11     20000
3            Sujin          2016-08-18     10000

mysql> create table department(department_id int,employee_id int,designation varchar(50));

mysql> insert into department values(101,1,'Hr');

mysql> insert into department values(102,2,'Manager');

mysql> insert into department values(103,3,'Developer');

mysql> select * from department;

department_id  employee_id  designation
101            1            Hr
102            2            Manager
103            3            Developer

Step 2: Open the Sqoop terminal and import the MySQL tables into HDFS.

geouser@geouser:~/sqoop-1.4.5.bin__hadoop-2.0.4-alpha/bin$ ./sqoop import --connect jdbc:mysql://localhost/geo --username root --password root --table employeedetails --target-dir employee/Employee_Information -m 1

geouser@geouser:~/sqoop-1.4.5.bin__hadoop-2.0.4-alpha/bin$ ./sqoop import --connect jdbc:mysql://localhost/geo --username root --password root --table department --target-dir employee/Department_Details -m 1

Step 3: Read the data stored in HDFS.

Output:

———-

        hadoop fs -cat /user/geouser/employee/Employee_Information/part-m-00000

Employee_Information:

——————————–

1,Vasanth,2016-08-15,15000

2,Anitha,2016-08-11,20000

3,Sujin,2016-08-18,10000

        hadoop fs -cat /user/geouser/employee/Department_Details/part-m-00000

Department_Details:

—————————–

101,1,Hr

102,2,Manager

103,3,Developer

Step 4: The various operations available in PySpark are listed below.

Load the two tables as RDDs (Resilient Distributed Datasets) using Spark and Python (PySpark).

Here we load the two tables into RDDs using sc.textFile, where sc is the SparkContext; textFile reads the files under each HDFS directory as an RDD of text lines.

>>> employeedetails = sc.textFile("hdfs://localhost:9000/user/geouser/employee/Employee_Information")

>>> department = sc.textFile("hdfs://localhost:9000/user/geouser/employee/Department_Details")
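As a quick sanity check (not part of the original flow), a few records can be pulled back with take() to confirm that both RDDs loaded correctly; the lines returned should match the comma-separated records shown in Step 3.

>>> employeedetails.take(2)

>>> department.take(2)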

Convert the RDDs into key-value pairs.

The text RDDs above are converted into key-value pairs, keyed by employee_id, so that they can be joined.

>>> employeeKeyValue = employeedetails.map(lambda line: (int(line.split(",")[0]), line))

>>> departmentkeyValue = department.map(lambda line: (int(line.split(",")[1]), line))
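Each element of these pair RDDs is now a (employee_id, original_line) tuple, which is the shape join() expects. As an illustrative check only, first() can be used to look at one element; employeeKeyValue.first() should return something like (1, u'1,Vasanth,2016-08-15,15000').

>>> employeeKeyValue.first()

>>> departmentkeyValue.first()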

Join both RDDs on employee_id.

Now we join the two RDDs and print the result.

>>> joinData = departmentkeyValue.join(employeeKeyValue)

Print the joined data:

>>> for line in joinData.collect():
...     print(line)

Output:

———–

(1, (u'101,1,Hr', u'1,Vasanth,2016-08-15,15000'))

(2, (u'102,2,Manager', u'2,Anitha,2016-08-11,20000'))

(3, (u'103,3,Developer', u'3,Sujin,2016-08-18,10000'))
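A plain join() keeps only employee_ids present in both RDDs. If some employees had no department record, leftOuterJoin could be used instead so that they are still retained (with None in place of the missing department line). This is only a hedged sketch, and outerJoinData is an illustrative name; with this data the result matches the inner join above.

>>> outerJoinData = employeeKeyValue.leftOuterJoin(departmentkeyValue)

>>> for line in outerJoinData.collect():
...     print(line)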

Now fetch the selected values of employee_id, designation and salary.

Here we select particular columns from the joined RDD using PySpark.

>>> empdata = joinData.map(lambda row: (row[0], str(row[1][0].split(",")[2]), row[1][1].split(",")[3]))

Print the result:

>>> for line in empdata.collect():
...     print(line)

Output:

----------

(1, 'Hr', u'15000')

(2, 'Manager', u'20000')

(3, 'Developer', u'10000')
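The one-line lambda above packs several split() calls together. As an equivalent, arguably more readable sketch, the same selection can be written with a small named helper; parse_row is a hypothetical name, not part of the original code.

>>> def parse_row(row):
...     dept_cols = row[1][0].split(",")   # department line: department_id,employee_id,designation
...     emp_cols = row[1][1].split(",")    # employee line: employee_id,employee_name,dateofjoining,salary
...     return (row[0], str(dept_cols[2]), emp_cols[3])
...
>>> empdata = joinData.map(parse_row)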

Select the distinct designation for each date.

Here we select the unique date and designation combinations from the joined data.

>>> distinctname = joinData.map(lambda row: row[1][1].split(",")[2] + "," + str(row[1][0].split(",")[2])).distinct()

Print the result:

>>> for line in distinctname.collect():
...     print(line)

Output:

----------

2016-08-11,Manager

2016-08-18,Developer

2016-08-15,Hr

Similar to a word count, generate a (date, 1) record for each row.

Here we split each line on the comma, keep the date, and store the resulting (date, 1) pairs in an RDD called newwordcount.

>>> newwordcount = distinctname.map(lambda line: (line.split(",")[0], 1))

Do the count for each key (date) to get the total per date.

Here we count how many entries fall on each date.

>>> totalnamePerDate = newwordcount.reduceByKey(lambda a, b: a + b)

Print the result:

>>> for line in totalnamePerDate.collect():
...     print(line)

Output:

———-

(u'2016-08-18', 1)

(u'2016-08-11', 1)

(u'2016-08-15', 1)
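As a hedged alternative to the map/reduceByKey pair, PySpark's countByKey() action returns the per-key counts directly to the driver as a dictionary. It is convenient here, but it is only suitable when the number of distinct keys (dates) is small, since all counts are held on the driver.

>>> counts = newwordcount.countByKey()

>>> for date, total in counts.items():
...     print((date, total))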

Sort the results by date.

>>> sortedData = totalnamePerDate.sortByKey().collect()

Print the result:

>>> for line in sortedData:
...     print(line)

Output:

———–

(u'2016-08-11', 1)

(u'2016-08-15', 1)

(u'2016-08-18', 1)
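If the sorted counts should be written back to HDFS rather than only collected to the driver, saveAsTextFile can be called on the RDD before collect(); the output directory below is only an assumed example path.

>>> totalnamePerDate.sortByKey().saveAsTextFile("hdfs://localhost:9000/user/geouser/employee/TotalNamePerDate")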