【sparkSQL】创建DataFrame

首先我们要创建SparkSession

val spark = SparkSession.builder() .appName("test") .master("local") .getOrCreate()import spark.implicits._ //将RDD转化成为DataFrame并支持SQL操作 

然后我们通过SparkSession来创建DataFrame

1.使用toDF函数创建DataFrame

 通过导入(importing)spark.implicits, 就可以将本地序列(seq), 数组或者RDD转为DataFrame。

 只要这些数据的内容能指定数据类型即可。

import spark.implicits._val df = Seq( (1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")), (2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))).toDF("id", "name", "created_time")

注意:如果直接用toDF()而不指定列名字,那么默认列名为"_1", "_2"

我们可以通过df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")进行修改列名

2.使用createDataFrame函数创建DataFrame

通过schema + row 来创建

import org.apache.spark.sql.types._//定义dataframe的结构的schemaval schema = StructType(List( StructField("id", IntegerType, nullable = false), StructField("name", StringType, nullable = true), StructField("create_time", DateType, nullable = true)))//定义dataframe内容的rddval rdd = sc.parallelize(Seq( Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")), Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))))//创建dataframeval df = spark.createDataFrame(rdd, schema)

3.通过文件直接创建DataFrame

 (1)使用parquet文件创建  

val df = spark.read.parquet("hdfs:/path/to/file")

 (2)使用json文件创建

val df = spark.read.json("examples/src/main/resources/people.json")

 (3)使用csv文件创建

val df = spark.read .format("com.databricks.spark.csv") .option("header", "true") //reading the headers .option("mode", "DROPMALFORMED") .load("csv/file/path")

 (4)使用Hive表创建

spark.table("test.person") // 库名.表名 的格式 .registerTempTable("person") // 注册成临时表spark.sql( """ | select * | from person | limit 10 """.stripMargin).show()

记得,最后我们要调用spark.stop()来关闭SparkSession。  

 

 

 

相关文章