Overview
Loading a txt file with Spark SQL
# File contents (people.txt):
Michael, 29
Andy, 30
Justin, 19
from __future__ import print_function  # a __future__ import must precede all other imports

# These three lines are only needed when running outside spark-submit
# (findspark locates the local Spark installation and puts pyspark on sys.path)
import findspark
findspark.init()
import pyspark
# $example on:init_session$
from pyspark.sql import SparkSession
# $example off:init_session$
# $example on:schema_inferring$
from pyspark.sql import Row
# $example off:schema_inferring$
# $example on:programmatic_schema$
# Import data types
from pyspark.sql.types import *
# $example off:programmatic_schema$
import os
if __name__ == "__main__":
    # $example on:init_session$
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    # The session is now initialized

    # Inferring the schema using reflection
    # Spark SQL can convert an RDD of Row objects to a DataFrame and infer the
    # datatypes. Rows are constructed by passing a list of key/value pairs as
    # keyword arguments to the Row class. The keys define the table's column
    # names, and the types are inferred by sampling the whole dataset,
    # similar to the inference performed on JSON files.
    # Roughly two steps:
    # 1. Convert the original RDD into an RDD of Rows.
    # 2. Create a DataFrame with SparkSession's createDataFrame.
    # Afterwards, DataFrame's createOrReplaceTempView("tablename") creates or
    # replaces a temporary view, i.e. the table tablename, and SQL statements
    # can then be run against tablename with spark.sql.
    # $example off:init_session$
    # df = spark.read.json("C:/file/spark_package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.json")
    sc = spark.sparkContext

    # Load the txt file and convert each line to a Row.
    lines = sc.textFile("C:/file/spark_package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")
    # schemaPeople.show()

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    # The results of SQL queries are DataFrame objects.
    # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin
    # $example off:schema_inferring$
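The import from pyspark.sql.types (between the $example on:programmatic_schema$ markers) is never actually used in the script above. For completeness, here is a minimal sketch of that alternative, specifying the schema programmatically instead of inferring it by reflection, adapted from the corresponding example in the official Spark documentation. It assumes the same spark session and people.txt file as above:

from pyspark.sql.types import StructType, StructField, StringType

sc = spark.sparkContext
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple instead of a Row.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string and turned into StructFields.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True)
          for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people, schema)
schemaPeople.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people").show()

The programmatic approach is useful when column names and types are only known at runtime and cannot be written as Row keyword arguments in advance.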
Official documentation example
def schema_inference_example(spark):
    # $example on:schema_inferring$
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # The results of SQL queries are DataFrame objects.
    # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin
    # $example off:schema_inferring$
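As a side note, a simple comma-separated file like people.txt can also be loaded without the RDD detour, directly through the DataFrame reader. A minimal sketch, assuming Spark 2.3+ (where csv accepts a DDL-formatted schema string); the column names here are chosen by hand, not taken from the file:

# ignoreLeadingWhiteSpace trims the space after the comma (" 29" -> 29)
df = spark.read.csv("examples/src/main/resources/people.txt",
                    schema="name STRING, age INT",
                    ignoreLeadingWhiteSpace=True)
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19").show()

When a script is submitted through bin/spark-submit instead of being run from a plain Python environment, the findspark lines at the top are unnecessary.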