博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
3、Stream设置checkpoint累加之前的数据
阅读量:6335 次
发布时间:2019-06-22

本文共 1878 字,大约阅读时间需要 6 分钟。

hot3.png

##笔记2中只能够每次累加当前批次的数据,但是不可以累加一开始的数据,这是一个缺陷 #思考: 我们是不是可以先把以前的数据保存到HDFS中,然后每次来的时候把他和当前数据进行累加。 解决: 我们可以使用ssc的checkpoint指定一个目录,比如HDFS。用来保存以前计算的结果。 然后再通过updateStateByKey()取代reduceByKey()进行操作。其他不变。

#代码如下:

package com.liufu.org.streamingimport org.apache.spark.{HashPartitioner, SparkConf}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}/**  * Created by liufu on 2016/11/19.  */object LeiJiaStream {  //定义一个累加的函数,这是updateStateByKey需要的  val updateFunc = (it:Iterator[(String, Seq[Int],Option[Int])]) => {    it.map{case(x,y,z) => (x, y.sum + z.getOrElse(0))}  }  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("streamTest").setMaster("local[2]")    //创建Streamingcomtext对象,然后指定处理数据的时间间隔    val ssc: StreamingContext = new StreamingContext(conf,Seconds(5))    //设置一个文件目录,用于保存以前数据。    ssc.checkpoint("hdfs://hadoop1:9000/Spark_Stream_Checkpoint")    //读取Linux中nc的数据。    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop1",8888)    /**      * DStream的操作和RDD一样,所以只要会了RDD,那么DStream没有问题,一样使用。      */    val flatMap: DStream[String] = textStream.flatMap(_.split(" "))    val wordAndOne: DStream[(String, Int)] = flatMap.map((_,1))    val reduced: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)    reduced.print()    //一定要启动streamContext程序,然后一直等待,否则任务不会提交的。    ssc.start()    ssc.awaitTermination()  }}

#总结: 做累加可以在redies中做,同时也可以在代码中指定一个checkpoint目录,用来保存以前运算的结果。

主要是这个updateStateByKey方法需要传入一个累加函数 函数的定义如下:

//定义一个累加的函数,这是updateStateByKey需要的  val updateFunc = (it:Iterator[(String, Seq[Int],Option[Int])]) => {    it.map{case(x,y,z) => (x, y.sum + z.getOrElse(0))}  }

#在指定的目录下,会产生很多的checkpoint文件

#在指定的目录下,会产生很多的checkpoint文件 输入图片说明

转载于:https://my.oschina.net/liufukin/blog/796499

你可能感兴趣的文章
解决部分月份绩效无法显示的问题:timestamp\union al\autocommit等的用法
查看>>
nginx 域名跳转 Nginx跳转自动到带www域名规则配置、nginx多域名向主域名跳转
查看>>
man openstack >>1.txt
查看>>
linux几大服务器版本大比拼
查看>>
在BT5系统中安装postgresQL
查看>>
【Magedu】Week01
查看>>
写给MongoDB开发者的50条建议Tip25
查看>>
为什么要让带宽制约云计算发展
查看>>
[iOS Animation]-CALayer 绘图效率
查看>>
2012-8-5
查看>>
VS中ProjectDir的值以及$(ProjectDir)../的含义
查看>>
我的友情链接
查看>>
PHP实现排序算法
查看>>
Business Contact Mnanager for Outlook2010
查看>>
9种用户体验设计的状态是必须知道的(五)
查看>>
解决WIN7下组播问题
查看>>
陈松松:视频营销成交率低,这三个因素没到位
查看>>
vmware nat模式原理探究,实现虚拟机跨网段管理
查看>>
JavaSE 学习参考:集合运算
查看>>
【Signals and Systems】 SYLLABUS
查看>>