Spark User Guide

As the saying goes, to do a good job you must first sharpen your tools. Spark is without question one of the stars among today's big-data computing frameworks, living up to its name.

Spark has been around for a long time, and newer, more real-time-oriented frameworks such as Flink are now on the market. Even so, starting a serious study of big data with Spark is still a solid choice: its core design inherits the strengths of its predecessor MapReduce, and Spark Streaming keeps pace by supporting unified batch and stream processing. In all but the most latency-sensitive scenarios, Spark remains a mainstay among big-data compute engines.

All code examples in this guide are written in Scala.

Three Ways to Connect to Spark

In my view, you first have to learn how to use a tool before you can dig into its internals, so let's start with a quick look at how to connect Spark to various data sources.

1、Spark Core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf: SparkConf = new SparkConf()
  .setMaster("local")
  .setAppName("name")
val sparkContext = new SparkContext(sparkConf)
// Method 1: create an RDD from a file
val fileRDD: RDD[String] = sparkContext.textFile("src/main/resources/pvuvdata")
// Method 2: create an RDD from an in-memory collection
val arr = Seq("hello spark", "hello scala")   // sample data; any local collection works
val collectionRDD: RDD[String] = sparkContext.parallelize(arr)
// Output
fileRDD.foreach(println)
// Shut down
sparkContext.stop()
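Before the stop() call above, the usual transformation/action pattern applies. Here is a minimal word-count sketch over fileRDD, assuming the input file contains whitespace-separated text:

// Word count: the transformations are lazy, the foreach action triggers the job
val wordCounts: RDD[(String, Int)] = fileRDD
  .flatMap(_.split("\\s+"))   // split each line into words
  .map((_, 1))                // pair each word with an initial count of 1
  .reduceByKey(_ + _)         // sum the counts per word
wordCounts.foreach(println)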

2、Spark SQL

val sparkSession: SparkSession = SparkSession
  .builder()
  .master("local")
  .appName("hello01SparkSql")
  .getOrCreate()
// Method 1: read plain text through the underlying SparkContext
val value: RDD[String] = sparkSession
  .sparkContext
  .textFile("src/main/resources/emp.txt")

// Method 2: read with an explicitly specified format
val dFt1: DataFrame = sparkSession
  .read
  .format("<format>").load("<path>")
// Method 3: read through the format-specific shortcut method (e.g. .json, .parquet, .csv)
val dFt2: Dataset[Row] = sparkSession
  .read
  .<format>("<path>")
  .as("<alias>")   // .as[CaseClass] (with sparkSession.implicits._ in scope) yields a typed Dataset instead

// Write out to another location
dFt1.show()
dFt1.write.mode(SaveMode.Overwrite).format("<format>").save("<path>")
dFt1.write.mode(SaveMode.Overwrite).<format>("<path>")

// Shut down
sparkSession.stop()
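To make the placeholders concrete, here is a hedged sketch (it would sit before the stop() call) using the built-in csv source; the file src/main/resources/emp.csv, its header line, and the output directory name are assumptions made up for illustration:

// Read a CSV file into a DataFrame
val empDf: DataFrame = sparkSession.read
  .format("csv")
  .option("header", "true")        // first line contains column names
  .option("inferSchema", "true")   // let Spark guess the column types
  .load("src/main/resources/emp.csv")

// Write it back out as CSV, overwriting any existing output
empDf.write.mode(SaveMode.Overwrite)
  .format("csv")
  .option("header", "true")
  .save("src/main/resources/emp_out")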

2.0. DSL Queries and SQL Queries

2.0.1. DSL Queries
  • DSL means querying with operators (DataFrame/Dataset methods) instead of SQL text
    • Convert to a DataFrame/Dataset via a case class
    • Convert to a DataFrame via a StructType schema (a fuller sketch of both routes follows the snippet below)
// StructType route: build a DataFrame from an RDD[Row] plus an explicit schema
val dataFrame = sparkSession.createDataFrame(empRDD, empStructType)

// Case-class route: convert an RDD[Emp] (or a Seq[Emp]) into a typed Dataset
import sparkSession.implicits._
val empDS: Dataset[Emp] = emps.toDS()
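For reference, here is a minimal, self-contained sketch of both conversion routes. The Emp case class, its fields, and the sample rows are assumptions made up for illustration, not the actual schema of emp.txt:

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

case class Emp(empno: Int, ename: String, sal: Double)   // illustrative case class

val spark = SparkSession.builder().master("local").appName("dslDemo").getOrCreate()
import spark.implicits._

// Route 1: case class -> typed Dataset, then query with DSL operators
val empDS = Seq(Emp(1, "SMITH", 800.0), Emp(2, "ALLEN", 1600.0)).toDS()
empDS.where($"sal" > 1000).select("ename").show()

// Route 2: StructType schema + RDD[Row] -> DataFrame
val schema = StructType(Seq(
  StructField("empno", IntegerType),
  StructField("ename", StringType),
  StructField("sal", DoubleType)
))
val rowRDD = spark.sparkContext.parallelize(Seq(Row(1, "SMITH", 800.0), Row(2, "ALLEN", 1600.0)))
val empDF = spark.createDataFrame(rowRDD, schema)
empDF.groupBy("ename").sum("sal").show()   // another DSL-style query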
2.0.2. SQL Queries

createOrReplaceTempView("tableName") registers a temporary view, which can then be queried with SQL.

def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("StructType").getOrCreate()
    val value: RDD[String] = sparkSession.sparkContext.textFile("src/main/resources/emp.txt")
    // Convert each comma-separated line into a Row
    val empRDD: RDD[Row] = value.map(_.split(",")).map(e => Row(e(0).toInt, e(1), e(2), e(3).toInt, e(4), e(5).toDouble, e(6).toDouble, e(7).toInt))
    // Schema matching the Row above (the field names are assumed; adjust them to your emp.txt)
    val empSt: StructType = StructType(Seq(
      StructField("empno", IntegerType), StructField("ename", StringType),
      StructField("job", StringType), StructField("mgr", IntegerType),
      StructField("hiredate", StringType), StructField("sal", DoubleType),
      StructField("comm", DoubleType), StructField("deptno", IntegerType)
    ))
    // Convert to a DataFrame
    val dfr: DataFrame = sparkSession.createDataFrame(empRDD, empSt)
    // Register a temporary view and query it with SQL
    dfr.createOrReplaceTempView("t_emp")
    sparkSession.sql("select * from t_emp").show()
}

2.1. Plain Text

// Read
sparkSession.sparkContext.textFile("src/main/resources/emp.txt")
// Write (empSal is an RDD; saveAsTextFile writes a directory of part files)
empSal.saveAsTextFile("src/main/resources/t_emp_sal.txt")
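A minimal end-to-end sketch of the text path, reusing the sparkSession from above: the emp.txt layout (comma-separated, salary in the sixth field) and the output directory name are assumptions.

// Read emp.txt, pull out one comma-separated column, and write the result back out as text
val lines = sparkSession.sparkContext.textFile("src/main/resources/emp.txt")
val empSal = lines.map(_.split(",")).map(fields => fields(5))
empSal.saveAsTextFile("src/main/resources/t_emp_sal")   // output is a directory of part files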

2.2. json

def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("hello01_sql").getOrCreate()
    // Read, method 1: format-specific shortcut
    val dFt1: DataFrame = sparkSession.read.json("src/main/resources/t_emp1.txt").as("Emp")
    // Read, method 2: generic format/load
    //val dFt2: DataFrame = sparkSession.read.format("json").load("src/main/resources/t_emp.txt")
    // Write, method 1: generic format/save
    //dFt1.write.mode(SaveMode.Overwrite).format("json").save("src/main/resources/t_emp1.txt")
    // Write, method 2: format-specific shortcut
    dFt1.write.mode(SaveMode.Overwrite).json("src/main/resources/t_emp.txt")

    dFt1.show()
    //dFt2.show()
}
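One detail worth knowing: by default the json reader expects one JSON object per line (JSON Lines). For a single pretty-printed document, the multiLine option can be enabled; a small sketch reusing the sparkSession above, with an illustrative path:

// Read a multi-line (pretty-printed) JSON file
val prettyDf: DataFrame = sparkSession.read
  .option("multiLine", "true")
  .json("src/main/resources/t_emp_pretty.json")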

2.3. parquet

def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("sql04_parquet").getOrCreate()
    // Read, method 1: generic format/load
    //val dFt1: DataFrame = sparkSession.read.format("parquet").load("src/main/resources/t_emp.txt")
    // Read, method 2: format-specific shortcut
    val dFt2: Dataset[Row] = sparkSession.read.parquet("src/main/resources/t_emp.txt").as("Emp")
    // Write, method 1: generic format/save
    //dFt2.write.mode(SaveMode.Overwrite).format("parquet").save("src/main/resources/t_emp1.txt")
    // Write, method 2: format-specific shortcut
    dFt2.write.mode(SaveMode.Overwrite).parquet("src/main/resources/t_emp1.txt")

    dFt2.show()
}
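Note that parquet is Spark SQL's default data source (controlled by spark.sql.sources.default), so the generic reader can omit the format call entirely:

// Equivalent to .format("parquet").load(...) because parquet is the default source
val dFt3: DataFrame = sparkSession.read.load("src/main/resources/t_emp.txt")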

2.4. jdbc

def main(args: Array[String]): Unit = {

    // Create the SQL environment
    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("sql05_jdbc").getOrCreate()
    // Database options
    val map = new mutable.HashMap[String, String]()
    map.put("url", "jdbc:mysql://localhost:3306/java46?useSSL=false&serverTimezone=UTC&characterEncoding=utf8")
    map.put("driver", "com.mysql.cj.jdbc.Driver")
    map.put("user", "root")
    map.put("password", "root")
    map.put("dbtable", "emp")
    // Read over JDBC, method 1: generic format/options
    val dFt1: DataFrame = sparkSession.read.format("jdbc").options(map).load()

    // Database properties
    val properties = new Properties()
    properties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")
    // Read over JDBC, method 2: jdbc() shortcut
    val dFt2: DataFrame = sparkSession.read
      .jdbc("jdbc:mysql://localhost:3306/java46?useSSL=false&serverTimezone=UTC&characterEncoding=utf8", "emp", properties)

    // Write over JDBC, method 1: jdbc() shortcut
    dFt2.write.mode(SaveMode.Overwrite)
      .jdbc("jdbc:mysql://localhost:3306/java46?serverTimezone=UTC&characterEncoding=utf8&useSSL=false", "t_emp", properties)

    // Write over JDBC, method 2: generic format/options (the url is already in the options map)
    map.put("dbtable", "t_emp3")
    dFt1.write.mode(SaveMode.Overwrite).format("jdbc")
      .options(map).save()

    dFt1.show()
    dFt2.show()
}
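For larger tables the JDBC source can read in parallel if it is given a partition column and bounds. A hedged sketch reusing the session and options map from above; the column name empno and the bounds are assumptions about the emp table:

// Parallel JDBC read: Spark issues numPartitions queries, splitting empno into ranges
val dFtPar: DataFrame = sparkSession.read.format("jdbc")
  .options(map)
  .option("dbtable", "emp")             // read the original emp table again
  .option("partitionColumn", "empno")   // must be a numeric, date, or timestamp column
  .option("lowerBound", "1")
  .option("upperBound", "8000")
  .option("numPartitions", "4")
  .load()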

2.5. hive

Copy the configuration files: bring the Hadoop and Hive configs over from the Linux cluster.

hdfs-site.xml

core-site.xml

hive-site.xml

package com.yjxxt

import org.apache.spark.sql.SparkSession

object HelloSourceHive {
    def main(args: Array[String]): Unit = {
        // Set up the environment with Hive support enabled
        val spark = SparkSession.builder().master("local").appName("HelloSourceHive").enableHiveSupport().getOrCreate()
        // Work with the data
        spark.sql("use yjx")
        val dataFrame = spark.sql("select * from t_user")
        dataFrame.show()
        // Shut down
        spark.stop()
    }
}
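Inside the same main method, the session can also write a DataFrame back to Hive. A minimal sketch, where the target table name t_user_backup is an assumption:

// Write the query result back to a Hive managed table
dataFrame.write
    .mode("overwrite")                  // string form of SaveMode.Overwrite
    .saveAsTable("yjx.t_user_backup")   // creates/overwrites a managed table in the yjx database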

3、Spark Streaming

def main(args: Array[String]): Unit = {
  // Set up the environment
  val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("App")
  // Create the StreamingContext with a 5-second batch interval
  val streamingContext = new StreamingContext(sparkConf, streaming.Seconds(5))
  // Get the data: read lines from a TCP socket
  val value: ReceiverInputDStream[String] = streamingContext.socketTextStream("localhost", 9999)
  // Process the data: word count per batch
  val dStream: DStream[(String, Int)] = value.flatMap(_.split("\\s")).map((_, 1)).reduceByKey((x, y) => x + y)
  // Print the result
  dStream.print()

  // Start the service
  streamingContext.start()
  // Wait for termination
  streamingContext.awaitTermination()
  // Stop the service (keep the underlying SparkContext alive)
  streamingContext.stop(false)
}
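To try this locally, run a tool such as netcat in another terminal (nc -lk 9999) and type lines of text; each 5-second batch will print the word counts it received.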

3.2. streaming+kafka

First start the ZooKeeper server, then start Kafka.

def main(args: Array[String]): Unit = {
    // Set up the environment
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("stream05_kafka")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    // Set the streaming batch interval
    val streamingContext = new StreamingContext(sparkConf, streaming.Seconds(3))
    // Kafka configuration
    val kafkaPar: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "yjx_bigdata",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (true: lang.Boolean)
    )
    // Topics to subscribe to
    val topics: Array[String] = Array("userlog")
    // Create the Kafka direct stream
    val kfDStream: InputDStream[ConsumerRecord[String, String]]
        = KafkaUtils.createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaPar))
    // Process the data: word count over the record values
    val result: DStream[(String, Int)] = kfDStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // Print the result
    result.print()
    //result.saveAsHadoopFiles("yjx", "txt")

    // Start streaming
    streamingContext.start()
    streamingContext.awaitTermination()
    // Stop streaming (keep the underlying SparkContext alive)
    streamingContext.stop(false)
}

Author: kanaikee
Copyright: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit kanaikee when reposting!