This post collects the PySpark code examples from the Spark SQL, DataFrames and Datasets Guide, starting with Getting Started, as a quick reference.
Overview
References:
1、https://github.com/apache/spark/tree/v2.2.0
2、http://spark.apache.org/docs/latest/sql-programming-guide.html
Spark SQL, DataFrames and Datasets Guide
- Overview
- SQL
- Datasets and DataFrames
- Getting Started
- Starting Point: SparkSession
- Creating DataFrames
- Untyped Dataset Operations (aka DataFrame Operations)
- Running SQL Queries Programmatically
- Global Temporary View
- Creating Datasets
- Interoperating with RDDs
- Inferring the Schema Using Reflection
- Programmatically Specifying the Schema
- Aggregations
- Untyped User-Defined Aggregate Functions
- Type-Safe User-Defined Aggregate Functions
- Data Sources
- Generic Load/Save Functions
- Manually Specifying Options
- Run SQL on files directly
- Save Modes
- Saving to Persistent Tables
- Bucketing, Sorting and Partitioning
- Parquet Files
- Loading Data Programmatically
- Partition Discovery
- Schema Merging
- Hive metastore Parquet table conversion
- Hive/Parquet Schema Reconciliation
- Metadata Refreshing
- Configuration
- JSON Datasets
- Hive Tables
- Specifying storage format for Hive tables
- Interacting with Different Versions of Hive Metastore
- JDBC To Other Databases
- Troubleshooting
- Performance Tuning
- Caching Data In Memory
- Other Configuration Options
- Distributed SQL Engine
- Running the Thrift JDBC/ODBC server
- Running the Spark SQL CLI
- Migration Guide
- Upgrading From Spark SQL 2.1 to 2.2
- Upgrading From Spark SQL 2.0 to 2.1
- Upgrading From Spark SQL 1.6 to 2.0
- Upgrading From Spark SQL 1.5 to 1.6
- Upgrading From Spark SQL 1.4 to 1.5
- Upgrading from Spark SQL 1.3 to 1.4
- DataFrame data reader/writer interface
- DataFrame.groupBy retains grouping columns
- Behavior change on DataFrame.withColumn
- Upgrading from Spark SQL 1.0-1.2 to 1.3
- Rename of SchemaRDD to DataFrame
- Unification of the Java and Scala APIs
- Isolation of Implicit Conversions and Removal of dsl Package (Scala-only)
- Removal of the type aliases in org.apache.spark.sql for DataType (Scala-only)
- UDF Registration Moved to sqlContext.udf (Java & Scala)
- Python DataTypes No Longer Singletons
- Compatibility with Apache Hive
- Deploying in Existing Hive Warehouses
- Supported Hive Features
- Unsupported Hive Functionality
- Reference
- Data Types
- NaN Semantics
Getting Started
Starting Point: SparkSession
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
Creating DataFrames
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
Untyped Dataset Operations (aka DataFrame Operations)
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+
Running SQL Queries Programmatically
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
Global Temporary View
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
Inferring the Schema Using Reflection
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are DataFrame objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin
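To see what the two `map` lambdas above compute, the same parsing can be tried in plain Python before running it on an RDD. This is a minimal sketch: the three sample lines are assumed to mirror `people.txt`, and `parse_person` is a hypothetical helper, not part of PySpark.

```python
# Sample lines assumed to mirror examples/src/main/resources/people.txt.
sample_lines = ["Michael, 29", "Andy, 30", "Justin, 19"]


def parse_person(line):
    """Hypothetical helper: split one "name, age" line into a dict."""
    name, age = line.split(",")
    # int() ignores surrounding whitespace, so " 29" parses without strip().
    return {"name": name, "age": int(age)}


people = [parse_person(line) for line in sample_lines]

# Same predicate as the SQL query: age >= 13 AND age <= 19.
teen_names = [p["name"] for p in people if 13 <= p["age"] <= 19]
print(teen_names)
```

Checking the parsing locally like this can catch malformed lines early, since a bad age value raises `ValueError` here rather than inside a Spark task.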
Programmatically Specifying the Schema
# Import data types
from pyspark.sql.types import StringType, StructField, StructType

sc = spark.sparkContext

# Load a text file and split each line on commas.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+
Generic Load/Save Functions
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Manually Specifying Options
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
Bucketing, Sorting and Partitioning
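# Bucket by name into 42 buckets and sort each bucket by age.
# Bucketing is only supported together with saveAsTable.
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

# Partition the output directory layout by favorite_color.
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

# Partitioning and bucketing can be combined for a single table.
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("people_partitioned_bucketed"))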
Loading Data Programmatically
peopleDF = spark.read.json("examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+
Schema Merging
from pyspark.sql import Row

# spark is from the previous example.
# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext

squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")

# Read the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column that appears in the partition directory paths.
# root
# |-- double: long (nullable = true)
# |-- single: long (nullable = true)
# |-- triple: long (nullable = true)
# |-- key: integer (nullable = true)
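The `key` column in the merged schema does not come from the Parquet files themselves; it comes from the `key=1` and `key=2` directory names. The sketch below is a toy model of that path convention in plain Python, not Spark's implementation, and `partition_values` is a hypothetical helper.

```python
def partition_values(path):
    """Toy model: collect name=value segments from a partition directory path."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            name, raw = segment.split("=", 1)
            # Keep digit-only values as int, everything else as str.
            values[name] = int(raw) if raw.isdigit() else raw
    return values


print(partition_values("data/test_table/key=1"))
print(partition_values("data/test_table/key=2"))
```

Each `name=value` segment becomes one extra column, which is why reading `data/test_table` yields `key` alongside the columns stored in the files.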
Metadata Refreshing
# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
JSON Datasets
# spark is from the previous example.
sc = spark.sparkContext

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)

# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
# +---------------+----+
# |        address|name|
# +---------------+----+
# |[Columbus,Ohio]| Yin|
# +---------------+----+
Hive Tables
from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |     500|
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by name or by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# |  2| val_2|  2| val_2|
# |  4| val_4|  4| val_4|
# |  5| val_5|  5| val_5|
# ...
JDBC To Other Databases
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
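The examples above repeat the connection settings and hard-code the password. One way to avoid both is to build the options once and read the password from the environment. This is a sketch under assumptions: `jdbc_options` and the `JDBC_PASSWORD` variable name are hypothetical, while the option keys are the ones used above.

```python
import os


def jdbc_options(url, table, user, password_env="JDBC_PASSWORD"):
    """Hypothetical helper: collect JDBC options, reading the password from the environment."""
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        # Falls back to an empty string when the variable is not set.
        "password": os.environ.get(password_env, ""),
    }


opts = jdbc_options("jdbc:postgresql:dbserver", "schema.tablename", "username")
print(sorted(opts))
```

The dictionary can then be passed as keyword options, for example `spark.read.format("jdbc").options(**opts).load()`.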
DataFrame.groupBy retains grouping columns
import pyspark.sql.functions as func

# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))

# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")