大数据学习之(十九)spark structured streaming

前言

结构化流查询(Structured Streaming Query)对标竞品 Flink

spark_structured_stream_1.png

(一)升级本地spark版本到3.2.0

为了测试 新api toTable()。主要是升级hadoop dll和winutils.exe

1.idea 中 edit configurations

HADOOP_HOME=D:\codes\hadoop-3.2.4

2.C:\Windows\System32

备份旧的hadoop.dll为hadoop.dll_2.6.5,放入3.2.2的hadoop.dll。

winutils/hadoop.dll at master · cdarlint/winutils · GitHub
(事后回滚 hadoop.dll_2.6.5)

3.D:\codes\hadoop-3.2.4\bin

目录中放入winutils.exe.

winutils/winutils.exe at master · cdarlint/winutils · GitHub

(二)要点总结

1.微批处理

内部默认使用微批处理引擎( micro-batch processing engine),它将数据流看作一系列小的批任务(batch jobs)来处理,从而达到端到端如100毫秒这样低的延迟以及只执行一次容错的保证。

ps: 微批处理时每个批次启动一个job,性能损耗较大,exactly-once。

(可以从webUI host:4040/jobs/看到多个批次的多个job)

2.连续处理(Continuous Processing):

然而,从Spark 2.3,我们已经引入了一个新的低延迟处理方式——连续处理(Continuous Processing),可以达到端到端如1毫秒这样低的延迟至少一次保证。不用改变查询中DataSet/DataFrame的操作,你就能够选择基于应用要求的查询模式。

ps: 连续处理,针对所有数据启动一个job来处理,job不会停。周期性的更新checkpoints,at-least-once。

3.基于 事件时间 (event time).

4.checkpoint 维护信息

offset,state,source消费批次信息,sink写出批次信息,针对HDFS写出、RDBMS写出支持exactly-once语义。

5.kafka整合

目前和kafka整合只能实现 at leaset once 语义,即可能重复输出。

6.water marking:

数据延迟到达的上限,过期不候。

Streaming DataSet Api

demo源码

/**
  *  Structured Streaming 实时读取Socket数据
  */
object SSReadSocketData {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("SSReadSocketData")
      //设置并行度
      .config("spark.sql.shuffle.partitions",1)
      .getOrCreate()

    import spark.implicits._
    spark.sparkContext.setLogLevel("Error")

    //2.读取Socket数据
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    //3.统计wordcount: 对每行数据按照空格进行切分单词
    val words: Dataset[String] = df.as[String]
      .flatMap(line => {
        line.split(" ")
      })

    //4.统计WordCount :按照单词分组,对单词进行计数
    val wordCounts: DataFrame = words.groupBy("value").count()

    //5.打印结果
    val query: StreamingQuery = wordCounts.writeStream
      .format("console")
      .outputMode("complete")
      //指定结果输出模式,默认是append ,除此之外,还有complete 、update
      .start()

    query.awaitTermination()
  }
}

1.启动nc

#node1
nc -l 9999

3.idea Run Demo

4.结果如下

4.1nc输入:

[root@node1 ~]# nc -lk 9999
a b
b c
b d

4.2idea控制台输出:

-------------------------------------------
Batch: 5
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    1|
|    b|    3|
|    a|    2|
+-----+-----+

demo分析

1.减小调度时间

数据少时,减小分区用以调度时间

config("spark.sql.shuffle.partitions",1)

2.DataFrame DataSet互转

处理数据时,如果使用Api方式处理数据,需要将DataFrame 转换成Dataset操作,SQL则用DataFrame。

3.默认value列

Structured Streaming 中默认读取数据得到的DataFrame有“value”列。

4.outputMode输出模式

4.1append

默认输出模式,打印自上批次触发后新增行数据

4.2complete

完整模式,当代码中有聚合操作使用这种模式,打印所有数据,完整输出数据

4.3update

更新模式,打印自上次触发后更新数据会被输出,可以用于代码有聚合或者没有聚合操作情况下

5.Trigger

控制批次间隔。其中 continuous 这个类型比较特殊,处于试验阶段,用来实现连续实时处理,实现1ms的延迟。

对标flink

.trigger(Trigger.Continuous("2 seconds")) //连续处理,每2s 处理以下checkpoint 记录

Streaming Table APIs

spark 3.1之后的版本才有,采用 类SQL的方式编码,更友好。

例如,select,where,groupBy

demo源码

package com.mashibing.stscode.scalacode

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  *  读取socket数据写入流表,再从流表中读取数据展示到控制台
  */
object StreamTableAPI {
  def main(args: Array[String]): Unit = {
    //1.创建对象
    val spark: SparkSession = SparkSession.builder().master("local")
      .appName("StreamTableAPI")
      .config("spark.sql.shuffle.partitions", 1)
      .config("spark.sql.warehouse.dir","./lyw-spark-warehouse")
//      .config("spark.sql.streaming.checkpointLocation","xxx")
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error")
    import spark.implicits._

    //2.读取socket数据,注册流表
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    //3.对df进行转换
    val personinfo: DataFrame = df.as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0).toInt, arr(1), arr(2).toInt)
      }).toDF("id", "name", "age")

    //4.将以上personinfo 写入到流表中
    personinfo.writeStream
      .option("checkpointLocation","./checkpoint/lyw-dir1")
      .toTable("mytbl")

    import org.apache.spark.sql.functions._

    //5.读取mytbl 流表中的数据
    val query: StreamingQuery = spark.readStream
      .table("mytbl")
      .withColumn("new_age", col("age").plus(10))
      .select("id", "name", "age", "new_age")
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }

}

1.启动nc

#node1
nc -l 9999

3.idea Run Demo

4.结果如下

4.1nc输入:

[root@node1 ~]# nc -lk 9999
1,zs,18
2,ls,19
3,ww,20
4,ml,21

4.2idea控制台输出:

-------------------------------------------
Batch: 1
-------------------------------------------
+---+----+---+-------+
| id|name|age|new_age|
+---+----+---+-------+
|  1|  zs| 18|     28|
+---+----+---+-------+

-------------------------------------------
Batch: 2
-------------------------------------------
+---+----+---+-------+
| id|name|age|new_age|
+---+----+---+-------+
|  2|  ls| 19|     29|
+---+----+---+-------+

-------------------------------------------
Batch: 3
-------------------------------------------
+---+----+---+-------+
| id|name|age|new_age|
+---+----+---+-------+
|  3|  ww| 20|     30|
+---+----+---+-------+

-------------------------------------------
Batch: 4
-------------------------------------------
+---+----+---+-------+
| id|name|age|new_age|
+---+----+---+-------+
|  4|  ml| 21|     31|
+---+----+---+-------+
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容