大家好,欢迎来到IT知识分享网。
目录
一,RDD的介绍
- Resilient Distributed Datasets(弹性分布式数据集,简称 RDD)是 Spark 中最基本的数据抽象。
- 它代表一个不可变、可分区、里面的元素可并行计算的集合。
- RDD 可以从多种数据源创建,如 HDFS 文件、本地文件系统、数据库等。
- RDD 允许用户在大规模数据集上进行高效的并行计算,同时提供了容错机制,确保在节点故障时能够自动恢复数据和计算。
1,RDD计算原理图
二,RDD的特点
- 分区
- 可以把计算的海量数据分成多份,需要分成多少分区,可以通过方法指定
- 每一个分区对应一个task线程执行计算
- 只读
- rdd的数据不能直接修改,需要通过方法计算之后得到一个新的rdd
- rdd本身存储的数据只能读取
- 依赖
- rdd之间有很强的依赖关系
- 新的rdd是通过旧的rdd计算得出
- 缓存
- 可以把计算中的结果缓存起来,如果后续计算错误,可以从缓存位置重新计算
- 把数据存储在内存或者本地磁盘
- 提高容错率
- 缓存在执行计算任务程序结束后会释放删除
- 检查
- checkpoint作用和缓存是一样的
- checkpoint可以把数据存储在分布式存储系统中,比如hdfs
三,RDD的创建
- SparkContext称为Spark的入口类
- rdd的转化方法是由sprakcontext提供,所以需要先生成sparkcontext
- sc中还包括资源申请与任务划分功能
1,把python中的数据转为RDD
# encoding=utf-8 #常规查看 list1 = [1,2,3,4] print(list1,'常规查看') print(sum(list1),'常规聚合列表') #导入 sprak封装了很多rdd方法的包 from pyspark import SparkContext #创建这个类的实例 #如果不指定master 那么默认就是本地模式 locat # sc = SparkContext() sc = SparkContext(master='yarn') # parallelize 可以吧列表转为rdd对象 也可以转为元组和字典 但是不推荐 字典只能转KEY rdd=sc.parallelize(list1) print(rdd.foreach(lambda x:print(x)),'查看rdd') print(rdd.take(100),'查看rdd') # 聚合rdd res = rdd.reduce(lambda x,y:x+y) print(res,'spark计算的结果')
2,把hdfs的数据转为RDD
# encoding=utf-8 from pyspark import SparkContext #创建sc对象 sc = SparkContext() # 导入hdfs上的数据 读取目录数据 rdd = sc.textFile('hdfs://node1:8020/test') #第二种方式 直接读取文件 rdd1 = sc.textFile('hdfs://node1:8020/test/test.txt') #查看rdd print(rdd.collect()) print(rdd1.collect())
3,设置RDD的分区
# encoding=utf-8 from pyspark import SparkContext sc = SparkContext() data = [1,2,3,4,5] #转化列表为rdd的时候 指定分区 #用numslices指定分区 rdd1 = sc.parallelize(data,numSlices=4) #查看分区后的数据形式 用glom查看 res1 = rdd1.glom().collect() print(res1) #[[1], [2], [3], [4, 5]] #如果指定的分区大于文件的与元素个数 会出现空分区 rdd2 = sc.parallelize(data,numSlices=7) res2 = rdd2.glom().collect() print(res2) #[[], [1], [2], [], [3], [4], [5]] 会出现2个空分区 #读取hdsf上的数据 创建rdd的时候 指定分区 rdd3 = sc.textFile('hdfs://node1:8020/test/test.txt',minPartitions=3) res3 = rdd3.glom().collect() print(res3) #读取hdsf上的数据 分区计算规则: 文件大小 除以 分区数 == 一个值 余数/值 是否超过10% 如果超过就会创建一个空分区 rdd4 = sc.textFile('hdfs://node1:8020/test/test.txt',minPartitions=2) res4 = rdd4.glom().collect() print(res4) rdd5 = sc.textFile('hdfs://node1:8020/test/test.txt',minPartitions=5) res5 = rdd5.glom().collect() print(res5)
4,RDD中小文件的读取
# encoding=utf-8 from pyspark import SparkContext #创建sc对象 sc = SparkContext(master='yarn') #小文件合并 并重新指定分区 rdd_mini = sc.wholeTextFiles('hdfs://node1:8020/test',minPartitions=2) #查看数据 print(rdd_mini.glom().collect()) ''' 很明显 4个文件原来是4个分区 现在被合并 重新生成了2个分区 [[('hdfs://node1:8020/test/tes2.txt', '1,22,凡梦,男\r\n2,18,阿萨辛,女\r\n3,22,萨莉亚,女\r\n4,18,陆危楼,男'), ('hdfs://node1:8020/test/test.txt', '1,22,凡梦,男\r\n2,18,阿萨辛,女\r\n3,22,萨莉亚,女\r\n4,18,陆危楼,男')], [('hdfs://node1:8020/test/test1.txt', '1,22,凡梦,男\r\n2,18,阿萨辛,女\r\n3,22,萨莉亚,女\r\n4,18,陆危楼,男'), ('hdfs://node1:8020/test/test3.txt', '1,22,凡梦,男\r\n2,18,阿萨辛,女\r\n3,22,萨莉亚,女\r\n4,18,陆危楼,男')]] '''
RDD算子回顾
- 创建sc对象 SparkContext(mater=’填入不同的资源调度模式 比如 locat 或者yarn’)
- sc.parcllelize(列表) 通常是把列表转为RDD,
- 也可以转元组或者字典,但是由于元组是不可变对象,所以不常用;
- 转字典的话只是把key转过去 值会丢失;
- 所以通常推荐转列表过去;
- sc.textFile(‘hdfs上的路径’) 可以吧hdfs上的文件转为RDD
- 在把列表或者hdfs上的数据转为RDD的时候可以设置分区
- sc.parallelize(list,numSlices=3) 3就是分3个区
- sc.textFile(‘hdfs上的路径’,minPartitions=3) 3就是设置分3个区
- 如果分区数量大于元素的数量,那么会产生空分区
- 使用rdd.collect() 可以查看RDD的内容
- 使用rdd.glom().collect 可以查看分区 以及分区里面的内容
- 使用 sc.wholeTextFiles(‘hdfs上的路径’,minpartitions=3)
- 3是指定的分区数
- 可以这个种方式可以合并小文件,并且重新指定分区
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/126923.html