RDDs的基本特性
1.延迟计算
Spark对RDDs的计算是当他们第一次使用Action操作的时候。这种方式在处理大数据时很有用,可以减少数据的传输。
Spark内部记录了metadata表来表明transformation操作是否已经被相应,metadata中只记录已经被相应的Transformation操作
加载数据也是延迟计算,数据只有在必要的时候才会被加载进去
2.持久化
RDD.persist()讲前面产生的RDD进行缓存,当再次使用时不需要进行前面产生该RDD的一系列操作,而直接在缓存中调用这个RDD即可
RDDs的基本操作
1.Transformation(转换)
从之前的RDD构建新的RDD,如map()和filter()操作都属于Transformation
(1)map()
map()接收一个函数,把函数应用到RDD的每个元素,返回新的RDD
val lines=sc.parallelize(Array("hello","Spark","hello","World"),4) val lines2=sc.map(line=>(line,1)) #这里会将所有的元素都和1组成新rdd的元素 lines.collect().foreach(println)
output: (hello,1) (Spark,1) (hello,1) (World,1)
(2)filter()
filter()接收函数,返回只包含满足filter()函数元素的新RDD
val lines3=lines.filter(line=>line.contains("hello")) #这里只留下包含hello的 lines3.collect().foreach(println)
output: (hello,1) (hello,1)
(3)flatMap()
flatMap()对每个输入元素,输出多个元素。将RDD中元素压扁后返回一个新的RDD。
首先我们本地存在一个文件HelloSpark.txt,文件内容如下:
第一步我们将文件内容以每个元素换行方式打印,我们可以看出初始时从文件中读取的文件将内部内容分为”Hello Spark !“和”Are you ok ?“两个元素。
val text_rdd=sc.textFile("/home/yu/data/HelloSpark.txt") text_rdd.collect()/foreach(println)
out: Hello Spark ! Are you ok ?
接下来我们将文件使用flatmap()函数进行处理后压扁:
val flatmap_rdd=text_rdd.flatMap(line=>line.split(" ")) flatmap_rdd.collect().foreach(print) flatmap_rdd.collect().forech(println)
out: HelloSparkAreyouok? Hello Spark ! Are you ok ?
从println的输出我们可以看到split方法将整个rdd中的元素根据“ ”分成了7个元素,从print打印的结果可以看出flatMap()讲切割后文件进行了压扁处理。
(4)集合运算
distinct() 去重
union() 并集
intersection() 交集
substract() A中有而B中没有的元素
val rdd1=sc.parallelize(Array("coffe","coffe","tea","monkey","time")) val rdd2=sc.parallelize(Array("coffe","house")) val distinct_rdd=rdd1.distinct() val union_rdd=rdd1.union(rdd2) val inter_rdd=rdd1.intersection(rdd2) val sub_rdd=rdd1.inter distinct_rdd.collect().foreach(println) println("<------>") union_rdd.collect().foreach(println) println("<------>") inter_rdd.collect().foreach(println) println("<------>") sub_rdd.collect().foreach(println)
out: tea
2.Action
在RDD上计算一个结果(transformation只是在进行转换等操作,并没有计算出实际的结果),把结果返回给driver program或保存在文件系统。
(1)collect()
遍历整个RDD,返回RDD的所有元素。
注意:返回内容必须要单机内存能够容纳下(因为数据要拷贝给driver测试使用),大数据时我们一般使用saveAsTextFile()
val rdd=sc.parallelize(Array(1,2,4,4)) rdd.collect()
out:Array[Int]=Array(1,2,4,4)
(2)reduce()
接收一个函数,作用在RDD两个类型相同的元素上,返回新的元素。可实现RDD中元素的累加、计数,和其他类型的集聚操作。
val sum=rdd.reduce((x,y)=>x+y) print(sum)
out:11
(3)take()
返回RDD的n个元素(尝试访问最少的partitions),返回结果时无序的,一般有测试使用
(4)top()
返回排序(根据RDD中的数据比较器)后的结果,返回几个元素。
(5)foreach()
计算RDD中的每个元素,但不返回到本地,可以配合println()打印良好输出。
rdd.foreach(println)
out: 1 2 4 4