首先我们要创建SparkSession
val spark = SparkSession.builder() .appName("test") .master("local") .getOrCreate()import spark.implicits._ //将RDD转化成为DataFrame并支持SQL操作
然后我们通过SparkSession来创建DataFrame
1.使用toDF
函数创建DataFrame
通过导入(importing)spark.implicits, 就可以将本地序列(seq), 数组或者RDD转为DataFrame。
只要这些数据的内容能指定数据类型即可。
import spark.implicits._val df = Seq( (1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")), (2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))).toDF("id", "name", "created_time")
注意:如果直接用toDF()而不指定列名字,那么默认列名为"_1", "_2"
我们可以通过df.withColumnRenamed("_1", "newName1").withColumnRenamed("_2", "newName2")进行修改列名
2.使用createDataFrame
函数创建DataFrame
通过schema + row 来创建
import org.apache.spark.sql.types._//定义dataframe的结构的schemaval schema = StructType(List( StructField("id", IntegerType, nullable = false), StructField("name", StringType, nullable = true), StructField("create_time", DateType, nullable = true)))//定义dataframe内容的rddval rdd = sc.parallelize(Seq( Row(1, "zhangyuhang", java.sql.Date.valueOf("2018-05-15")), Row(2, "zhangqiuyue", java.sql.Date.valueOf("2018-05-15"))))//创建dataframeval df = spark.createDataFrame(rdd, schema)
3.通过文件直接创建DataFrame
(1)使用parquet文件创建
val df = spark.read.parquet("hdfs:/path/to/file")
(2)使用json文件创建
val df = spark.read.json("examples/src/main/resources/people.json")
(3)使用csv文件创建
val df = spark.read .format("com.databricks.spark.csv") .option("header", "true") //reading the headers .option("mode", "DROPMALFORMED") .load("csv/file/path")
(4)使用Hive表创建
spark.table("test.person") // 库名.表名 的格式 .registerTempTable("person") // 注册成临时表spark.sql( """ | select * | from person | limit 10 """.stripMargin).show()
记得,最后我们要调用spark.stop()来关闭SparkSession。