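I am 斯文保温杯, a blogger on 靠谱客. This article, which I came across during recent development work, covers creating pyspark.sql DataFrames and their common operations. I found it quite useful and am sharing it here in the hope that it serves as a reference.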

Overview

Introduction to Spark SQL

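Spark is an in-memory cluster computing framework for processing big data. It provides a simple programming interface that lets application developers easily use the CPU, memory, and storage resources of cluster nodes to process big data.
The Spark API provides programming interfaces in Scala, Java, Python, and R, and any of these languages can be used to develop Spark applications. To support Python, the Apache Spark community released PySpark; with PySpark you can also work with RDDs in Python.
Spark SQL combines the simplicity and power of SQL and HiveQL. Spark SQL is a Spark library that runs on top of Spark, and it provides a higher-level abstraction than Spark Core for processing structured data.
A Spark DataFrame is built on top of RDDs: it is distributed, yet it offers very powerful data manipulation. This article summarizes the commonly used Spark DataFrame methods. A follow-up will cover the Spark SQL built-in functions that work closely with DataFrame operations, as well as user-defined functions (UDFs) and user-defined aggregate functions (UDAFs).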

Core classes

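  • pyspark.SparkContext: the main entry point of the Spark library. It represents a connection to a Spark cluster, and the other important objects depend on it. SparkContext lives in the driver and can be used to create RDDs, accumulators, and broadcast variables on the cluster.
  • pyspark.RDD: Spark's main data abstraction, an abstract class defined in the Spark library.
  • pyspark.streaming.StreamingContext: a class defined in the Spark Streaming library. Every Spark Streaming application must create one.
  • pyspark.streaming.DStream: a discretized stream, the main object Spark Streaming uses to process data streams.
  • pyspark.sql.SparkSession: the main entry point for DataFrame and SQL functionality.
  • pyspark.sql.DataFrame: the main abstraction in Spark SQL. It is a distributed collection of rows, and each row has a number of named columns. It resembles a DataFrame in R or pandas but has richer optimizations. A DataFrame can be constructed in many ways, for example from structured data files, Hive tables, external databases, or RDDs.
  • pyspark.sql.Column: a column expression in a DataFrame.
  • pyspark.sql.Row: a row of data in a DataFrame.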

Environment

  • os: Win 10
  • spark: spark-2.4.4-bin-hadoop2.7
  • python:python 3.7.4
  • java: jdk 1.8.0_221

Starting with SparkSession

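Since Spark 2.0, SparkSession merges SQLContext and HiveContext. It is also the entry point for Hive support, which covers HiveQL, Hive UDFs, and reading data from Hive tables. Hive support must be switched on with enableHiveSupport() on the builder.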

from pyspark.sql import SparkSession

# Get an existing SparkSession or create a new one
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# The Spark master URL can be set with .master(...): "local" runs locally, "local[4]" runs locally on 4 cores,
# or "spark://master:7077" runs on a Spark standalone cluster

Creating DataFrames

With a SparkSession, a DataFrame can be created from an existing RDD, from a Hive table, or from other Spark data sources.

# spark is an existing SparkSession
# Read from files
# Working directory: the Spark installation path SPARK_HOME
## read.json
df = spark.read.json("examples/src/main/resources/people.json")
df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
df = spark.read.load("examples/src/main/resources/people.json", format="json") #format: Default to ‘parquet’
## read.csv
df_csv = spark.read.csv("examples/src/main/resources/people.csv",sep=';', header= True)
## read.text
df_txt = spark.read.text("examples/src/main/resources/people.txt")
## read.parquet
df_parquet = spark.read.parquet("examples/src/main/resources/users.parquet")
## orc
df_orc = spark.read.orc("examples/src/main/resources/users.orc")
## rdd
sc = spark.sparkContext
rdd = sc.textFile('examples/src/main/resources/people.json')
df_rdd1 = spark.read.json(rdd)
# createDataFrame: rdd, list, pandas.DataFrame
df_list = spark.createDataFrame([('Tom', 80), ('Alice', None)], ["name", "height"])
l = [('Alice', 1)]
rdd = sc.parallelize(l)
df_rdd2 = spark.createDataFrame(rdd,['name', 'age'])
df_rdd2.show()
+-----+---+
| name|age|
+-----+---+
|Alice|  1|
+-----+---+
## with an explicit schema
from pyspark.sql.types import *
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)])
df3 = spark.createDataFrame(rdd, schema)
#from pandas
import pandas
df_pandas = spark.createDataFrame(pandas.DataFrame([[1, 2]]))
df_pandas.show()
+---+---+
|  0|  1|
+---+---+
|  1|  2|
+---+---+

Common DataFrame methods

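Working with a Spark DataFrame feels much like working with a pandas.DataFrame, so much of what you already know carries over. Spark operations fall into two kinds: transformations and actions. Operations are lazy, which means nothing is computed until an action is called.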

Create three DataFrames: customers, products, and sales

customers = [(1, 'James', 21, 'M'), (2, "Liz", 25, "F"), (3, "John", 31, "M"),
             (4, "Jennifer", 45, "F"), (5, "Robert", 41, "M"), (6, "Sandra", 45, "F")]
df_customers = spark.createDataFrame(customers, ["cID", "name", "age", "gender"]) # list -> DF
df_customers.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
|  1|   James| 21|     M|
|  2|     Liz| 25|     F|
|  3|    John| 31|     M|
|  4|Jennifer| 45|     F|
|  5|  Robert| 41|     M|
|  6|  Sandra| 45|     F|
+---+--------+---+------+
products = [(1, "iPhone", 600, 400), (2, "Galaxy", 500, 400), (3, "iPad", 400, 300),
(4, "Kindel",200,100), (5, "MacBook", 1200, 900), (6, "Dell",500, 400)]
df_products = sc.parallelize(products).toDF(["pId", "name", "price", "cost"]) # List-> RDD ->DF
df_products.show()
+---+-------+-----+----+
|pId|   name|price|cost|
+---+-------+-----+----+
|  1| iPhone|  600| 400|
|  2| Galaxy|  500| 400|
|  3|   iPad|  400| 300|
|  4| Kindel|  200| 100|
|  5|MacBook| 1200| 900|
|  6|   Dell|  500| 400|
+---+-------+-----+----+
sales = [("01/01/2015", "iPhone", "USA", 40000), ("01/02/2015", "iPhone", "USA", 30000),
("01/02/2015", "iPhone", "China", 10000), ("01/02/2015", "iPhone", "China", 5000),
("01/01/2015", "S6", "USA", 20000), ("01/02/2015", "S6", "USA", 10000),
("01/01/2015", "S6", "China", 9000), ("01/02/2015", "S6", "China", 6000)]
df_sales = spark.createDataFrame(sales, ["date", "product", "country", "revenue"])
df_sales.show()
+----------+-------+-------+-------+
|      date|product|country|revenue|
+----------+-------+-------+-------+
|01/01/2015| iPhone|    USA|  40000|
|01/02/2015| iPhone|    USA|  30000|
|01/02/2015| iPhone|  China|  10000|
|01/02/2015| iPhone|  China|   5000|
|01/01/2015|     S6|    USA|  20000|
|01/02/2015|     S6|    USA|  10000|
|01/01/2015|     S6|  China|   9000|
|01/02/2015|     S6|  China|   6000|
+----------+-------+-------+-------+

Basic operations

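df_customers.cache()  # cache the DataFrame: shorthand for persist() with the default storage level (MEMORY_AND_DISK), stored in an in-memory columnar format
df_customers.persist()  # same as cache() without arguments; a StorageLevel can be passed, e.g. persist(StorageLevel.MEMORY_ONLY)
df_customers.unpersist()  # remove all cached blocks
df_customers.coalesce(numPartitions=1)  # return a new DataFrame with at most numPartitions partitions (merges partitions without a full shuffle)
df_customers.repartition(10)  # full shuffle into 10 partitions; see also repartitionByRange
df_customers.rdd.getNumPartitions()  # check the number of partitions
df_customers.columns  # list the column names
['cID', 'name', 'age', 'gender']
df_customers.dtypes  # return (column name, data type) pairs
df_customers.explain()  # print the physical plan; useful for debugging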

Actions

df_customers.show(n=2, truncate=True, vertical=False)  # n: number of rows; truncate=True cuts strings longer than 20 characters; vertical=True prints each row vertically
+---+-----+---+------+
|cID| name|age|gender|
+---+-----+---+------+
|  1|James| 21|     M|
|  2|  Liz| 25|     F|
+---+-----+---+------+
only showing top 2 rows
df_customers.collect()  # return all records as a list of Row objects
[Row(cID=1, name='James', age=21, gender='M'), Row(cID=2, name='Liz', age=25, gender='F'),
Row(cID=3, name='John', age=31, gender='M'), Row(cID=4, name='Jennifer', age=45, gender='F'),
Row(cID=5, name='Robert', age=41, gender='M'), Row(cID=6, name='Sandra', age=45, gender='F')]
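df_customers.count()  # number of rows
6
df_customers.head(n=1)  # first n rows as a list of Row objects; use it only when the result is small (df_customers.limit(n) returns a DataFrame instead)
[Row(cID=1, name='James', age=21, gender='M')]
df_customers.describe()  # summary statistics (count, mean, stddev, min, max) for exploratory analysis; returns a DataFrame, so call .show() to display it
df_customers.first()  # return the first row
Row(cID=1, name='James', age=21, gender='M')
df_customers.take(2)  # return the first n rows as a list of Row objects
[Row(cID=1, name='James', age=21, gender='M'), Row(cID=2, name='Liz', age=25, gender='F')]
df_customers.printSchema()  # print the schema to the console as a tree
root
 |-- cID: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
df_customers.corr('cID', 'age')  # Pearson correlation coefficient of two columns; df_customers.cov('cID', 'age') gives their sample covariance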
0.9298147235977954

Transformations: common query methods, combining, sampling, aggregation, grouped aggregation, and subsetting

alias: return a DataFrame with an alias (a new name) set

df_as1 = df_customers.alias("df_as1")
df_as1.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
|  1|   James| 21|     M|
|  2|     Liz| 25|     F|
|  3|    John| 31|     M|
|  4|Jennifer| 45|     F|
|  5|  Robert| 41|     M|
|  6|  Sandra| 45|     F|
+---+--------+---+------+
df_as2 = df_customers.alias("df_as2")

Aggregation. agg: apply the specified aggregations to one or more columns and return a new DataFrame

from pyspark.sql import functions as F
df_agg = df_products.agg(F.max(df_products.price), F.min(df_products.price), F.count(df_products.name))
df_agg.show()
+----------+----------+-----------+
|max(price)|min(price)|count(name)|
+----------+----------+-----------+
|      1200|       200|          6|
+----------+----------+-----------+

Accessing columns

df_customers.age  # or df_customers['age']; accessing a single column returns a Column object
Column<b'age'>
df_customers[['age','gender']].show()
+---+------+
|age|gender|
+---+------+
| 21|     M|
| 25|     F|
| 31|     M|
| 45|     F|
| 41|     M|
| 45|     F|
+---+------+

Removing duplicates and dropping columns

# distinct: remove duplicate rows; returns a new DataFrame that contains only distinct rows
df_withoutdup = df_customers.distinct()
df_withoutdup
# drop: drop the specified columns; returns a new DataFrame
df_drop = df_customers.drop('age', 'gender')
df_drop.show()
+---+--------+
|cID|    name|
+---+--------+
|  1|   James|
|  2|     Liz|
|  3|    John|
|  4|Jennifer|
|  5|  Robert|
|  6|  Sandra|
+---+--------+
# dropDuplicates: keep one row for each distinct combination of the specified columns (which row is kept is not guaranteed)
df_dropDup = df_sales.dropDuplicates(['product', 'country'])
df_dropDup.show()
+----------+-------+-------+-------+
|      date|product|country|revenue|
+----------+-------+-------+-------+
|01/01/2015|     S6|  China|   9000|
|01/02/2015| iPhone|  China|  10000|
|01/01/2015| iPhone|    USA|  40000|
|01/01/2015|     S6|    USA|  20000|
+----------+-------+-------+-------+

Filtering rows and selecting columns

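# filter: keep the rows that satisfy a condition and return a new DataFrame; the condition is a Column expression or a SQL expression string such as "age > 25"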
df_filter = df_customers.filter(df_customers.age > 25)
df_filter.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
|  3|    John| 31|     M|
|  4|Jennifer| 45|     F|
|  5|  Robert| 41|     M|
|  6|  Sandra| 45|     F|
+---+--------+---+------+
# select: return the specified columns as a new DataFrame
df_select = df_customers.select('name','age')
df_select.show()
+--------+---+
|    name|age|
+--------+---+
|   James| 21|
|     Liz| 25|
|    John| 31|
|Jennifer| 45|
|  Robert| 41|
|  Sandra| 45|
+--------+---+
df_select1 = df_customers.select(df_customers['name'],df_customers['age'] +1)
df_select1.show()
+--------+---------+
|    name|(age + 1)|
+--------+---------+
|   James|       22|
|     Liz|       26|
|    John|       32|
|Jennifer|       46|
|  Robert|       42|
|  Sandra|       46|
+--------+---------+
df_select2 = df_customers.selectExpr('name', 'age +1 AS new_age')
df_select2.show()  # selectExpr accepts SQL expressions
+--------+-------+
|    name|new_age|
+--------+-------+
|   James|     22|
|     Liz|     26|
|    John|     32|
|Jennifer|     46|
|  Robert|     42|
|  Sandra|     46|
+--------+-------+

Adding and replacing columns

## withColumn: add a new column to the source DataFrame, or replace an existing column that has the same name; returns a new DataFrame
df_new = df_products.withColumn("profit", df_products.price - df_products.cost)
df_new.show()
+---+-------+-----+----+------+
|pId|   name|price|cost|profit|
+---+-------+-----+----+------+
|  1| iPhone|  600| 400|   200|
|  2| Galaxy|  500| 400|   100|
|  3|   iPad|  400| 300|   100|
|  4| Kindel|  200| 100|   100|
|  5|MacBook| 1200| 900|   300|
|  6|   Dell|  500| 400|   100|
+---+-------+-----+----+------+
## withColumnRenamed (existing, new)
df_customers.withColumnRenamed('age', 'age2')
DataFrame[cID: bigint, name: string, age2: bigint, gender: string]

Grouping with groupBy

# groupby/groupBy: group the rows of the source DataFrame by the given columns, then aggregate each group
groupByGender = df_customers.groupBy('gender').count()
groupByGender.show()
+------+-----+
|gender|count|
+------+-----+
|     F|    3|
|     M|    3|
+------+-----+
revenueByproduct = df_sales.groupBy('product').sum('revenue')
revenueByproduct.show()
+-------+------------+
|product|sum(revenue)|
+-------+------------+
|     S6|       45000|
| iPhone|       85000|
+-------+------------+

Replacing values with replace

df_replace = df_customers.replace(["James", "Liz"], ["James2", "Liz2"], subset = ["name"])
df_replace.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
|  1|  James2| 21|     M|
|  2|    Liz2| 25|     F|
|  3|    John| 31|     M|
|  4|Jennifer| 45|     F|
|  5|  Robert| 41|     M|
|  6|  Sandra| 45|     F|
+---+--------+---+------+

Handling missing values (the parameters resemble those of pandas.DataFrame)

from pyspark.sql import Row
df = sc.parallelize([ 
Row(name='Alice', age=5, height=80), 
Row(name= None, age=5, height=70), 
Row(name='Bob', age=None, height=80)]).toDF()
df.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|   5|    80|Alice|
|   5|    70| null|
|null|    80|  Bob|
+----+------+-----+
# dropna / na.drop: drop rows (not columns) that contain missing values
df.na.drop(how='any', thresh=None, subset=None).show() # df.dropna().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+
# fillna / na.fill: fill missing values, here with a per-column dict
df.na.fill({'age': 5, 'name': 'unknown'}).show()
+---+------+-------+
|age|height|   name|
+---+------+-------+
|  5|    80|  Alice|
|  5|    70|unknown|
|  5|    80|    Bob|
+---+------+-------+

Iterating over rows

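## foreach: apply a function to every row of the DataFrame
## (the function runs on the executors, so on a cluster the print output goes to the executor logs)
def f(customer):
    print(customer.age)

df_customers.foreach(f)
## foreachPartition: apply a function to each partition; the function receives an iterator over that partition's rows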

Combining DataFrames

## intersect: return a new DataFrame containing only the rows present in both DataFrames
customers2 = [(11, 'Jackson', 21, 'M'), (12, "Emma", 25, "F"), (13, "Olivia", 31, "M"),
              (4, "Jennifer", 45, "F"), (5, "Robert", 41, "M"), (6, "Sandra", 45, "F")]
df_customers2 = spark.createDataFrame(customers2, ["cID", "name", "age", "gender"])  # list -> DF
df_customers2.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
| 11| Jackson| 21|     M|
| 12|    Emma| 25|     F|
| 13|  Olivia| 31|     M|
|  4|Jennifer| 45|     F|
|  5|  Robert| 41|     M|
|  6|  Sandra| 45|     F|
+---+--------+---+------+
df_common = df_customers.intersect(df_customers2)
df_common.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
|  6|  Sandra| 45|     F|
|  5|  Robert| 41|     M|
|  4|Jennifer| 45|     F|
+---+--------+---+------+
## union: return a new DataFrame containing the rows of both DataFrames
# Unlike SQL UNION, duplicates are kept, so union() is often followed by distinct()
df_union = df_customers.union(df_customers2)  # columns are matched by position
df_union.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
|  1|   James| 21|     M|
|  2|     Liz| 25|     F|
|  3|    John| 31|     M|
|  4|Jennifer| 45|     F|
|  5|  Robert| 41|     M|
|  6|  Sandra| 45|     F|
| 11| Jackson| 21|     M|
| 12|    Emma| 25|     F|
| 13|  Olivia| 31|     M|
|  4|Jennifer| 45|     F|
|  5|  Robert| 41|     M|
|  6|  Sandra| 45|     F|
+---+--------+---+------+
df_union_nodup = df_union.distinct()
df_union_nodup.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
|  6|  Sandra| 45|     F|
|  1|   James| 21|     M|
|  5|  Robert| 41|     M|
|  2|     Liz| 25|     F|
|  4|Jennifer| 45|     F|
| 13|  Olivia| 31|     M|
| 11| Jackson| 21|     M|
|  3|    John| 31|     M|
| 12|    Emma| 25|     F|
+---+--------+---+------+
# unionByName: combine rows by matching column names rather than positions
df1 = spark.createDataFrame([[1,2,3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4,5,6]], ["col1", "col2", "col0"])
df_unionbyname = df1.unionByName(df2)
df_unionbyname.show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+
## join: perform an SQL-style join with another DataFrame. Arguments: the other DataFrame, the join expression, and the join type
transactions = [(1, 5, 3, "01/01/2015", "San Francisco"), (2, 6, 1, "01/02/2015", "San Jose"),
                (3, 1, 6, "01/01/2015", "Boston"), (4, 200, 400, "01/02/2015", "Palo Alto"),
                (6, 100, 100, "01/02/2015", "Mountain View")]
# Each tuple has five fields, so five column names are needed (with only four, the fifth column would be named _5).
# The third field is assumed to be a product ID.
df_transactions = spark.createDataFrame(transactions, ['tId', "custId", "prodId", "date", "city"])
df_transactions.show()
+---+------+------+----------+-------------+
|tId|custId|prodId|      date|         city|
+---+------+------+----------+-------------+
|  1|     5|     3|01/01/2015|San Francisco|
|  2|     6|     1|01/02/2015|     San Jose|
|  3|     1|     6|01/01/2015|       Boston|
|  4|   200|   400|01/02/2015|    Palo Alto|
|  6|   100|   100|01/02/2015|Mountain View|
+---+------+------+----------+-------------+
df_join_inner = df_transactions.join(df_customers, df_transactions.custId == df_customers.cID, "inner")
df_join_inner.show()
+---+------+------+----------+-------------+---+------+---+------+
|tId|custId|prodId|      date|         city|cID|  name|age|gender|
+---+------+------+----------+-------------+---+------+---+------+
|  2|     6|     1|01/02/2015|     San Jose|  6|Sandra| 45|     F|
|  1|     5|     3|01/01/2015|San Francisco|  5|Robert| 41|     M|
|  3|     1|     6|01/01/2015|       Boston|  1| James| 21|     M|
+---+------+------+----------+-------------+---+------+---+------+
df_join_outer = df_transactions.join(df_customers, df_transactions.custId == df_customers.cID, "outer")
df_join_outer.show()
+----+------+------+----------+-------------+----+--------+----+------+
| tId|custId|prodId|      date|         city| cID|    name| age|gender|
+----+------+------+----------+-------------+----+--------+----+------+
|   2|     6|     1|01/02/2015|     San Jose|   6|  Sandra|  45|     F|
|   1|     5|     3|01/01/2015|San Francisco|   5|  Robert|  41|     M|
|   3|     1|     6|01/01/2015|       Boston|   1|   James|  21|     M|
|   6|   100|   100|01/02/2015|Mountain View|null|    null|null|  null|
|null|  null|  null|      null|         null|   3|    John|  31|     M|
|   4|   200|   400|01/02/2015|    Palo Alto|null|    null|null|  null|
|null|  null|  null|      null|         null|   2|     Liz|  25|     F|
|null|  null|  null|      null|         null|   4|Jennifer|  45|     F|
+----+------+------+----------+-------------+----+--------+----+------+
df_join_left = df_transactions.join(df_customers, df_transactions.custId == df_customers.cID, "left_outer")
df_join_left.show()
+---+------+------+----------+-------------+----+------+----+------+
|tId|custId|prodId|      date|         city| cID|  name| age|gender|
+---+------+------+----------+-------------+----+------+----+------+
|  2|     6|     1|01/02/2015|     San Jose|   6|Sandra|  45|     F|
|  1|     5|     3|01/01/2015|San Francisco|   5|Robert|  41|     M|
|  3|     1|     6|01/01/2015|       Boston|   1| James|  21|     M|
|  6|   100|   100|01/02/2015|Mountain View|null|  null|null|  null|
|  4|   200|   400|01/02/2015|    Palo Alto|null|  null|null|  null|
+---+------+------+----------+-------------+----+------+----+------+
df_join_right = df_transactions.join(df_customers, df_transactions.custId == df_customers.cID, "right_outer")
df_join_right.show()
+----+------+------+----------+-------------+---+--------+---+------+
| tId|custId|prodId|      date|         city|cID|    name|age|gender|
+----+------+------+----------+-------------+---+--------+---+------+
|   2|     6|     1|01/02/2015|     San Jose|  6|  Sandra| 45|     F|
|   1|     5|     3|01/01/2015|San Francisco|  5|  Robert| 41|     M|
|   3|     1|     6|01/01/2015|       Boston|  1|   James| 21|     M|
|null|  null|  null|      null|         null|  3|    John| 31|     M|
|null|  null|  null|      null|         null|  2|     Liz| 25|     F|
|null|  null|  null|      null|         null|  4|Jennifer| 45|     F|
+----+------+------+----------+-------------+---+--------+---+------+
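## left_semi: return the rows of the left table that have a match in the right table (only the left table's columns are returned)
## left_anti: return the rows of the left table that have no match in the right table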

Sorting

## orderBy/sort: return a DataFrame sorted by the given columns; the order is ascending (asc) by default
df_sort1 = df_customers.orderBy("name")
df_sort1.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
|  1|   James| 21|     M|
|  4|Jennifer| 45|     F|
|  3|    John| 31|     M|
|  2|     Liz| 25|     F|
|  5|  Robert| 41|     M|
|  6|  Sandra| 45|     F|
+---+--------+---+------+
df_sort2 = df_customers.orderBy(['age', 'name'], ascending=[0, 1])  # age descending, then name ascending
df_sort2.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
|  4|Jennifer| 45|     F|
|  6|  Sandra| 45|     F|
|  5|  Robert| 41|     M|
|  3|    John| 31|     M|
|  2|     Liz| 25|     F|
|  1|   James| 21|     M|
+---+--------+---+------+
df_sort3 = df_customers.sort("name")
df_sort3.show()
df_sort4 = df_customers.sort("name", ascending = False)
df_sort4.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
|  6|  Sandra| 45|     F|
|  5|  Robert| 41|     M|
|  2|     Liz| 25|     F|
|  3|    John| 31|     M|
|  4|Jennifer| 45|     F|
|  1|   James| 21|     M|
+---+--------+---+------+

Sampling and splitting

## sample: return a DataFrame containing a random sample of roughly the given fraction of rows (the exact count is not guaranteed)
df_sample = df_customers.sample(withReplacement= False, fraction =0.2, seed = 1)
df_sample.show()
+---+------+---+------+
|cID|  name|age|gender|
+---+------+---+------+
|  2|   Liz| 25|     F|
|  6|Sandra| 45|     F|
+---+------+---+------+
## sampleBy: stratified sampling without replacement, with a fraction for each value of the given column
df_sample2 = df_sales.sampleBy('product', fractions= {"iPhone": 0.5, "S6": 0.5}, seed = 1)
df_sample2.show()
+----------+-------+-------+-------+
|      date|product|country|revenue|
+----------+-------+-------+-------+
|01/01/2015| iPhone|    USA|  40000|
|01/02/2015| iPhone|    USA|  30000|
|01/02/2015| iPhone|  China|  10000|
|01/01/2015|     S6|    USA|  20000|
|01/02/2015|     S6|    USA|  10000|
|01/02/2015|     S6|  China|   6000|
+----------+-------+-------+-------+
## randomSplit: randomly split a DataFrame into several DataFrames according to the given weights
df_splits = df_customers.randomSplit([0.6,0.2,0.2])
df_splits[0].show()
+---+------+---+------+
|cID|  name|age|gender|
+---+------+---+------+
|  2|   Liz| 25|     F|
|  3|  John| 31|     M|
|  5|Robert| 41|     M|
+---+------+---+------+

Converting to other common data objects: JSON, RDD, pandas.DataFrame

df_json = df_customers.toJSON()  ## returns an RDD in which each element is a JSON string
df_json.first()
'{"cID":1,"name":"James","age":21,"gender":"M"}'
df_pandas = df_customers.toPandas()  ## returns a pandas.DataFrame (all data is collected to the driver)
df_pandas
  cID      name age gender
0   1     James  21      M
1   2       Liz  25      F
2   3      John  31      M
3   4  Jennifer  45      F
4   5    Robert  41      M
5   6    Sandra  45      F
rdd = df_customers.rdd  # an RDD of Row objects; RDD operations can be used from here
df_from_rdd = rdd.toDF()  # convert back to a DataFrame
df_from_rdd.first()

Creating temporary views for SQL queries

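# registerTempTable: register the DataFrame as a temporary table under the given name so it can be queried with SQL (deprecated since Spark 2.0 in favor of createOrReplaceTempView)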
df_customers.registerTempTable("customers_temp")
df_search = spark.sql('select * from customers_temp where age > 30')
df_search.show()
+---+--------+---+------+
|cID|    name|age|gender|
+---+--------+---+------+
|  3|    John| 31|     M|
|  4|Jennifer| 45|     F|
|  5|  Robert| 41|     M|
|  6|  Sandra| 45|     F|
+---+--------+---+------+
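# createGlobalTempView
# createOrReplaceGlobalTempView
#   create a global temporary view whose lifetime is tied to the Spark application; query it as global_temp.<name>
# createOrReplaceTempView
#   a temporary view whose lifetime is tied to the SparkSession that created it
# createTempView
#   like createOrReplaceTempView, but raises an error if a view with that name already exists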

Other functions: crossJoin, crosstab, cube, rollup
Output with write: saving a DataFrame to files

## json, parquet, orc, csv, and text formats are supported; output can go to the local file system, HDFS, or S3
import os
df_customers0 = df_customers.coalesce(numPartitions=1)  # one partition, so a single output file is written
# df_customers0.write.format('json').save("savepath")
# df_customers0.write.orc("savepath")
df_customers0.write.csv("savepath", header=True, sep=",", mode='overwrite')
# mode: 'error' (default, fail if the path exists) / 'append' / 'overwrite' / 'ignore' (write nothing if the path exists)
# df_customers0.write.parquet("savepath")

Original article: https://blog.csdn.net/samll_tree/article/details/103317344

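The text and images in this post were contributed by users or collected from the web for learning and reference; copyright belongs to the original author.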