Spark SQL 加载 txt 文件（people.txt 的内容如下）：
Michael, 29
Andy, 30
Justin, 19
from __future__ import print_function

import os

import findspark
import pyspark
# $example on:init_session$
from pyspark.sql import SparkSession
# $example off:init_session$
# $example on:schema_inferring$
from pyspark.sql import Row
# $example off:schema_inferring$
# $example on:programmatic_schema$
# Import data types
from pyspark.sql.types import *
# $example off:programmatic_schema$
if __name__ == "__main__":
    # Initialize the SparkSession, the entry point for the DataFrame and SQL
    # APIs. The chain must start from `.builder` and end with
    # `.getOrCreate()`; otherwise no session object is produced.
    # $example on:init_session$
    spark = (
        SparkSession.builder
        .appName("Python Spark SQL basic example")
        .config("spark.some.config.option", "some-value")
        .getOrCreate()
    )
    # $example off:init_session$

    # Inferring the schema using reflection:
    # Spark SQL can convert an RDD of Row objects into a DataFrame and infer
    # its column types. Rows are built by passing key/value pairs to the Row
    # class as keyword arguments; the keys define the table's column names,
    # and the types are inferred by sampling the whole dataset, similar to
    # the inference performed on JSON files.
    # The steps are roughly:
    #   1. Convert the original RDD into an RDD of Rows.
    #   2. Create a DataFrame from it with SparkSession.createDataFrame.
    # The DataFrame can then be registered as a temporary view with
    # createOrReplaceTempView("tablename"), after which spark.sql can run
    # SQL statements against the table "tablename".
    # $example on:schema_inferring$
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    # The path can be overridden with the PEOPLE_TXT_PATH environment
    # variable; by default the original hard-coded location is used.
    people_path = os.environ.get(
        "PEOPLE_TXT_PATH",
        "C:/file/spark_package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt",
    )
    lines = sc.textFile(people_path)
    parts = lines.map(lambda line: line.split(","))
    # Lines look like "Michael, 29"; int() tolerates the leading space.
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    # Without this registration the query below cannot resolve "people".
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # The results of SQL queries are DataFrame objects.
    # .rdd returns the content as a pyspark RDD of Row objects; collect()
    # triggers the actual computation.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin
    # $example off:schema_inferring$

    # Release the session's resources once the script is done.
    spark.stop()
