package com.spark_sql

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFromMysql {
  def main(args: Array[String]): Unit = {
    // todo: 1. Create the SparkSession object
    val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
    // todo: 2. Create a Properties object and set the MySQL username and password
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123")
    // todo: 3. Read the data from MySQL
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student", properties)
    // todo: 4. Show the data of the MySQL table
    mysqlDF.show()
    spark.stop()
  }

}
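Note that the MySQL JDBC driver (mysql-connector-java) has to be on the classpath for this read to work. The same read can also be expressed with the option-based JDBC source instead of a Properties object; a minimal sketch, using the same assumed URL, table and credentials as above:

    // Sketch: equivalent option-based JDBC read (same hypothetical URL/credentials as above)
    val mysqlDF2: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/spark?serverTimezone=UTC")
      .option("dbtable", "student")
      .option("user", "root")
      .option("password", "123")
      .load()
    mysqlDF2.show()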
package com.spark_sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object InferringSchema {
  def main(args: Array[String]): Unit = {
    // todo: 1. Build the SparkSession, specifying the appName and the master address
    val spark: SparkSession = SparkSession.builder().appName("InferringSchema").master("local[2]").getOrCreate()
    // todo: 2. Get the SparkContext from the SparkSession
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN") // set the log output level
    // todo: 3. Load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    // todo: 4. Split each line
    val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    // todo: 5. Map the RDD to the Person case class
    val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // todo: 6. Create the DataFrame; the implicit conversions must be imported
    import spark.implicits._
    val personDF: DataFrame = personRDD.toDF()

    // todo: ------------------- DSL-style operations: start --------------
    // 1. Show the DataFrame's data (20 rows by default)
    personDF.show()
    // 2. Print the DataFrame's schema
    personDF.printSchema()
    // 3. Print the number of records in the DataFrame
    println(personDF.count())
    // 4. Print all column names of the DataFrame
    personDF.columns.foreach(println)
    // 5. Take the first row of the DataFrame
    println(personDF.head())
    // 6. Show all values of the name column
    personDF.select("name").show()
    // 7. Filter the records whose age is greater than 30
    personDF.filter($"age" > 30).show()
    // 8. Count the records whose age is greater than 30
    println(personDF.filter($"age" > 30).count())
    // 9. Group by age and count the number of people in each group
    personDF.groupBy("age").count().show()
    // todo: ------------------- DSL-style operations: end -------------

    // todo: -------------------- SQL-style operations: start -----------
    // todo: register the DataFrame as a temporary view
    personDF.createOrReplaceTempView("t_person")
    // todo: run SQL statements against the view

    spark.sql("select * from t_person").show()

    spark.sql("select * from t_person where name='zhangsan'").show()

    spark.sql("select * from t_person order by age desc").show()
    // todo: -------------------- SQL-style operations: end -------------


    sc.stop()
  }
}

case class Person(id: Int, name: String, age: Int)
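Because Person is a case class, the same implicits also allow building a typed Dataset instead of an untyped DataFrame. A minimal sketch, assuming the spark session and personRDD from the example above:

    import org.apache.spark.sql.Dataset
    import spark.implicits._

    // Typed variant of the same pipeline: each element is a Person rather than a Row
    val personDS: Dataset[Person] = personRDD.toDS()
    personDS.filter(_.age > 30).show() // same result as the $"age" > 30 filter above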
package com.spark_sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object SparkSqlSchema {
  def main(args: Array[String]): Unit = {
    // todo: 1. Create the SparkSession, specifying the appName and master
    val spark: SparkSession = SparkSession.builder().appName("SparkSqlSchema").master("local[2]").getOrCreate()
    // todo: 2. Get the SparkContext
    val sc: SparkContext = spark.sparkContext
    // todo: 3. Load the data
    val dataRDD: RDD[String] = sc.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    // todo: 4. Split each line
    val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
    // todo: 5. Load the data into Row objects
    val personRDD: RDD[Row] = dataArrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
    // todo: 6. Create the schema
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, false),
      StructField("name", StringType, false),
      StructField("age", IntegerType, false)
    ))

    // todo: 7. Create the DataFrame from personRDD and the schema
    val personDF: DataFrame = spark.createDataFrame(personRDD, schema)

    // todo: 8. Show the DataFrame's data with the DSL API
    personDF.show()

    // todo: 9. Register the DataFrame as a temporary view
    personDF.createOrReplaceTempView("t_person")

    // todo: 10. Run SQL statements
    spark.sql("select * from t_person").show()

    spark.sql("select count(*) from t_person").show()


    sc.stop()
  }
}
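On newer Spark versions (2.3+) the same schema can also be written as a DDL string instead of assembling StructField objects by hand; a sketch, with the caveat that DDL-parsed fields come back nullable, unlike the explicit false flags above:

    import org.apache.spark.sql.types.StructType

    // Equivalent schema expressed as a DDL string (fields are nullable by default)
    val ddlSchema: StructType = StructType.fromDDL("id INT, name STRING, age INT")
    val personDF2 = spark.createDataFrame(personRDD, ddlSchema)
    personDF2.printSchema()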
package com.spark_sql

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SparkSqlToMysql {
  def main(args: Array[String]): Unit = {
    // todo: 1. Create the SparkSession object
    val spark: SparkSession = SparkSession.builder().appName("SparkSqlToMysql").master("local[2]").getOrCreate()
    // todo: 2. Read the data
    val data: RDD[String] = spark.sparkContext.textFile("D:\\IDEA_Maven\\day08\\src\\main\\resources\\Person.txt")
    // todo: 3. Split each line
    val arrRDD: RDD[Array[String]] = data.map(_.split(" "))
    // todo: 4. Map the RDD to the student01 case class
    val studentRDD: RDD[student01] = arrRDD.map(x => student01(x(0).toInt, x(1), x(2).toInt))
    // todo: import the implicit conversions
    import spark.implicits._
    // todo: 5. Convert the RDD to a DataFrame
    val studentDF: DataFrame = studentRDD.toDF()
    // todo: 6. Register the DataFrame as a temporary view
    studentDF.createOrReplaceTempView("student")
    // todo: 7. Query the student view, ordering by age in descending order
    val resultDF: DataFrame = spark.sql("select * from student order by age desc")

    // todo: 8. Save the result into a MySQL table
    // todo: create a Properties object with the MySQL username and password
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123")

    resultDF.write.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student01", prop)

    // todo: when writing to MySQL a save mode can be set: Overwrite replaces the table, Append adds rows,
    //       Ignore skips the write if the table exists, ErrorIfExists (the default) throws if the table exists
    //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark", "student", prop)
    spark.stop()
  }
}

// todo: the student01 case class
case class student01(id: Int, name: String, age: Int)
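To make the job rerunnable without the write failing on an existing table, an explicit SaveMode can be set, as the commented-out line hints. A minimal sketch, reusing the resultDF, prop and localhost URL from the example above:

    // Append the result rows to the existing student01 table instead of erroring out
    resultDF.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=UTC", "student01", prop)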
package com.SparkStreaming_Flume_Poll

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkStreaming_Flume_Poll {

  // newValues: all the 1s of the same word from the current batch's (word, 1) pairs
  // runningCount: the accumulated count of the same key across all previous batches
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = runningCount.getOrElse(0) + newValues.sum
    Some(newCount)
  }


  def main(args: Array[String]): Unit = {
    // Configure the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
    // Build the SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    // Set the log level
    sc.setLogLevel("WARN")
    // Build the StreamingContext with the batch interval
    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Set the checkpoint directory (required by updateStateByKey)
    scc.checkpoint("./")
    // Set the Flume address; multiple agents can be listed
    val address = Seq(new InetSocketAddress("192.168.107.144", 8888))
    // Pull data from Flume
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc, address, StorageLevel.MEMORY_AND_DISK)

    // The data lives in the event body; convert it to a String
    val lineStream: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()))
    // Word count with state kept across batches
    val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)

    result.print()
    scc.start()
    scc.awaitTermination()
  }


}
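The poll mode assumes the Flume agent is configured with the Spark sink from spark-streaming-flume-sink, and, as the comment above notes, createPollingStream can poll several agents at once. A minimal sketch, with a second hypothetical agent address added to the example above:

    // Sketch: polling two Flume agents (the second host/port is hypothetical)
    val addresses = Seq(
      new InetSocketAddress("192.168.107.144", 8888),
      new InetSocketAddress("192.168.107.145", 8888)
    )
    val multiFlumeStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(scc, addresses, StorageLevel.MEMORY_AND_DISK)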