spark内核-精品文档资料整理.pdf

上传人:安*** 文档编号:19244609 上传时间:2022-06-05 格式:PDF 页数:36 大小:1.25MB
返回 下载 相关 举报
spark内核-精品文档资料整理.pdf_第1页
第1页 / 共36页
spark内核-精品文档资料整理.pdf_第2页
第2页 / 共36页
点击查看更多>>
资源描述

《spark内核-精品文档资料整理.pdf》由会员分享,可在线阅读,更多相关《spark内核-精品文档资料整理.pdf(36页珍藏版)》请在taowenge.com淘文阁网|工程机械CAD图纸|机械工程制图|CAD装配图下载|SolidWorks_CaTia_CAD_UG_PROE_设计图分享下载上搜索。

1、Spark 内核内核讲师:陈博400-009-400-009-1906 RDD是基础是基础 Resilient Distributed Dataset 弹性分布式数据集 五大特性: A list of partitions A function for computing each split A list of dependencies on other RDDs Optionally, a Partitioner for key-value RDDs Optionally, a list of preferred locations to compute each split 400-00

2、9-1906 Spark运行时运行时400-009-1906 流流程示意程示意 分布式文件系统(File system)-加载数据集 transformations延迟执行-针对RDD的操作 Action触发执行400-009-1906 代码示例代码示例 lines = sc.textFile(“hdfs:/.”) 加载进来成为RDD errors = lines.filter(_.startsWith(“ERROR”) Transformation转换 errors.persist() 缓存RDD Mysql_errors = errors.filter(_.contain(“MySQL”)

3、.count Action执行 http_errors = errors.filter(_.contain(“Http”).count Action执行400-009-1906 缓存策略缓存策略400-009-1906 转换算子转换算子 操作算子操作算子400-009-1906 400-009-1906 血统血统 Lineage 每个看做一个RDD400-009-1906 Rdd 容错容错Lineage(血统)利用内存加快数据加载,在众多的其它的In-Memory类数据库或Cache类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方

4、案。为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。RDD在Lineage依赖方面分为两种Narrow Dependenc

5、ies与Wide Dependencies用来解决数据容错的高效性。Narrow Dependencies是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。Wide Dependencies是指子RDD的分区依赖于父RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。对与Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通

6、过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。容错在RDD计算,通过checkpoint进行容错,做checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的l

7、ineage(血统)来重新计算生成丢失的分区数据。400-009-1906 容错容错 val logs = sc.textFile(.).filter(_.contains(“spark”).map(_.split(t)(1) 上面代码对应 HadoopRDDsc.textFile(.) FilterRDD_.contains(.) MappedRDD_.split(.) 每个RDD都会记录自己依赖与哪个或哪些RDD,万一某个RDD的某些partition挂了,可以通过其它RDD并行计算迅速恢复出来400-009-1906 checkpoint Lineage过长 对rdd做doCheckpo

8、int() SparkContext.setCheckPointDir() 设置数据存路径400-009-1906 窄依赖和宽依赖的例子。(方框表示窄依赖和宽依赖的例子。(方框表示RDD,实心矩形表示分区)实心矩形表示分区)400-009-1906 术语解术语解释释 Application 基于Spark的用户程序,包含了driver程序和集群上的executor Driver Program 运行main函数并且新建SparkContext的程序 Cluster Manager 在集群上获取资源的外部服务(例如standalone,Mesos,Yarn )400-009-1906 Worke

9、r Node 集群中任何可以运行应用代码的节点 Executor是在一个worker node上为某应用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的executors Task 被送到某个executor上的工作单元400-009-1906 Job 包含很多任务的并行计算,可以看做和Spark的action对应 Stage一个Job会被拆分很多组任务,每组任务被称为Stage(就像Mapreduce分map任务和reduce任务一样)400-009-1906 Cluster O400-009-1906 Spark任务调度任务调度器器 调度器根据R

10、DD的结构信息为每个动作确定有效的执行计划。调度器的接口是runJob函数,参数为RDD及其分区集,和一个RDD分区上的函数。该接口足以表示Spark中的所有动作(即count、collect、save等)。 总的来说,我们的调度器跟Dryad类似,但我们还考虑了哪些RDD分区是缓存在内存中的。调度器根据目标RDD的Lineage图创建一个由 stage构成的无回路有向图(DAG)。每个stage内部尽可能多地包含一组具有窄依赖关系的转换,并将它们流水线并行化(pipeline)。 stage的边界有两种情况:一是宽依赖上的Shuffle操作;二是已缓存分区,它可以缩短父RDD的计算过程。例如

11、图6。父RDD完成计算后,可以在stage内启动一组任务计算丢失的分区。400-009-1906 400-009-1906 一个一个stage内的窄依赖进行内的窄依赖进行pipeline操作操作 1+1+1+1=4 1+1=2; 2+1=3; 3+1=400-009-1906 任任务调度务调度400-009-1906 DAG Scheduler 基于Stage构建DAG,决定每个任务的最佳位置 记录哪个RDD或者Stage输出被物化 将taskset传给底层调度器TaskScheduler 重新提交shuffle输出丢失的400-009-1906 Task Scheduler 提交taskse

12、t(一组task)到集群运行并汇报结果 出现shuffle输出lost要报告fetch failed错误 碰到straggle任务需要放到别的节点上重试 为每一个TaskSet维护一个TaskSetManager(追踪本地性及错误信息)400-009-1906 Job调度流程调度流程400-009-1906 wordcount 我们在sparkshell中运行一下最简单的例子,在spark-shell中输入如下代码 scalasc.textFile(README.md).filter(_.contains(Spark).count 上述代码统计在README.md中含有Spark的行数有多少4

13、00-009-1906 400-009-1906 400-009-1906 400-009-1906 400-009-1906 性能优化性能优化 问题: val rdd = data.filter(f1).filter(f2).reduceBy经过以上语句会有很多空任务或者小任务 解决: 使用coalesce或者repartition去减少RDD中partition数量400-009-1906 性能优化性能优化 问题: 每个记录的开销太大rdd.mapx=conn=getDBConn;conn.write(x.toString);conn.close 解决:rdd.mapPartitions(

14、records = conn.getDBConn;for(item -records)write(item.toString); conn.close)400-009-1906 性能优化性能优化 问题: 任务执行速度倾斜 解决: 1、数据倾斜(一般是partition key取的不好) 考虑其它的并行处理方式 中间可以加入一步aggregation 2、Worker倾斜(在某些worker上的executor不给力) 设置spark.speculation=true 把那些持续不给力的node去掉400-009-1906 性能优化性能优化 问题:不设置spark.local.dir 这是spa

15、rk写shuffle输出的地 解决: 设置一组磁盘 spark.local.dir=/mn1/spark, /mnt2/spar, /mnt3/spark 增加IO即加快速度400-009-1906 性能优化性能优化 问题: reducer数量不合适 解决: 需要按照实际情况调整 太多的reducer,造成很多的小任务,以此产生很多启动任务的开销。 太少的reducer,任务执行慢! reduce的任务数还会影响到内存400-009-1906 性能优化性能优化 问题 : collect输出大量结果慢,审视源码 解决 : 直接输出到分布式文件系统400-009-1906 性能优化性能优化 问题:序列化 Spark默认使用JDK自带的ObjectOutputStream 兼容性好,体积大,速度慢 解决: 使用Kryo serialization 体积小,速度快

展开阅读全文
相关资源
相关搜索

当前位置:首页 > 教育专区 > 教案示例

本站为文档C TO C交易模式,本站只提供存储空间、用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。本站仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知淘文阁网,我们立即给予删除!客服QQ:136780468 微信:18945177775 电话:18904686070

工信部备案号:黑ICP备15003705号© 2020-2023 www.taowenge.com 淘文阁