Practical Apache Spark in 10 minutes. Part 4 - SQL

Spark SQL is the part of the Apache Spark big data framework designed for processing structured and semi-structured data. It provides a DataFrame API that simplifies and accelerates data manipulations. A DataFrame is a special type of object, conceptually similar to a table in a relational database. It represents a distributed collection of data organized into named columns. DataFrames can be created from external sources, retrieved with a query from a database, or converted from an RDD; the inverse transformation is also possible. This abstraction is designed for sampling, filtering, aggregating, and visualizing the data.
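To make this concrete, here is a minimal sketch (assuming a running SparkSession named spark, which the pyspark shell provides; the variable and column names here are ours): we build a tiny DataFrame from a local list of tuples and inspect its named columns.

toy = spark.createDataFrame(
    [(1, "Toy Story (1995)", 3.0), (2, "Jumanji (1995)", 2.0)],
    ["movieId", "title", "rating"])
toy.printSchema()  # named, typed columns
toy.show()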

Today we will show you how to construct and work with DataFrames with the help of Spark SQL and pyspark.

Comma-Separated Values (CSV) File

Start by running pyspark.

./bin/pyspark

Then, let's load some data into a DataFrame. We will use the small MovieLens dataset, which you can find here. First off, specify a few options for the loader: set the delimiter to a semicolon and header to True so that the column names are loaded from the file.


data = spark.read.format("csv")\
    .option("delimiter", ";")\
    .option("header", True)\
    .load("example/movielens.csv")
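With these options every column is loaded as a string. If you would rather let Spark guess the column types, the CSV reader also has an inferSchema option; a minimal sketch (the variable name typed is ours):

typed = spark.read.format("csv")\
    .option("delimiter", ";")\
    .option("header", True)\
    .option("inferSchema", True)\
    .load("example/movielens.csv")
typed.printSchema()  # movieId, rating, and timestamp should now come back as numeric types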

To take a glance at the data, we use the show() function. For instance, we can display the first five rows.

data.show(5)
+-------+--------------------+--------------------+------+------+----------+
|movieId|               title|              genres|userId|rating| timestamp|
+-------+--------------------+--------------------+------+------+----------+
|      1|    Toy Story (1995)|Adventure|Animati...|     7|   3.0| 851866703|
|      2|      Jumanji (1995)|Adventure|Childre...|    15|   2.0|1134521380|
|      3|Grumpier Old Men ...|      Comedy|Romance|     5|   4.0|1163374957|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|    19|   3.0| 855192868|
|      5|Father of the Bri...|              Comedy|    15|   4.5|1093070098|
+-------+--------------------+--------------------+------+------+----------+

Next, we can perform some data manipulations using either the DataFrame API or regular SQL queries. Let's select only the title column using the API.

data.select("title").show(3)
+--------------------+
|               title|
+--------------------+
|    Toy Story (1995)|
|      Jumanji (1995)|
|Grumpier Old Men ...|
+--------------------+

There are also tools available for filtering the data. Say we want to choose movies with a rating lower than 3.0. To do this, run the following:

data.filter(data['rating'] < 3.0).show(3)
+-------+--------------------+--------------------+------+------+----------+
|movieId|               title|              genres|userId|rating| timestamp|
+-------+--------------------+--------------------+------+------+----------+
|      2|      Jumanji (1995)|Adventure|Childre...|    15|   2.0|1134521380|
|     11|American Presiden...|Comedy|Drama|Romance|    15|   2.5|1093028381|
|     14|        Nixon (1995)|               Drama|    15|   2.5|1166586286|
+-------+--------------------+--------------------+------+------+----------+
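Conditions can also be combined. For example, a small sketch using the col helper from pyspark.sql.functions keeps only the low ratings given by user #15:

from pyspark.sql.functions import col

# both conditions must hold; note the parentheses around each comparison
data.filter((col("rating") < 3.0) & (col("userId") == 15)).show(3)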

Of course, there are many other possibilities for working with DataFrames; one of them is sketched below, and for further reference you can explore the API documentation.
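For instance, here is a rough sketch of a group-by aggregation with the API (the alias names are ours, and the exact output depends on your copy of the dataset):

from pyspark.sql.functions import avg, col, count

# rating is read from the CSV as a string, so cast it before averaging
data.groupBy("userId")\
    .agg(count("movieId").alias("n_ratings"),
         avg(col("rating").cast("double")).alias("avg_rating"))\
    .orderBy("n_ratings", ascending=False)\
    .show(3)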

Now, we will use SQL to query the data. To begin with, we need to register the DataFrame as a temporary view with the following command:

data.createOrReplaceTempView("movielens")

Say we want to select all the entries for user #15. To do that, we run a select query on the movielens view.

spark.sql("select * from movielens where userId = 15").show(5)
+-------+--------------------+--------------------+------+------+----------+
|movieId|               title|              genres|userId|rating| timestamp|
+-------+--------------------+--------------------+------+------+----------+
|      2|      Jumanji (1995)|Adventure|Childre...|    15|   2.0|1134521380|
|      5|Father of the Bri...|              Comedy|    15|   4.5|1093070098|
|      6|         Heat (1995)|Action|Crime|Thri...|    15|   4.0|1040205753|
|     14|        Nixon (1995)|               Drama|    15|   2.5|1166586286|
|     16|       Casino (1995)|         Crime|Drama|    15|   3.5|1093070150|
+-------+--------------------+--------------------+------+------+----------+
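The same kind of aggregation shown earlier with the API can also be expressed in plain SQL against the view; a minimal sketch (the alias names are ours):

spark.sql("""
    select userId, count(*) as n_ratings,
           avg(cast(rating as double)) as avg_rating
    from movielens
    group by userId
    order by n_ratings desc
""").show(3)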

Now, let's create a DataFrame from an RDD. We'll use the same dataset, but this time we will load it as a text file (also without a header). We want to keep only three columns for simplicity. So, first load the data into an RDD.


rdd = sc.textFile("example/movielens.txt")\
    .map(lambda line: line.split(";"))\
    .map(lambda splits: (int(splits[0]), splits[1], splits[2]))

Then, take a look at the contents of rdd:

for elem in rdd.take(5):
    print(elem)
(1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')
(2, 'Jumanji (1995)', 'Adventure|Children|Fantasy')
(3, 'Grumpier Old Men (1995)', 'Comedy|Romance')
(4, 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance')
(5, 'Father of the Bride Part II (1995)', 'Comedy')

Next, we import the dependencies, create fields with the specified types, and build the schema itself.

from pyspark.sql.types import *
id_field = StructField("id", IntegerType(), True)
title_field = StructField("title", StringType(), True)
genres_field = StructField("genres", StringType(), True)
schema = StructType([id_field, title_field, genres_field])

Finally, we construct the DataFrame.

movielens = spark.createDataFrame(rdd, schema)
movielens.show(3)
+---+--------------------+--------------------+
| id|               title|              genres|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|Adventure|Animati...|
|  2|      Jumanji (1995)|Adventure|Childre...|
|  3|Grumpier Old Men ...|      Comedy|Romance|
+---+--------------------+--------------------+
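As mentioned in the introduction, the inverse transformation is also possible: the contents of a DataFrame can be accessed as an RDD of Row objects again. A minimal sketch:

back_to_rdd = movielens.rdd  # each element is a Row with named fields
for row in back_to_rdd.take(2):
    print(row.id, row.title)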


JSON Files

In the previous examples, we’ve been loading data from text files, but datasets are also often saved in JSON format. Spark provides a simple way to work with JSON files.

movies = spark.read.json("example/movielens.json")

Now, let's perform a query on it.

movies.createOrReplaceTempView("movies")
nice_movies = spark.sql("select * from movies where rating > 4.9")
nice_movies.show(5)
+--------------------+-------+------+----------+--------------------+------+
|              genres|movieId|rating| timestamp|               title|userId|
+--------------------+-------+------+----------+--------------------+------+
|       Drama|Romance|     17|   5.0| 835355681|Sense and Sensibi...|     2|
|       Drama|Romance|     28|   5.0| 854714394|   Persuasion (1995)|    67|
|         Crime|Drama|     30|   5.0| 848161285|Shanghai Triad (Y...|    86|
|Mystery|Sci-Fi|Th...|     32|   5.0|1154465405|Twelve Monkeys (a...|     8|
|      Children|Drama|     34|   5.0| 949919556|         Babe (1995)|     4|
+--------------------+-------+------+----------+--------------------+------+

And finally, we can save the result of the query as a JSON file.

nice_movies.write.json("nice_movies")
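Note that, as with other Spark writers, this creates a directory called nice_movies containing part files rather than a single file. We can read the result back the same way we loaded the original JSON; a minimal sketch (the variable name reloaded is ours):

reloaded = spark.read.json("nice_movies")
reloaded.show(3)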

Parquet Files

Apache Parquet is a popular column-oriented storage format supported by a wide variety of data processing systems. It is often used with tools in the Hadoop ecosystem and supports all of the data types in Spark SQL. Spark SQL provides methods to read from and write to Parquet files.

Let’s save our first DataFrame as a Parquet file.

data.write.parquet('movielens.parquet')
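If the movielens.parquet directory already exists (for example, when re-running the example), the write will fail by default. Assuming you do want to replace the old data, one option is to set the save mode explicitly:

data.write.mode("overwrite").parquet('movielens.parquet')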

After that, we are able to read this file.

parquet = spark.read.parquet('movielens.parquet')

Now we can play with it like a regular DataFrame. Let’s register it as a view and select the titles and ratings of movies rated between 2 and 5.

parquet.createOrReplaceTempView('parquetlens')
just_movies = spark.sql("select title, rating from parquetlens where rating between 2 and 5")
just_movies.show(5)
+--------------------+------+
|               title|rating|
+--------------------+------+
|    Toy Story (1995)|   3.0|
|      Jumanji (1995)|   2.0|
|Grumpier Old Men ...|   4.0|
|Waiting to Exhale...|   3.0|
|Father of the Bri...|   4.5|
+--------------------+------+

Conclusions

In this article, we’ve briefly discussed how to create DataFrames from CSV, JSON, and Parquet files in Spark SQL. You should now understand some basic manipulations, but there are many other capabilities for you to explore. We hope the article was helpful. Study every day and improve yourself. See you soon!
