博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkStreaming(5):处理不同数据源(socket源数据或者处理本地/HDFS文件)
阅读量:4281 次
发布时间:2019-05-27

本文共 1808 字,大约阅读时间需要 6 分钟。

一、实现功能

Streaming处理socket数据源或者本地/hdfs上的数据源。

二、处理socket源数据

1.实现功能:

SparkStreaming处理socket源的数据,并进行wordcount的统计。

2.scala代码

package Sparkimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}/**  * spark Streaming 处理socket数据  *  * 使用nc测试nc -lk 6789  */object NetworkWordCount {  def main(args: Array[String]): Unit = {    val sparkConf=new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    /***      * 创建StreamingContext需要sparkConf和batch interval      */    val ssc=new StreamingContext(sparkConf,Seconds(5))    val lines=ssc.socketTextStream("hadoop",6789)    val result= lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)    result.print()    ssc.start()    ssc.awaitTermination()  }}

3.测试

(1)开启nc端口

nc -lk 6789

(2)然后再运行代码,否则会报各种奇葩的错误!

 

三、处理本地/HDFS源数据

1.实现功能:

SparkStreaming处理本地或者HDFS文件,并进行wordcount的统计。

2.前提开启:

(1)hdfs

(2)metastore

3.scala代码:

(1)本地目录写法:

file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\datas\\

(2)hdfs目录写法:

/spark/

(3)代码(以本地为例)

package Sparkimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}/**  * 使用spark Streaming处理文件系统(local/hdfs)的数据  */object FileWordCount {  def main(args: Array[String]): Unit = {    val sparkConf=new SparkConf().setMaster("local[2]").setAppName("FileWordCount")    val ssc=new StreamingContext(sparkConf,Seconds(5))    //    file:///opt/modules/spark-2.1.0-bin-2.7.3/README.md    val lines=ssc.textFileStream("file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\datas\\")    val result= lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)    result.print()    ssc.start()    ssc.awaitTermination()  }}

4.测试:

(1)将内容写入test.log

(2)将文件test.log采用cp方式,放到对应datas文件下面

cp .\test.log .\datas\

(注意:(2)非常重要,一定要通过cp或者mv的方式移动进去,否者streaming读取不到增加的流信息!)

转载地址:http://evygi.baihongyu.com/

你可能感兴趣的文章
Xamarin 实现 Button LongClick 和 Touch
查看>>
Xamarin 进阶文档
查看>>
在 Xamarin Forms 中实现 Banner
查看>>
Android 加载本地图片路径
查看>>
Android CPU 架构详解
查看>>
Android
查看>>
Android UncaughtExceptionHandler 原理分析
查看>>
Linker 分析器
查看>>
Android APP 检测安装打开 APK 三步操作
查看>>
Xamarin.Forms Performance on Android
查看>>
AndroidManifest.xml <uses-feature> 和 <uses-permisstion>
查看>>
No toolchains found in the NDK toolchains folder for ABI with prefix: mipsel-linux-android
查看>>
【Java】JSP入门
查看>>
【JAVA】Session
查看>>
【Java】EL和JSTL
查看>>
【JAVA】三层架构,综合练习
查看>>
【Java】Filter和Listener
查看>>
【Python3】网络编程基础
查看>>
【Flask】制作用户登陆
查看>>
javascript操作html元素CSS属性
查看>>