Practical Apache Spark in 10 minutes. Part 3 - Data Frames

A DataFrame in Spark is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.
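
For example, here is a minimal sketch of a few of these construction paths, assuming the interactive PySpark shell (where spark and sc are already defined) and hypothetical file names:

from pyspark.sql import Row

# From structured data files (hypothetical file names)
df_from_json = spark.read.json('people.json')
df_from_csv = spark.read.csv('people.csv', header=True, inferSchema=True)

# From an existing RDD of Row objects
rows = sc.parallelize([Row(name='Alice', age=34), Row(name='Bob', age=45)])
df_from_rdd = rows.toDF()
df_from_rdd.show()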

RDD

First, let's take a look at the basics of working with RDDs.

Create a simple RDD from a list of smiles using the parallelize() method of SparkContext.

smiles = ['happy', 'sad', 'grinning', 'happy', 'confused', 'kissing', 'kissing', 'sad', 'grinning', 'happy', 'happy', 'happy']
data = sc.parallelize(smiles)

Create a function that labels each smile with its sentiment: positive or negative.

def sentiments(smile):
    # Label each smile as carrying a positive or negative sentiment
    if smile in ('happy', 'grinning', 'kissing'):
        sent = 'positive'
    else:
        sent = 'negative'
    return (sent, smile)

Transform the data RDD using the map() transformation with the sentiments function, and take a look at the first 5 elements.

data_tuples = data.map(sentiments)
print (data_tuples.take(5))
[('positive', 'happy'), ('negative', 'sad'), ('positive', 'grinning'), ('positive', 'happy'), ('negative', 'confused')]

Now let's count the number of positive and negative emotions using groupByKey() and map().

data_grouped = data_tuples.groupByKey().map(lambda x: (x[0], len(x[1])))
print (data_grouped.collect())
[('positive', 9), ('negative', 3)]
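
As a side note, the same counts can be obtained with reduceByKey(), which sums per-key counters instead of collecting whole groups; here is a minimal sketch using the data RDD and sentiments function defined above:

data_counts = data.map(sentiments).map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
print (data_counts.collect())  # same counts as with groupByKey()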


DataFrames

Our next step in this article is working with DataFrames. To demonstrate operations on DataFrames, we will use a dataset of California house prices (the Sacramento real estate transactions file). We also need to import some additional libraries. To import the pandas library, you first need to install it if it is not already installed on your computer.

To exit from the Spark shell, type in the terminal:

exit()

And execute the following two commands:

sudo apt-get install python-pip
sudo pip install pandas

Now, to launch PySpark, type:

./bin/pyspark

And import the following libraries:

from pyspark import sql
from pyspark.sql.types import *
from pyspark.sql import Row
import pandas as pd

Now, read the dataset from a file. You can find the file here:

Important! Make sure that this file is saved to the Spark folder. The folder name will be something like spark-2.3.0-bin-hadoop2.7 (depending on the Spark version you have downloaded). The file name must be 'Sacramentorealestatetransactions.csv'.

rdd = sc.textFile('Sacramentorealestatetransactions.csv')
rdd.take(3)
[u'street,city,zip,state,beds,baths,sq__ft,type,sale_date,price,latitude,longitude',
 u'3526 HIGH ST,SACRAMENTO,95838,CA,2,1,836,Residential,Wed May 21 00:00:00 EDT 2008,59222,38.631913,-121.434879',
 u'51 OMAHA CT,SACRAMENTO,95823,CA,3,1,1167,Residential,Wed May 21 00:00:00 EDT 2008,68212,38.478902,-121.431028']

Split each line by comma using the map() transformation.

rdd = rdd.map(lambda line: line.split(","))
print (rdd.take(2))
[ [u'street', u'city', u'zip', u'state', u'beds', u'baths', u'sq__ft', u'type', u'sale_date', u'price', u'latitude', u'longitude'], [u'3526 HIGH ST', u'SACRAMENTO', u'95838', u'CA', u'2', u'1', u'836', u'Residential', u'Wed May 21 00:00:00 EDT 2008', u'59222', u'38.631913', u'-121.434879'] ]

Drop the header from the data with the filter() transformation.

header = rdd.first()
rdd = rdd.filter(lambda line:line != header)
print (rdd.take(2))
[ [u'3526 HIGH ST', u'SACRAMENTO', u'95838', u'CA', u'2', u'1', u'836', u'Residential', u'Wed May 21 00:00:00 EDT 2008', u'59222', u'38.631913', u'-121.434879'], [u'51 OMAHA CT', u'SACRAMENTO', u'95823', u'CA', u'3', u'1', u'1167', u'Residential', u'Wed May 21 00:00:00 EDT 2008', u'68212', u'38.478902', u'-121.431028'] ]

We have prepared the data and are now ready to work with DataFrames. Let's create a DataFrame from our data and take a look at the first five rows.

df = rdd.map(lambda line: Row(street=line[0], city=line[1], zip=line[2], beds=line[4], baths=line[5], sqft=line[6], price=line[9])).toDF()
df.show(5)
+-----+----+----------+-----+----+----------------+-----+
|baths|beds|      city|price|sqft|          street|  zip|
+-----+----+----------+-----+----+----------------+-----+
|    1|   2|SACRAMENTO|59222| 836|    3526 HIGH ST|95838|
|    1|   3|SACRAMENTO|68212|1167|     51 OMAHA CT|95823|
|    1|   2|SACRAMENTO|68880| 796|  2796 BRANCH ST|95815|
|    1|   2|SACRAMENTO|69307| 852|2805 JANETTE WAY|95815|
|    1|   2|SACRAMENTO|81900| 797| 6001 MCMAHON DR|95824|
+-----+----+----------+-----+----+----------------+-----+
only showing top 5 rows

To view the structure of our DataFrame we can use the printSchema() method.

df.printSchema()
root
 |-- baths: string (nullable = true)
 |-- beds: string (nullable = true)
 |-- city: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sqft: string (nullable = true)
 |-- street: string (nullable = true)
 |-- zip: string (nullable = true)
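
As you can see, every column was loaded as a string, because we built the DataFrame from plain text lines. As an aside, Spark 2.x ships with a CSV reader that can parse the header and infer column types for you; a minimal sketch, assuming the same file location:

df_csv = spark.read.csv('Sacramentorealestatetransactions.csv', header=True, inferSchema=True)
df_csv.printSchema()  # numeric columns such as beds, baths, sq__ft and price are now typed as integers rather than strings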

Get the first two rows with a price less than 5000.

df.filter(df.price < 5000).show(2)
+-----+----+-------+-----+----+------------------+-----+
|baths|beds|   city|price|sqft|            street|  zip|
+-----+----+-------+-----+----+------------------+-----+
|    0|   0|LINCOLN| 4897|   0|20 CRYSTALWOOD CIR|95648|
|    0|   0|LINCOLN| 4897|   0|24 CRYSTALWOOD CIR|95648|
+-----+----+-------+-----+----+------------------+-----+
only showing top 2 rows

Alternatively, you can use pandas-like syntax to get the same result:

df[df.price < 5000].show(2)
+-----+----+-------+-----+----+------------------+-----+
|baths|beds|   city|price|sqft|            street|  zip|
+-----+----+-------+-----+----+------------------+-----+
|    0|   0|LINCOLN| 4897|   0|20 CRYSTALWOOD CIR|95648|
|    0|   0|LINCOLN| 4897|   0|24 CRYSTALWOOD CIR|95648|
+-----+----+-------+-----+----+------------------+-----+
only showing top 2 rows
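
If you need to combine several conditions, wrap each one in parentheses and join them with & or | (a minimal sketch; output omitted):

df.filter((df.price < 5000) & (df.city == 'LINCOLN')).show(2)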

Count the number of rows grouped by city:

df.groupBy("city").count().show(2)
+--------------+-----+
|          city|count|
+--------------+-----+
|RANCHO MURIETA|    3|
|  CAMERON PARK|    9|
+--------------+-----+
only showing top 2 rows
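
To find, for instance, the cities with the most transactions, you can sort the grouped counts in descending order (a minimal sketch; output omitted):

df.groupBy("city").count().orderBy("count", ascending=False).show(3)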

We can get summary statistics for the numerical features using the describe() method.

df.describe(['baths', 'beds','price','sqft']).show()
+-------+------------------+------------------+------------------+------------------+
|summary|             baths|              beds|             price|              sqft|
+-------+------------------+------------------+------------------+------------------+
|  count|               985|               985|               985|               985|
|   mean|1.7766497461928934|2.9116751269035532|234144.26395939087|1314.9167512690356|
| stddev|0.8949168036438349|1.3072681384582236| 138295.5847828183|  852.615113131778|
|    min|                 0|                 0|            100000|                 0|
|    max|                 5|                 8|             99000|               998|
+-------+------------------+------------------+------------------+------------------+
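
Note that because these columns are stored as strings, min and max in the table above are compared lexicographically, which is why the maximum price looks smaller than the mean. If you want fully numeric statistics, a minimal sketch is to cast the columns first:

from pyspark.sql.functions import col

df_num = df.select(col('baths').cast('int'), col('beds').cast('int'),
                   col('price').cast('int'), col('sqft').cast('int'))
df_num.describe().show()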

DataFrames support many more operations. For more information, refer to the Spark SQL, DataFrames and Datasets Guide. We can convert our Spark DataFrame to a pandas DataFrame using the toPandas() method.

pandas_df = df.toPandas()
pandas_df[:5]
  baths beds        city  price  sqft            street    zip
0     1    2  SACRAMENTO  59222   836      3526 HIGH ST  95838
1     1    3  SACRAMENTO  68212  1167       51 OMAHA CT  95823
2     1    2  SACRAMENTO  68880   796    2796 BRANCH ST  95815
3     1    2  SACRAMENTO  69307   852  2805 JANETTE WAY  95815
4     1    2  SACRAMENTO  81900   797   6001 MCMAHON DR  95824

Alternatively, we can convert a pandas DataFrame back to a Spark DataFrame using the createDataFrame() function of sqlContext:

dataF = sqlContext.createDataFrame(pandas_df)
dataF.show(5)
+-----+----+----------+-----+----+----------------+-----+
|baths|beds|      city|price|sqft|          street|  zip|
+-----+----+----------+-----+----+----------------+-----+
|    1|   2|SACRAMENTO|59222| 836|    3526 HIGH ST|95838|
|    1|   3|SACRAMENTO|68212|1167|     51 OMAHA CT|95823|
|    1|   2|SACRAMENTO|68880| 796|  2796 BRANCH ST|95815|
|    1|   2|SACRAMENTO|69307| 852|2805 JANETTE WAY|95815|
|    1|   2|SACRAMENTO|81900| 797| 6001 MCMAHON DR|95824|
+-----+----+----------+-----+----+----------------+-----+

Conclusion

Today we've talked about DataFrames in Apache Spark and their basic operations. Like RDDs, DataFrames are lazily evaluated: computation is performed only when an action (e.g. displaying the result or saving output) is triggered. This allows their execution to be optimized by applying techniques such as predicate pushdown and bytecode generation. All DataFrame operations are also automatically parallelized and distributed across a cluster. We hope you have enjoyed this article, and we are glad to see you in the ranks of our students. Till next time!
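
If you want to see this laziness and optimization at work, a minimal sketch is to ask Spark for the query plan of a transformation before running any action:

df.filter(df.price < 5000).explain()  # prints the query plan; no data is processed until an action like show() or collect() runs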
