本文共 1808 字,大约阅读时间需要 6 分钟。
Streaming处理socket数据源或者本地/hdfs上的数据源。
SparkStreaming处理socket源的数据,并进行wordcount的统计。
package Sparkimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}/** * spark Streaming 处理socket数据 * * 使用nc测试nc -lk 6789 */object NetworkWordCount { def main(args: Array[String]): Unit = { val sparkConf=new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") /*** * 创建StreamingContext需要sparkConf和batch interval */ val ssc=new StreamingContext(sparkConf,Seconds(5)) val lines=ssc.socketTextStream("hadoop",6789) val result= lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) result.print() ssc.start() ssc.awaitTermination() }}
(1)开启nc端口
nc -lk 6789
(2)然后再运行代码,否则会报各种奇葩的错误!
SparkStreaming处理本地或者HDFS文件,并进行wordcount的统计。
(1)hdfs
(2)metastore
(1)本地目录写法:
file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\datas\\
(2)hdfs目录写法:
/spark/
(3)代码(以本地为例)
package Sparkimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}/** * 使用spark Streaming处理文件系统(local/hdfs)的数据 */object FileWordCount { def main(args: Array[String]): Unit = { val sparkConf=new SparkConf().setMaster("local[2]").setAppName("FileWordCount") val ssc=new StreamingContext(sparkConf,Seconds(5)) // file:///opt/modules/spark-2.1.0-bin-2.7.3/README.md val lines=ssc.textFileStream("file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\datas\\") val result= lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) result.print() ssc.start() ssc.awaitTermination() }}
(1)将内容写入test.log
(2)将文件test.log采用cp方式,放到对应datas文件下面
cp .\test.log .\datas\
(注意:(2)非常重要,一定要通过cp或者mv的方式移动进去,否者streaming读取不到增加的流信息!)
转载地址:http://evygi.baihongyu.com/