PYSPARK:

PySpark is the Python binding for the Spark platform and API, and it differs little from the Java/Scala versions. Python is dynamically typed, so RDDs can hold objects of multiple types. Early PySpark releases did not support a few API calls, such as lookup and non-text input files; later releases added them.

SPARK:

Spark is an open-source big data processing framework built around speed, ease of use, and sophisticated analytics. By exploiting in-memory computing and other optimizations, Spark can run some large-scale workloads up to 100x faster than Hadoop MapReduce. It has easy-to-use APIs for operating on large datasets, and it comes packaged with higher-level libraries that support SQL queries, streaming data, machine learning, and graph processing.

PYTHON:

Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built-in data structures, combined with dynamic typing and dynamic binding, make it very attractive for rapid application development, as well as for use as a scripting or glue language to connect existing components.

 

Task: create a Spark application in Python that reads a file, ignores all empty lines, and filters out every word shorter than 5 characters.

Step-1: Start the PySpark shell

(Open a terminal and type the following command)

pyspark



Step-2: Create a Spark Application

(First, import SparkContext and SparkConf from the pyspark package)

from pyspark import SparkContext, SparkConf

 

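Step-3: Create a configuration object and set the app name

conf = SparkConf().setAppName("Pyspark Pgm")

sc = SparkContext(conf=conf)

(The pyspark shell already provides a SparkContext named sc. If the second line raises "ValueError: Cannot run multiple SparkContexts at once", either keep using the existing sc or call sc.stop() first.)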



Step-4: Load data from HDFS

(i). First create a text file and load it into HDFS

Here is the example file. Save the following text as PySpark.txt:

PySpark is the python binding for the Spark Platform and API and not much different from the Java/Scala versions. A good starting point is the official page i.e Examples | Apache Spark. Python is dynamically typed, so RDDs can hold objects of multiple types. PySpark does not yet support a few API calls, such as lookup and non-text input files, though these will be added in future releases.

Load the file into HDFS

hdfs dfs -put /home/geouser/Documents/PySpark.txt /user/geouser/


 

(ii). Next, load PySpark.txt from HDFS into pyspark

contentRDD = sc.textFile("hdfs://localhost:9000/user/geouser/PySpark.txt")


 

Step-5: Filter out the empty lines of the loaded file (PySpark.txt), keeping only the non-empty ones

nonempty_lines = contentRDD.filter(lambda x: len(x) > 0)
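
One caveat: len(x) > 0 keeps lines that contain only spaces or tabs. If those should count as empty too, stripping whitespace before the check is one option. The sketch below uses plain Python lists instead of an RDD so it runs without a cluster; the sample lines are made up for illustration, and the same condition could be passed to contentRDD.filter().

```python
# Sample lines are hypothetical; "   " stands for a whitespace-only line.
lines = ["PySpark is fun", "", "   ", "RDDs hold objects"]

# The check used in Step-5: only truly empty strings are dropped.
by_length = [x for x in lines if len(x) > 0]

# Stricter variant: strip() first, so whitespace-only lines are dropped too.
by_strip = [x for x in lines if len(x.strip()) > 0]

print(by_length)  # ['PySpark is fun', '   ', 'RDDs hold objects']
print(by_strip)   # ['PySpark is fun', 'RDDs hold objects']
```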

 

Step-6: Split the content on spaces

words = nonempty_lines.flatMap(lambda x: x.split(' '))
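
flatMap differs from map in that it flattens the per-line word lists into a single collection of words. The following plain-Python sketch mimics both on a small made-up list (no Spark required). It also shows that split(' ') yields an empty string wherever two spaces are adjacent; the length filter in the next step removes those anyway.

```python
from itertools import chain

lines = ["Spark is fast", "RDDs  hold objects"]  # note the double space

# What map(lambda x: x.split(' ')) would give: one list per line.
mapped = [x.split(" ") for x in lines]

# What flatMap(lambda x: x.split(' ')) would give: one flat list of words.
flat = list(chain.from_iterable(x.split(" ") for x in lines))

print(mapped)  # [['Spark', 'is', 'fast'], ['RDDs', '', 'hold', 'objects']]
print(flat)    # ['Spark', 'is', 'fast', 'RDDs', '', 'hold', 'objects']
```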

 

Step-7: Filter out the words shorter than 5 characters

finalRDD = words.filter(lambda x: len(x) >= 5)
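
Whether a word of exactly five characters survives depends on the comparison operator. A quick plain-Python check on a few sample words (chosen here only for illustration) makes the boundary visible:

```python
words = ["RDDs", "Spark", "Python", ""]  # lengths 4, 5, 6, 0

# Drops only the words shorter than 5 characters.
at_least_5 = [w for w in words if len(w) >= 5]

# Also drops the five-letter word.
more_than_5 = [w for w in words if len(w) > 5]

print(at_least_5)   # ['Spark', 'Python']
print(more_than_5)  # ['Python']
```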

 

Step-8: View the words that remain after filtering

for word in finalRDD.collect():
    print(word)

(Indent the print statement by 4 spaces, then press Enter on an empty line to run the loop.)

 




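Step-9: Save the final data to HDFS

finalRDD.saveAsTextFile("hdfs://localhost:9000/user/geouser/PySpark1")

(saveAsTextFile writes a directory named PySpark1 that contains part files, and it fails if that directory already exists.)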
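
The shell session above can also be packaged as a script and run with spark-submit. The following is a minimal sketch under some assumptions: the file name long_words.py, the helper long_words(), and its min_len parameter are hypothetical names introduced here, and the input and output paths come from the command line rather than being hard-coded. Following the task statement, it keeps words of at least min_len characters (5 by default).

```python
import sys


def long_words(lines, min_len=5):
    """Drop empty lines, split on spaces, keep words of at least min_len characters."""
    nonempty_lines = lines.filter(lambda x: len(x) > 0)
    words = nonempty_lines.flatMap(lambda x: x.split(" "))
    return words.filter(lambda x: len(x) >= min_len)


def main(input_path, output_path):
    # Imported here so the helper above can be read and tested without Spark installed.
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("Pyspark Pgm")
    sc = SparkContext.getOrCreate(conf)  # reuses an active context if one exists
    try:
        final_rdd = long_words(sc.textFile(input_path))
        final_rdd.saveAsTextFile(output_path)  # output directory must not already exist
    finally:
        sc.stop()


if __name__ == "__main__":
    if len(sys.argv) == 3:
        main(sys.argv[1], sys.argv[2])
    else:
        print("usage: spark-submit long_words.py <input_path> <output_path>")
```

With the paths used in this walkthrough, the call would look like: spark-submit long_words.py hdfs://localhost:9000/user/geouser/PySpark.txt hdfs://localhost:9000/user/geouser/PySpark1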

 


 

Description of the Commands:

from pyspark import SparkContext, SparkConf

-> The first step of any Spark driver application is to create a SparkContext.

The SparkContext allows your Spark driver application to access the cluster through a resource manager. The resource manager can be YARN or Spark's own standalone cluster manager.

To create a SparkContext, first create a SparkConf. The SparkConf stores the configuration parameters that your Spark driver application passes to the SparkContext.


conf = SparkConf().setAppName("Pyspark Pgm")

-> setAppName() gives your Spark driver application a name so you can identify it in the Spark or YARN UI.


sc = SparkContext(conf=conf)

-> Use this form to create the context for the first time. If a SparkContext is already running (as in the pyspark shell, which predefines sc), reuse it instead:

sc = SparkContext.getOrCreate(conf)


contentRDD = sc.textFile("hdfs://localhost:9000/user/geouser/PySpark.txt")

sc.textFile() -> method that loads a text file as an RDD with one element per line

"hdfs://localhost:9000/user/geouser/PySpark.txt" -> path of the HDFS file to load


nonempty_lines = contentRDD.filter(lambda x: len(x) > 0)

-> Used for omitting empty lines

 

words = nonempty_lines.flatMap(lambda x: x.split(' '))

-> Used to split the content on spaces (' ') and flatten the result into a single RDD of words


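finalRDD = words.filter(lambda x: len(x) >= 5)

words.filter() -> keeps only the elements for which the given function returns True

lambda x: len(x) >= 5 -> true for words of 5 or more characters, so words shorter than 5 characters are dropped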


for word in finalRDD.collect():

-> collect() returns all elements of the RDD to the driver as a list, and the loop walks through them

print(word)

-> Prints each filtered word
