Spark SQL, DataFrames and Datasets Guide

  • Overview
    • SQL
    • Datasets and DataFrames
  • Getting Started
    • Starting Point: SparkSession
    • Creating DataFrames
    • Untyped Dataset Operations (aka DataFrame Operations)
    • Running SQL Queries Programmatically
    • Global Temporary View
    • Creating Datasets
    • Interoperating with RDDs
      • Inferring the Schema Using Reflection
      • Programmatically Specifying the Schema
    • Aggregations
      • Untyped User-Defined Aggregate Functions
      • Type-Safe User-Defined Aggregate Functions
  • Data Sources
    • Generic Load/Save Functions
      • Manually Specifying Options
      • Run SQL on files directly
      • Save Modes
      • Saving to Persistent Tables
      • Bucketing, Sorting and Partitioning
    • Parquet Files
      • Loading Data Programmatically
      • Partition Discovery
      • Schema Merging
      • Hive metastore Parquet table conversion
        • Hive/Parquet Schema Reconciliation
        • Metadata Refreshing
      • Configuration
    • JSON Datasets
    • Hive Tables
      • Specifying storage format for Hive tables
      • Interacting with Different Versions of Hive Metastore
    • JDBC To Other Databases
    • Troubleshooting
  • Performance Tuning
    • Caching Data In Memory
    • Other Configuration Options
  • Distributed SQL Engine
    • Running the Thrift JDBC/ODBC server
    • Running the Spark SQL CLI
  • Migration Guide
    • Upgrading From Spark SQL 2.1 to 2.2
    • Upgrading From Spark SQL 2.0 to 2.1
    • Upgrading From Spark SQL 1.6 to 2.0
    • Upgrading From Spark SQL 1.5 to 1.6
    • Upgrading From Spark SQL 1.4 to 1.5
    • Upgrading from Spark SQL 1.3 to 1.4
      • DataFrame data reader/writer interface
      • DataFrame.groupBy retains grouping columns
      • Behavior change on DataFrame.withColumn
    • Upgrading from Spark SQL 1.0-1.2 to 1.3
      • Rename of SchemaRDD to DataFrame
      • Unification of the Java and Scala APIs
      • Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)
      • Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only)
      • UDF Registration Moved to sqlContext.udf (Java & Scala)
      • Python DataTypes No Longer Singletons
    • Compatibility with Apache Hive
      • Deploying in Existing Hive Warehouses
      • Supported Hive Features
      • Unsupported Hive Functionality
  • Reference
    • Data Types
    • NaN Semantics

Getting Started

Starting Point: SparkSession

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Creating DataFrames

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

Untyped Dataset Operations (aka DataFrame Operations)

# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+
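The outputs above show how SQL NULL flows through these operations. Michael's missing age stays null under `age + 1`, fails the `age > 21` filter, and forms its own group in the count. The following stdlib-only sketch models those rules, with Python `None` standing in for NULL. The helper names are hypothetical, and this is not how Spark evaluates expressions internally.

```python
# Toy model of SQL NULL semantics; Python None stands in for NULL.
people = [
    {"name": "Michael", "age": None},
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
]


def add_one(value):
    """Arithmetic on NULL yields NULL."""
    return None if value is None else value + 1


def greater_than(value, bound):
    """Comparing NULL yields NULL (unknown), not False."""
    return None if value is None else value > bound


# Like df.select(df['name'], df['age'] + 1)
incremented = [(p["name"], add_one(p["age"])) for p in people]

# Like df.filter(df['age'] > 21): only rows whose predicate is True survive.
older_than_21 = [p["name"] for p in people if greater_than(p["age"], 21) is True]

# Like df.groupBy("age").count(): NULL forms its own group.
counts = {}
for p in people:
    counts[p["age"]] = counts.get(p["age"], 0) + 1

print(incremented)
print(older_than_21)
print(counts)
```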

Running SQL Queries Programmatically

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

Global Temporary View

# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# A global temporary view is tied to the system-preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# A global temporary view is visible across sessions
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
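The scoping rule described above can be modeled with a small class. Temporary views belong to one session, while global temporary views live in `global_temp` and are shared by every session. This is a toy sketch whose hypothetical method names mirror `createOrReplaceTempView`, `createGlobalTempView` and `newSession`. It does not model tables in the current database or Spark's actual catalog.

```python
class ToySession:
    """Toy model of view scoping; not Spark's catalog implementation."""

    # Shared by every session, playing the role of the `global_temp` database.
    _global_views = {}

    def __init__(self):
        # Session-scoped temporary views, invisible to other sessions.
        self._temp_views = {}

    def create_or_replace_temp_view(self, name, data):
        self._temp_views[name] = data

    def create_global_temp_view(self, name, data):
        self._global_views[name] = data

    def lookup(self, table_name):
        # Global views must be qualified with the `global_temp` database name.
        prefix = "global_temp."
        if table_name.startswith(prefix):
            return self._global_views.get(table_name[len(prefix):])
        return self._temp_views.get(table_name)

    def new_session(self):
        # A new session shares the global views but starts with no temp views.
        return ToySession()


rows = [("Michael", None), ("Andy", 30), ("Justin", 19)]
session = ToySession()
session.create_or_replace_temp_view("people", rows)
session.create_global_temp_view("people", rows)

other = session.new_session()
print(other.lookup("people"))              # temp view is not visible here
print(other.lookup("global_temp.people"))  # global view is shared
```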

Inferring the Schema Using Reflection

from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are DataFrame objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin
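Reflection-based inference derives each column type from the Python values in the `Row`s. Below is a rough stdlib sketch of that mapping that covers only a few scalar types. The mapping table and function name are illustrative assumptions, although PySpark does map Python `int` to `LongType` and `str` to `StringType`. The sketch does not model field ordering, nulls, or nested types.

```python
# Illustrative subset of the Python-to-Spark SQL type mapping.
PY_TO_SQL = {bool: "boolean", int: "long", float: "double", str: "string"}


def infer_schema(record):
    """Return (field, type name) pairs for a dict of column values."""
    # type() instead of isinstance(): bool is a subclass of int.
    return [(field, PY_TO_SQL[type(value)]) for field, value in record.items()]


# One line in the "name, age" layout of people.txt; assumed sample input.
line = "Justin, 19"
parts = line.split(",")
record = {"name": parts[0], "age": int(parts[1])}  # int() ignores the leading space
print(infer_schema(record))
```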

Programmatically Specifying the Schema

# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+
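The three steps above can be mimicked without Spark to make the contract explicit:

1. Build tuples from the input lines.
2. Build a schema from a string.
3. Apply the schema to the tuples.

Every tuple must supply one value per schema field. Because every field here is a nullable string, `age` stays a string such as `"29"`. The sample lines and helper names below are assumptions for illustration only.

```python
# Assumed sample input in the "name, age" layout of people.txt.
lines = ["Michael, 29", "Andy, 30", "Justin, 19"]

# Step 1: convert each line to a tuple of strings.
people = [(p[0], p[1].strip()) for p in (line.split(",") for line in lines)]

# Step 2: build the schema from a string; each field is (name, type, nullable).
schema_string = "name age"
schema = [(field_name, str, True) for field_name in schema_string.split()]


def apply_schema(rows, schema):
    """Pair each tuple with the schema, checking arity, types and nullability."""
    result = []
    for row in rows:
        if len(row) != len(schema):
            raise ValueError(f"expected {len(schema)} values, got {len(row)}")
        for value, (field_name, field_type, nullable) in zip(row, schema):
            if value is None and not nullable:
                raise ValueError(f"{field_name} is not nullable")
            if value is not None and not isinstance(value, field_type):
                raise TypeError(f"{field_name} expects {field_type.__name__}")
        result.append(dict(zip((f[0] for f in schema), row)))
    return result


# Step 3: apply the schema to the tuples.
records = apply_schema(people, schema)
print([r["name"] for r in records])
```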

Generic Load/Save Functions

df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Manually Specifying Options

df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

Bucketing, Sorting and Partitioning

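df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("people_partitioned_bucketed"))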
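Conceptually, `bucketBy(42, "name")` assigns each row to one of 42 buckets by hashing the `name` value. `sortBy("age")` then orders the rows inside each bucket, and equal names always share a bucket. The sketch below shows that idea in plain Python. It uses `zlib.crc32` purely as a stand-in hash. Spark uses its own Murmur3-based hash, so actual bucket numbers differ. The sample rows are made up.

```python
import zlib


def bucket_rows(rows, bucket_col, num_buckets, sort_col):
    """Group rows into hash buckets on bucket_col, sorted by sort_col within each."""
    buckets = {}
    for row in rows:
        key = str(row[bucket_col]).encode("utf-8")
        # Stand-in hash; Spark's real bucketing hash is Murmur3-based.
        bucket_id = zlib.crc32(key) % num_buckets
        buckets.setdefault(bucket_id, []).append(row)
    for bucket in buckets.values():
        bucket.sort(key=lambda r: r[sort_col])
    return buckets


# Hypothetical sample rows.
rows = [
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
    {"name": "Andy", "age": 25},
]
buckets = bucket_rows(rows, "name", 42, "age")
for bucket_id, members in sorted(buckets.items()):
    print(bucket_id, members)
```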

Loading Data Programmatically

peopleDF = spark.read.json("examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

Schema Merging

from pyspark.sql import Row

# spark is from the previous example.
# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext

squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")

# Read the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column that appears in the partition directory paths.
# root
#  |-- double: long (nullable = true)
#  |-- single: long (nullable = true)
#  |-- triple: long (nullable = true)
#  |-- key: integer (nullable = true)
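The merged schema above combines two ideas. Partition discovery reads `key=value` segments from directory names (here `key=1` and `key=2`) and infers a type for each value. Schema merging takes the union of the per-file columns. Below is a simplified stdlib model of both. The helper names are hypothetical, and the per-file column lists are taken from the printed schema. Real Spark also reconciles column types, which this sketch ignores.

```python
def parse_partition_path(path):
    """Extract {column: value} pairs from key=value directory segments."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            column, raw = segment.split("=", 1)
            # Crude type inference: integers if possible, otherwise strings.
            try:
                values[column] = int(raw)
            except ValueError:
                values[column] = raw
    return values


def merge_schemas(file_schemas, partition_columns):
    """Union of file columns in first-seen order, then partition columns."""
    merged = []
    for schema in file_schemas:
        for column in schema:
            if column not in merged:
                merged.append(column)
    return merged + [c for c in partition_columns if c not in merged]


# Data columns per partition directory, as in the example above.
files = {
    "data/test_table/key=1": ["double", "single"],
    "data/test_table/key=2": ["single", "triple"],
}
partitions = [parse_partition_path(p) for p in files]
partition_columns = sorted({c for part in partitions for c in part})
merged = merge_schemas(list(files.values()), partition_columns)
print(partitions)
print(merged)
```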

Metadata Refreshing

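# spark is an existing SparkSession
# "my_table" is a placeholder for the name of the table to refresh
spark.catalog.refreshTable("my_table")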

JSON Datasets

# spark is from the previous example.
sc = spark.sparkContext

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)

# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
# +---------------+----+
# |        address|name|
# +---------------+----+
# |[Columbus,Ohio]| Yin|
# +---------------+----+
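`spark.read.json` infers the schema by scanning the records. Each line is a complete JSON object, and field types come from the values. A field missing from some records, such as Michael's `age`, stays nullable, and the fields print in alphabetical order. The sketch below imitates that for a few JSON types using only the `json` module. The type names mirror Spark's, but the function is a hypothetical simplification that does not handle arrays, all-null fields, or conflicting types. The sample records are assumed.

```python
import json


def infer_type(value):
    """Map a decoded JSON value to a Spark-style type name."""
    if isinstance(value, bool):  # check bool before int: bool subclasses int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        fields = ",".join(f"{k}:{infer_type(v)}" for k, v in sorted(value.items()))
        return f"struct<{fields}>"
    raise TypeError(f"unsupported JSON value: {value!r}")


def infer_json_schema(json_lines):
    """Infer {field: type} from JSON Lines input, with fields sorted by name."""
    schema = {}
    for line in json_lines:
        for field, value in json.loads(line).items():
            # Nulls and absent fields contribute no type information.
            if value is not None:
                schema.setdefault(field, infer_type(value))
    return dict(sorted(schema.items()))


# Assumed sample records, one JSON object per line.
people = ['{"name":"Michael"}', '{"name":"Andy", "age":30}', '{"name":"Justin", "age":19}']
flat = infer_json_schema(people)
nested = infer_json_schema(['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'])
print(flat)
print(nested)
```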

Hive Tables

from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |     500|
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by name or by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# |  2| val_2|  2| val_2|
# |  4| val_4|  4| val_4|
# |  5| val_5|  5| val_5|
# ...

JDBC To Other Databases

# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

DataFrame.groupBy retains grouping columns

import pyspark.sql.functions as func

# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))

# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
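The two behaviors differ only in whether the grouping column is prepended to the aggregate columns. A toy pure-Python version of grouped aggregation makes the output shapes concrete. Its `retain_group_columns` flag is a hypothetical stand-in for `spark.sql.retainGroupColumns`, and the department rows are invented sample data.

```python
def group_agg(rows, key, aggs, retain_group_columns=True):
    """Toy grouped aggregation over a list of dicts.

    aggs maps an output column name to (input column, aggregate function).
    """
    groups = {}
    for row in rows:
        groups.setdefault(row[key], []).append(row)
    result = []
    for group_key, members in groups.items():
        # 1.4+ behavior keeps the grouping column; 1.3.x behavior drops it.
        out = {key: group_key} if retain_group_columns else {}
        for out_name, (column, fn) in aggs.items():
            out[out_name] = fn([r[column] for r in members])
        result.append(out)
    return result


rows = [
    {"department": "eng", "age": 35, "expense": 100},
    {"department": "eng", "age": 41, "expense": 250},
    {"department": "ops", "age": 29, "expense": 80},
]
aggs = {"max(age)": ("age", max), "sum(expense)": ("expense", sum)}
print(group_agg(rows, "department", aggs))
print(group_agg(rows, "department", aggs, retain_group_columns=False))
```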


