前言
结构化流查询(Structured Streaming Query)对标竞品 Flink

(一)升级本地spark版本到3.2.0
为了测试 新api toTable()。主要是升级hadoop dll和winutils.exe
1.idea 中 edit configurations
HADOOP_HOME=D:\codes\hadoop-3.2.4
2.C:\Windows\System32
备份旧的hadoop.dll为hadoop.dll_2.6.5,放入3.2.2的hadoop.dll。
winutils/hadoop.dll at master · cdarlint/winutils · GitHub
(事后回滚 hadoop.dll_2.6.5)
3.D:\codes\hadoop-3.2.4\bin
目录中放入winutils.exe.
winutils/winutils.exe at master · cdarlint/winutils · GitHub
(二)要点总结
1.微批处理
内部默认使用微批处理引擎( micro-batch processing engine),它将数据流看作一系列小的批任务(batch jobs)来处理,从而达到端到端如100毫秒这样低的延迟以及只执行一次容错的保证。
ps: 微批处理时每个批次启动一个job,性能损耗较大,exactly-once。
(可以从webUI host:4040/jobs/看到多个批次的多个job)
2.连续处理(Continuous Processing):
然而,从Spark 2.3,我们已经引入了一个新的低延迟处理方式——连续处理(Continuous Processing),可以达到端到端如1毫秒这样低的延迟至少一次保证。不用改变查询中DataSet/DataFrame的操作,你就能够选择基于应用要求的查询模式。
ps: 连续处理,针对所有数据启动一个job来处理,job不会停。周期性的更新checkpoints,at-least-once。
3.基于 事件时间 (event time).
4.checkpoint 维护信息
offset,state,source消费批次信息,sink写出批次信息,针对HDFS写出、RDBMS写出支持exactly-once语义。
5.kafka整合
目前和kafka整合只能实现 at leaset once 语义,即可能重复输出。
6.water marking:
数据延迟到达的上限,过期不候。
Streaming DataSet Api
demo源码
/**
* Structured Streaming 实时读取Socket数据
*/
object SSReadSocketData {
def main(args: Array[String]): Unit = {
//1.创建SparkSession对象
val spark: SparkSession = SparkSession.builder().master("local")
.appName("SSReadSocketData")
//设置并行度
.config("spark.sql.shuffle.partitions",1)
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("Error")
//2.读取Socket数据
val df: DataFrame = spark.readStream
.format("socket")
.option("host", "node1")
.option("port", 9999)
.load()
//3.统计wordcount: 对每行数据按照空格进行切分单词
val words: Dataset[String] = df.as[String]
.flatMap(line => {
line.split(" ")
})
//4.统计WordCount :按照单词分组,对单词进行计数
val wordCounts: DataFrame = words.groupBy("value").count()
//5.打印结果
val query: StreamingQuery = wordCounts.writeStream
.format("console")
.outputMode("complete")
//指定结果输出模式,默认是append ,除此之外,还有complete 、update
.start()
query.awaitTermination()
}
}
1.启动nc
#node1
nc -l 9999
3.idea Run Demo
4.结果如下
4.1nc输入:
[root@node1 ~]# nc -lk 9999
a b
b c
b d
4.2idea控制台输出:
-------------------------------------------
Batch: 5
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| d| 1|
| c| 1|
| b| 3|
| a| 2|
+-----+-----+
demo分析
1.减小调度时间
数据少时,减小分区用以调度时间
config("spark.sql.shuffle.partitions",1)
2.DataFrame DataSet互转
处理数据时,如果使用Api方式处理数据,需要将DataFrame 转换成Dataset操作,SQL则用DataFrame。
3.默认value列
Structured Streaming 中默认读取数据得到的DataFrame有“value”列。
4.outputMode输出模式
4.1append
默认输出模式,打印自上批次触发后新增行数据
4.2complete
完整模式,当代码中有聚合操作使用这种模式,打印所有数据,完整输出数据
4.3update
更新模式,打印自上次触发后更新数据会被输出,可以用于代码有聚合或者没有聚合操作情况下
5.Trigger
控制批次间隔。其中 continuous 这个类型比较特殊,处于试验阶段,用来实现连续实时处理,实现1ms的延迟。
对标flink
.trigger(Trigger.Continuous("2 seconds")) //连续处理,每2s 处理以下checkpoint 记录
Streaming Table APIs
spark 3.1之后的版本才有,采用 类SQL的方式编码,更友好。
例如,select,where,groupBy
demo源码
package com.mashibing.stscode.scalacode
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 读取socket数据写入流表,再从流表中读取数据展示到控制台
*/
object StreamTableAPI {
def main(args: Array[String]): Unit = {
//1.创建对象
val spark: SparkSession = SparkSession.builder().master("local")
.appName("StreamTableAPI")
.config("spark.sql.shuffle.partitions", 1)
.config("spark.sql.warehouse.dir","./lyw-spark-warehouse")
// .config("spark.sql.streaming.checkpointLocation","xxx")
.getOrCreate()
spark.sparkContext.setLogLevel("Error")
import spark.implicits._
//2.读取socket数据,注册流表
val df: DataFrame = spark.readStream
.format("socket")
.option("host", "node1")
.option("port", 9999)
.load()
//3.对df进行转换
val personinfo: DataFrame = df.as[String]
.map(line => {
val arr: Array[String] = line.split(",")
(arr(0).toInt, arr(1), arr(2).toInt)
}).toDF("id", "name", "age")
//4.将以上personinfo 写入到流表中
personinfo.writeStream
.option("checkpointLocation","./checkpoint/lyw-dir1")
.toTable("mytbl")
import org.apache.spark.sql.functions._
//5.读取mytbl 流表中的数据
val query: StreamingQuery = spark.readStream
.table("mytbl")
.withColumn("new_age", col("age").plus(10))
.select("id", "name", "age", "new_age")
.writeStream
.format("console")
.start()
query.awaitTermination()
}
}
1.启动nc
#node1
nc -l 9999
3.idea Run Demo
4.结果如下
4.1nc输入:
[root@node1 ~]# nc -lk 9999
1,zs,18
2,ls,19
3,ww,20
4,ml,21
4.2idea控制台输出:
-------------------------------------------
Batch: 1
-------------------------------------------
+---+----+---+-------+
| id|name|age|new_age|
+---+----+---+-------+
| 1| zs| 18| 28|
+---+----+---+-------+
-------------------------------------------
Batch: 2
-------------------------------------------
+---+----+---+-------+
| id|name|age|new_age|
+---+----+---+-------+
| 2| ls| 19| 29|
+---+----+---+-------+
-------------------------------------------
Batch: 3
-------------------------------------------
+---+----+---+-------+
| id|name|age|new_age|
+---+----+---+-------+
| 3| ww| 20| 30|
+---+----+---+-------+
-------------------------------------------
Batch: 4
-------------------------------------------
+---+----+---+-------+
| id|name|age|new_age|
+---+----+---+-------+
| 4| ml| 21| 31|
+---+----+---+-------+
