25-SparkStreaming01

Spark Streaming

Stream processing built on top of Spark.

A stream: source ==> compute ==> store

Offline (batch) processing is just a special case of streaming.

Spark Streaming lets you write streaming jobs the same way you write batch jobs, out of the box (OOTB).

Programming model: DStream, which represents a continuous stream of data

Core: RDD

SQL: DF/DS

Streaming entry point: StreamingContext

Core: SparkContext

SQL: SparkSession (in earlier versions, SQLContext/HiveContext)
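For a standalone application (rather than spark-shell, where sc already exists), a minimal sketch of creating the entry point; the master and app name here are only illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[2]: one core for the receiver, at least one more for processing
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))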

import org.apache.spark._
import org.apache.spark.streaming._

// Create a StreamingContext from spark-shell's sc, with a 1-second batch interval
val ssc = new StreamingContext(sc, Seconds(1))

// Receive a stream of text lines from a socket source
val lines = ssc.socketTextStream("localhost", 9999)

// Word count within each batch
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each batch
wordCounts.print()

ssc.start()             // start the computation
ssc.awaitTermination()  // wait for the computation to terminate
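To try it, start a socket server first with nc -lk 9999 and type some words into it; every 1-second batch then prints its counts.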

Streaming job running receiver 0

18/09/07 22:42:41 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.

The socket source uses a receiver, which occupies one core on its own; that is why local mode needs local[n] with n > 1.

An operation on a DStream is, under the hood, the same operation applied to every RDD underlying that DStream.
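A small sketch of this idea, reusing lines from the socket example above: the DStream-level map is equivalent to applying the same map to each underlying batch RDD via transform.

// These two DStreams are equivalent:
val upper1 = lines.map(_.toUpperCase)                       // operation on the DStream
val upper2 = lines.transform(rdd => rdd.map(_.toUpperCase)) // same operation on every underlying RDD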

import org.apache.spark._
import org.apache.spark.streaming._

// 10-second batch interval this time
val ssc = new StreamingContext(sc, Seconds(10))

// Monitor an HDFS directory for newly added files
val lines = ssc.textFileStream("/streaming/input/")

// Word count, splitting on tabs
val words = lines.flatMap(_.split("\t"))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.print()

ssc.start()
ssc.awaitTermination()
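textFileStream has no receiver (so it costs no extra core); it simply picks up files that newly appear in the monitored directory, while files already present at startup are ignored. To test it, move a finished file into the directory while the job runs, e.g. hdfs dfs -put wc.txt /streaming/input/ (the file name is just an example).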

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
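This exception is thrown when a stateful operator such as updateStateByKey is used without a checkpoint directory, since the accumulated state has to be persisted somewhere fault-tolerant. The fix is the ssc.checkpoint(...) call in the example below.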

// currentValues: the values arriving for a key in the current batch
// preValues:     the accumulated state for that key from previous batches, if any
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
  val curr = currentValues.sum
  val pre = preValues.getOrElse(0)
  Some(curr + pre)
}
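For example, if the current batch contains (zidong,1) twice and the previous state for zidong is Some(2), then currentValues is Seq(1, 1), curr = 2, pre = 2, and the new state is Some(4).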

val ssc = new StreamingContext(sc, Seconds(10))

// updateStateByKey is stateful, so a checkpoint directory is mandatory
ssc.checkpoint("/streaming/checkpoint/")

val lines = ssc.socketTextStream("hadoop000", 8888)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))

// Fold each batch's counts into the running per-key state
val result = pairs.updateStateByKey(updateFunction)

result.print()

ssc.start()
ssc.awaitTermination()

./spark-submit \
--master local[2] \
--name StreamingStateApp \
--class com.ruozedata.spark.streaming.day01.StreamingStateApp \
/home/hadoop/lib/g3-spark-1.0.jar

Sample output:

(love,3)
(juren,2)
(you,3)
(ruoze,2)
(say,2)
(i,3)
(zidong,4)

Input sent to the socket:

juren say
juren say
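Because updateStateByKey accumulates across batches, these two lines alone account for (juren,2) and (say,2); the other totals above came from input sent in earlier batches.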
