Spark User Manual
As the saying goes, to do a good job one must first sharpen one's tools. Spark is without doubt one of the stars among today's big data computing frameworks, living up to its name.
Although Spark has been around for a while and newer, lower-latency engines such as Flink are now available, Spark is still a solid place to start learning big data seriously: its core design inherits the strengths of its predecessor MapReduce, and it keeps up with the times by supporting unified stream and batch processing through Spark Streaming. In all but the most demanding scenarios it remains a mainstay among big data compute engines.
This article uses Scala as the only language for the code examples.
Three Ways to Connect to Spark
For me, you first have to learn how to use a tool before you can dig into its internals, so let's start with a quick look at how to connect Spark to various data sources.
1. Spark Core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf: SparkConf = new SparkConf()
  .setMaster("local")
  .setAppName("name")
val sparkContext = new SparkContext(sparkConf)
// Option 1: load a text file from disk
val fileRDD: RDD[String] = sparkContext.textFile("src/main/resources/pvuvdata")
// Option 2: parallelize a local in-memory collection
val arr = Seq("a", "b", "c")
val collRDD: RDD[String] = sparkContext.parallelize(arr)
// Output
fileRDD.foreach(println)
// Shut down
sparkContext.stop()
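Once the RDD is loaded, the usual transformations can be chained onto it (before calling sparkContext.stop()). Below is a minimal word-count sketch, assuming pvuvdata is plain whitespace-separated text; the real file layout is not specified above:

// Word count over the loaded file; assumes whitespace-separated text
val counts: RDD[(String, Int)] = fileRDD
  .flatMap(_.split("\\s+"))
  .map((_, 1))
  .reduceByKey(_ + _)
counts.take(10).foreach(println)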
2. Spark SQL
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

val sparkSession: SparkSession = SparkSession
  .builder()
  .master("local")
  .appName("hello01SparkSql")
  .getOrCreate()
// Option 1: read plain text through the underlying SparkContext
val value: RDD[String] = sparkSession
  .sparkContext
  .textFile("src/main/resources/emp.txt")
// Option 2: read with an explicitly named format
val dFt1: DataFrame = sparkSession
  .read
  .format("<format>").load("<path>")
// Option 3: read with a format-specific method; .as("<alias>") attaches an alias
// (use .as[CaseClass] with the session's implicits imported to get a typed Dataset)
val dFt2: Dataset[Row] = sparkSession
  .read
  .<format>("<path>")
  .as("<alias>")
// Write out
dFt1.show()
dFt1.write.mode(SaveMode.Overwrite).format("<format>").save("<path>")
dFt1.write.mode(SaveMode.Overwrite).<format>("<path>")
// Shut down
sparkSession.stop()
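To make the template concrete, here is a minimal sketch using the built-in csv source; the file paths and the header option are illustrative assumptions rather than part of the original example:

// Generic style: format + load (paths are hypothetical)
val csvDf1: DataFrame = sparkSession.read
  .format("csv")
  .option("header", "true")
  .load("src/main/resources/emp.csv")
// Shorthand style: format-specific method
val csvDf2: DataFrame = sparkSession.read
  .option("header", "true")
  .csv("src/main/resources/emp.csv")
// Writing works the same way in both styles
csvDf1.write.mode(SaveMode.Overwrite).format("csv").save("src/main/resources/emp_csv_out")
csvDf2.write.mode(SaveMode.Overwrite).csv("src/main/resources/emp_csv_out2")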
2.0. DSL Queries and SQL Queries
2.0.1. DSL Queries
- A DSL query uses operators, i.e. DataFrame/Dataset methods, instead of SQL strings
- An RDD can be converted to a DataFrame/Dataset via a case class
- An RDD can be converted to a DataFrame via a StructType schema (both paths appear in the snippet and the fuller sketch below)
// Conversion via a StructType schema: RDD[Row] + schema -> DataFrame
val dataFrame = sparkSession.createDataFrame(empRDD, empStructType)
// Conversion via a case class: import the session's implicits, then call toDS() on an RDD of case-class instances
import sparkSession.implicits._
val dataset: Dataset[Emp] = emps.toDS()
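Tying the fragments together, here is a minimal, self-contained sketch of both conversion paths. The Emp case class, its fields, and the sample rows are assumptions made for illustration only:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Hypothetical case class used only for this illustration
case class Emp(empno: Int, ename: String, sal: Double)

object DslDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local").appName("DslDemo").getOrCreate()
    import sparkSession.implicits._

    // Path 1: case class -> typed Dataset (made-up sample rows)
    val emps: RDD[Emp] = sparkSession.sparkContext
      .parallelize(Seq(Emp(7369, "SMITH", 800.0), Emp(7499, "ALLEN", 1600.0)))
    val ds: Dataset[Emp] = emps.toDS()
    ds.where($"sal" > 1000).select("ename").show() // DSL-style query with operators

    // Path 2: StructType schema -> DataFrame
    val rows: RDD[Row] = emps.map(e => Row(e.empno, e.ename, e.sal))
    val schema = StructType(Seq(
      StructField("empno", IntegerType),
      StructField("ename", StringType),
      StructField("sal", DoubleType)
    ))
    val df: DataFrame = sparkSession.createDataFrame(rows, schema)
    df.groupBy("ename").sum("sal").show()

    sparkSession.stop()
  }
}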
2.0.2. SQL Queries
createOrReplaceTempView("view_name") registers a temporary view that can then be queried with SQL:
def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder().master("local").appName("StructType").getOrCreate()
  val value: RDD[String] = sparkSession.sparkContext.textFile("src/main/resources/emp.txt")
  // Build a Row for each line
  val empRDD: RDD[Row] = value.map(_.split(",")).map(e => Row(e(0).toInt, e(1), e(2), e(3).toInt, e(4), e(5).toDouble, e(6).toDouble, e(7).toInt))
  // Schema matching the Rows above (field names are assumed here for illustration)
  val empSt: StructType = StructType(Seq(
    StructField("empno", IntegerType), StructField("ename", StringType),
    StructField("job", StringType), StructField("mgr", IntegerType),
    StructField("hiredate", StringType), StructField("sal", DoubleType),
    StructField("comm", DoubleType), StructField("deptno", IntegerType)
  ))
  // Convert to a DataFrame
  val dfr: DataFrame = sparkSession.createDataFrame(empRDD, empSt)
  // Register a temporary view and query it with SQL
  dfr.createOrReplaceTempView("t_emp")
  sparkSession.sql("select * from t_emp").show()
}
2.1. Plain Text
// Read
sparkSession.sparkContext.textFile("src/main/resources/emp.txt")
// Write (empSal is any RDD, e.g. one derived from the file above)
empSal.saveAsTextFile("src/main/resources/t_emp_sal.txt")
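Putting the two calls together, a minimal end-to-end sketch; the transformation that produces empSal is made up for illustration and assumes the same comma-separated emp.txt layout used earlier:

val lines: RDD[String] = sparkSession.sparkContext.textFile("src/main/resources/emp.txt")
// Hypothetical transformation: keep only the name and salary fields of each line
val empSal: RDD[String] = lines.map(_.split(",")).map(e => s"${e(1)},${e(5)}")
// saveAsTextFile writes a directory of part files, not a single file
empSal.saveAsTextFile("src/main/resources/t_emp_sal.txt")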
2.2. json
def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder().master("local").appName("hello01_sql").getOrCreate()
  // Read, option 1: format-specific method
  val dFt1: DataFrame = sparkSession.read.json("src/main/resources/t_emp1.txt").as("Emp")
  // Read, option 2: generic format + load
  //val dFt2: DataFrame = sparkSession.read.format("json").load("src/main/resources/t_emp.txt")
  // Write, option 1: generic format + save
  //dFt1.write.mode(SaveMode.Overwrite).format("json").save("src/main/resources/t_emp1.txt")
  // Write, option 2: format-specific method
  dFt1.write.mode(SaveMode.Overwrite).json("src/main/resources/t_emp.txt")
  dFt1.show()
  //dFt2.show()
}
2.3. parquet
def main(args: Array[String]): Unit = {
  val sparkSession: SparkSession = SparkSession.builder().master("local").appName("sql04_parquet").getOrCreate()
  // Read, option 1: generic format + load
  //val dFt1: DataFrame = sparkSession.read.format("parquet").load("src/main/resources/t_emp.txt")
  // Read, option 2: format-specific method
  val dFt2: Dataset[Row] = sparkSession.read.parquet("src/main/resources/t_emp.txt").as("Emp")
  // Write, option 1: generic format + save
  //dFt2.write.mode(SaveMode.Overwrite).format("parquet").save("src/main/resources/t_emp1.txt")
  // Write, option 2: format-specific method
  dFt2.write.mode(SaveMode.Overwrite).parquet("src/main/resources/t_emp1.txt")
  dFt2.show()
}
2.4. jdbc
def main(args: Array[String]): Unit = {
  // Create the SQL session
  val sparkSession: SparkSession = SparkSession.builder().master("local").appName("sql04_parquet").getOrCreate()
  // Connection options
  val map = new mutable.HashMap[String, String]()
  map.put("url", "jdbc:mysql://localhost:3306/java46?useSSL=false&serverTimezone=UTC&characterEncoding=utf8")
  map.put("driver", "com.mysql.cj.jdbc.Driver")
  map.put("user", "root")
  map.put("password", "root")
  map.put("dbtable", "emp")
  // Read over JDBC, option 1
  val dFt1: DataFrame = sparkSession.read.format("jdbc").options(map).load()
  // Connection properties
  val properties = new Properties()
  properties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
  properties.setProperty("user", "root")
  properties.setProperty("password", "root")
  // Read over JDBC, option 2
  val dFt2: DataFrame = sparkSession.read
    .jdbc("jdbc:mysql://localhost:3306/java46?useSSL=false&serverTimezone=UTC&characterEncoding=utf8", "emp", properties)
  // Write over JDBC, option 1
  dFt2.write.mode(SaveMode.Overwrite)
    .jdbc("jdbc:mysql://localhost:3306/java46?serverTimezone=UTC&characterEncoding=utf8&useSSL=false", "t_emp", properties)
  // Write over JDBC, option 2 (the url is already in the options map, so save() takes no path)
  map.put("dbtable", "t_emp3")
  dFt1.write.mode(SaveMode.Overwrite).format("jdbc")
    .options(map).save()
  dFt1.show()
  dFt2.show()
}
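For either style, the MySQL JDBC driver (the mysql-connector-java / mysql-connector-j artifact) must be on the classpath, otherwise com.mysql.cj.jdbc.Driver cannot be loaded.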
2.5. hive
Copy the Hadoop and Hive configuration files from the Linux cluster into the project:
hdfs-site.xml
core-site.xml
hive-site.xml
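In a typical Maven/SBT layout these files go under src/main/resources so that they end up on the classpath; with enableHiveSupport() Spark then reads the metastore and HDFS settings from them.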
package com.yjxxt

import org.apache.spark.sql.SparkSession

object HelloSourceHive {
  def main(args: Array[String]): Unit = {
    // Build the session with Hive support enabled
    val spark = SparkSession.builder().master("local").appName("HelloSourceHive").enableHiveSupport().getOrCreate()
    // Query the data
    spark.sql("use yjx")
    val dataFrame = spark.sql("select * from t_user")
    dataFrame.show()
    // Shut down
    spark.stop()
  }
}
3. Spark Streaming
def main(args: Array[String]): Unit = {
  // Set up the configuration
  val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("App")
  // Create the streaming context with a 5-second batch interval
  val streamingContext = new StreamingContext(sparkConf, Seconds(5))
  // Receive data from a socket
  val value: ReceiverInputDStream[String] = streamingContext.socketTextStream("localhost", 9999)
  // Transform the data: word count per batch
  val dStream: DStream[(String, Int)] = value.flatMap(_.split("\\s")).map((_, 1)).reduceByKey((x, y) => x + y)
  // Print the results
  dStream.print()
  // Start the job
  streamingContext.start()
  // Wait for termination
  streamingContext.awaitTermination()
  // Stop streaming (keep the underlying SparkContext)
  streamingContext.stop(false)
}
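To try this locally, feed the socket with a tool such as netcat (nc -lk 9999) and type lines into it; each 5-second batch will then print its word counts.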
3.2. streaming+kafka
Start the ZooKeeper server first, then start the Kafka brokers.
def main(args: Array[String]): Unit = {
  // Set up the configuration
  val sparkConf: SparkConf = new SparkConf().setMaster("local[*]")
    .setAppName("stream05_kafka")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
  // Create the streaming context with a 3-second batch interval
  val streamingContext = new StreamingContext(sparkConf, Seconds(3))
  // Kafka consumer configuration
  val kafkaPar: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "yjx_bigdata",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (true: lang.Boolean)
  )
  // Topics to subscribe to
  val topics: Array[String] = Array("userlog")
  // Create the Kafka direct stream
  val kfDStream: InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaPar))
  // Transform the data: word count over the record values
  val result: DStream[(String, Int)] = kfDStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  // Print the results
  result.print()
  //result.saveAsHadoopFiles("yjx", "txt")
  // Start streaming
  streamingContext.start()
  streamingContext.awaitTermination()
  // Stop streaming (keep the underlying SparkContext)
  streamingContext.stop(false)
}
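Note that KafkaUtils, Subscribe, and PreferConsistent come from the spark-streaming-kafka-0-10 integration module, which has to be added as a dependency alongside spark-streaming itself.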