我是靠谱客的博主 虚拟小伙,最近开发中收集的这篇文章主要介绍spark sql加载txt文件01,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

spark sql加载txt文件

#文件内容
Michael, 29
Andy, 30
Justin, 19

#方法2需要复制这三行
import findspark
findspark.init()
import pyspark 
from __future__ import print_function

# $example on:init_session$
from pyspark.sql import SparkSession
# $example off:init_session$

# $example on:schema_inferring$
from pyspark.sql import Row
# $example off:schema_inferring$

# $example on:programmatic_schema$
# Import data types
from pyspark.sql.types import *
# $example off:programmatic_schema$

import os

if __name__ == "__main__":
    # $example on:init_session$
    spark = SparkSession 
        .builder 
        .appName("Python Spark SQL basic example") 
        .config("spark.some.config.option", "some-value") 
        .getOrCreate()
#     初始化session
# 使用反射获取schema
# Spark SQL可以将Row对象格式的RDD转换成DataFrame,并推断其类型。Rows是通过向Row类传入一个key/value对列表作为关键字参数来构建。列表的keys定义了表的列名,通过抽样整个数据集来推断类型,类似于推断json文件。
# 步骤大致以下两步:
# 1、将原来的RDD转换成Row格式的RDD。
# 2、通过SparkSession提供的createDataFrame创建一个DataFrame。
# 之后就可以通过DataFrame的createOrReplaceTempView("tablename")将其创建或者替换一个临时视图,即表tablename。就可以用spark.sql方法在表tablename上运行SQL语句了。

    # $example off:init_session$
# df = spark.read.json("C:/file/spark_package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.json")
    sc = spark.sparkContext
# 加载txt文本文件
    # Load a text file and convert each line to a Row.
    lines = sc.textFile("C:/file/spark_package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")
#     schemaPeople.show()
    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

#     # The results of SQL queries are Dataframe objects.
#     # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
#     teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
#     for name in teenNames:
#         print(name)
#     # Name: Justin
#     # $example off:schema_inferring$

官网内容

def schema_inference_example(spark):
    # $example on:schema_inferring$
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # The results of SQL queries are Dataframe objects.
    # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin
    # $example off:schema_inferring$

最后

以上就是虚拟小伙为你收集整理的spark sql加载txt文件01的全部内容,希望文章能够帮你解决spark sql加载txt文件01所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部