##笔记2中只能够每次累加当前批次的数据,但是不可以累加一开始的数据,这是一个缺陷 #思考: 我们是不是可以先把以前的数据保存到HDFS中,然后每次来的时候把他和当前数据进行累加。 解决: 我们可以使用ssc的checkpoint指定一个目录,比如HDFS。用来保存以前计算的结果。 然后再通过updateStateByKey()取代reduceByKey()进行操作。其他不变。
#代码如下:
package com.liufu.org.streamingimport org.apache.spark.{HashPartitioner, SparkConf}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}/** * Created by liufu on 2016/11/19. */object LeiJiaStream { //定义一个累加的函数,这是updateStateByKey需要的 val updateFunc = (it:Iterator[(String, Seq[Int],Option[Int])]) => { it.map{case(x,y,z) => (x, y.sum + z.getOrElse(0))} } def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("streamTest").setMaster("local[2]") //创建Streamingcomtext对象,然后指定处理数据的时间间隔 val ssc: StreamingContext = new StreamingContext(conf,Seconds(5)) //设置一个文件目录,用于保存以前数据。 ssc.checkpoint("hdfs://hadoop1:9000/Spark_Stream_Checkpoint") //读取Linux中nc的数据。 val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop1",8888) /** * DStream的操作和RDD一样,所以只要会了RDD,那么DStream没有问题,一样使用。 */ val flatMap: DStream[String] = textStream.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = flatMap.map((_,1)) val reduced: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true) reduced.print() //一定要启动streamContext程序,然后一直等待,否则任务不会提交的。 ssc.start() ssc.awaitTermination() }}
#总结: 做累加可以在redies中做,同时也可以在代码中指定一个checkpoint目录,用来保存以前运算的结果。
主要是这个updateStateByKey方法需要传入一个累加函数 函数的定义如下:
//定义一个累加的函数,这是updateStateByKey需要的 val updateFunc = (it:Iterator[(String, Seq[Int],Option[Int])]) => { it.map{case(x,y,z) => (x, y.sum + z.getOrElse(0))} }
#在指定的目录下,会产生很多的checkpoint文件
#在指定的目录下,会产生很多的checkpoint文件