博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RDDs的基本操作
阅读量:6341 次
发布时间:2019-06-22

本文共 3307 字,大约阅读时间需要 11 分钟。

RDDs的基本特性

  1.延迟计算

    Spark对RDDs的计算是当他们第一次使用Action操作的时候。这种方式在处理大数据时很有用,可以减少数据的传输。

    Spark内部记录了metadata表来表明transformation操作是否已经被相应,metadata中只记录已经被相应的Transformation操作  

    加载数据也是延迟计算,数据只有在必要的时候才会被加载进去

  2.持久化

    RDD.persist()讲前面产生的RDD进行缓存,当再次使用时不需要进行前面产生该RDD的一系列操作,而直接在缓存中调用这个RDD即可

RDDs的基本操作

1.Transformation(转换)

  从之前的RDD构建新的RDD,如map()和filter()操作都属于Transformation

  (1)map()

    map()接收一个函数,把函数应用到RDD的每个元素,返回新的RDD

  val lines=sc.parallelize(Array("hello","Spark","hello","World"),4)   val lines2=sc.map(line=>(line,1))    #这里会将所有的元素都和1组成新rdd的元素   lines.collect().foreach(println)
  output:  (hello,1)        (Spark,1)        (hello,1)        (World,1)

  (2)filter()

    filter()接收函数,返回只包含满足filter()函数元素的新RDD

  val lines3=lines.filter(line=>line.contains("hello"))  #这里只留下包含hello的  lines3.collect().foreach(println)
  output:    (hello,1)              (hello,1)

  (3)flatMap()

    flatMap()对每个输入元素,输出多个元素。将RDD中元素压扁后返回一个新的RDD。

    首先我们本地存在一个文件HelloSpark.txt,文件内容如下:

          

    第一步我们将文件内容以每个元素换行方式打印,我们可以看出初始时从文件中读取的文件将内部内容分为”Hello Spark !“和”Are you ok ?“两个元素。

     val text_rdd=sc.textFile("/home/yu/data/HelloSpark.txt")      text_rdd.collect()/foreach(println)
     out:   Hello Spark !           Are you ok ?

    接下来我们将文件使用flatmap()函数进行处理后压扁:

    val flatmap_rdd=text_rdd.flatMap(line=>line.split(" "))    flatmap_rdd.collect().foreach(print)    flatmap_rdd.collect().forech(println)
    out:   HelloSparkAreyouok?           Hello           Spark           !             Are           you           ok           ?           

    从println的输出我们可以看到split方法将整个rdd中的元素根据“ ”分成了7个元素,从print打印的结果可以看出flatMap()讲切割后文件进行了压扁处理。

  (4)集合运算

    distinct()    去重

    union()     并集

    intersection()  交集 

    substract()   A中有而B中没有的元素

      val rdd1=sc.parallelize(Array("coffe","coffe","tea","monkey","time"))      val rdd2=sc.parallelize(Array("coffe","house"))      val distinct_rdd=rdd1.distinct()      val union_rdd=rdd1.union(rdd2)      val inter_rdd=rdd1.intersection(rdd2)       val sub_rdd=rdd1.inter      distinct_rdd.collect().foreach(println)      println("<------>")      union_rdd.collect().foreach(println)      println("<------>")      inter_rdd.collect().foreach(println)       println("<------>")       sub_rdd.collect().foreach(println)

 

     out:   tea
            monkey
            coffe
            time
            <------>
            coffe
            coffe
            tea
            monkey
            time
            coffe
            house
            <------>
            coffe
            <------>
            tea
            monkey
            time

 2.Action

  在RDD上计算一个结果(transformation只是在进行转换等操作,并没有计算出实际的结果),把结果返回给driver program或保存在文件系统。

  (1)collect()  

      遍历整个RDD,返回RDD的所有元素。

      注意:返回内容必须要单机内存能够容纳下(因为数据要拷贝给driver测试使用),大数据时我们一般使用saveAsTextFile()

    val rdd=sc.parallelize(Array(1,2,4,4))     rdd.collect()
    out:Array[Int]=Array(1,2,4,4)

  (2)reduce()  

    接收一个函数,作用在RDD两个类型相同的元素上,返回新的元素。可实现RDD中元素的累加、计数,和其他类型的集聚操作。

    val sum=rdd.reduce((x,y)=>x+y)     print(sum)
    out:11

  (3)take()

    返回RDD的n个元素(尝试访问最少的partitions),返回结果时无序的,一般有测试使用

  (4)top()

    返回排序(根据RDD中的数据比较器)后的结果,返回几个元素。

  (5)foreach() 

    计算RDD中的每个元素,但不返回到本地,可以配合println()打印良好输出。

rdd.foreach(println)
    out:   1          2          4          4

 

转载于:https://www.cnblogs.com/2017Crown/p/7413982.html

你可能感兴趣的文章
Lync 2013更新CU2
查看>>
Tomcat7+ 启动慢的问题解决
查看>>
0802收获
查看>>
google 开源项目C++ 编码规范
查看>>
23种设计模式之观察者模式
查看>>
memcached的安装与开启脚本
查看>>
Linux与Window字符集~~伤不起的幽灵空白符
查看>>
zabbix 邮件报警 -- sendmail
查看>>
JavaScript异步编程
查看>>
tcpdump用法小记
查看>>
MySQL基础安全注意细节
查看>>
Oracle随机函数—dbms_random
查看>>
pvr 批量转换
查看>>
linux命令basename使用方法
查看>>
windows下开发库路径解决方案
查看>>
linux迁移mysql数据目录
查看>>
脚本源码安装LNMP
查看>>
Percona Server安装
查看>>
Spark DateType cast 踩坑
查看>>
函数为左边表达式
查看>>