《Spark-机器学习.pdf》由会员分享,可在线阅读,更多相关《Spark-机器学习.pdf(28页珍藏版)》请在taowenge.com淘文阁网|工程机械CAD图纸|机械工程制图|CAD装配图下载|SolidWorks_CaTia_CAD_UG_PROE_设计图分享下载上搜索。
1、1.机器学习 1.1.机器学习 1.1.1.机器学习 随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce 适合离线计算(在 MapReduce 中,由于其分布式特性所有数据需要读写磁盘、启动 job 耗时较大,难以满足时效性要求。),却无法满足实时性要求较高的业务,如实时推荐、用户行为分析等。Spark 是一个类似于 MapReduce 的分布式计算框架,其核心是弹性分布式数据集,提供了比 MapReduce 更丰富的模型,可以在快速在内存中对数据集进行多次迭代,以支持复杂的数据挖掘算法和图形计算算法。1.2.介绍 1.2.1.介绍 给定已知的数据,由这些数
2、据估计出海淀区某个房子它的售价大概在多少万?可以从上面的图看到,用一些离散的点把刚才这些数据表示出来。上面的图哼坐标是面积,纵坐标是售价。从中可以看到一定的规矩。如 200 平的面积售价在上千万了。那机器学习就是用来做这样的事情。也就是从已知的数据中,找出规律,方便做出预测或者估计的功能。那如果去找出规律?可以看到,它的点基本都围绕着这个横线。那 200 平米,基本上在 1400 万到 1600 万之间。所以机器学习就是怎样找到这个直线,怎么找到房价的规律。1.2.2.hypthesis(假设)预测函数 平面几何里的一根线,用函数表示为:x0可以认为是一个常量 1 房价=0.0+6.87*面积
3、 0=0,x0=1,1=6.87,x1 就是面积,h(x)最后算出来就是房价。这种函数我们把它叫做线性函数。0,1 更多的取值,它们取不同值时,画线的规律,或者与真实数据的吻合程度。可以看绿色的线,当 0 等于 0 时。可以看到位置还是有所偏差的。所以 1=7 时大部分房价是吻合的。那我们就可以以 0=0 和 1=7 时来预测房价了。我们肉眼一看就知道 1=7 最好,但计算机怎么知道呢?1.2.3.Cost 函数 用来判断预测函数 h(x)和真实数据之间的误差程度 计算红色线的房价,计算绿色线的房价,然后和真实房价做比较。可以发现绿色的线的值更接近真实房价。一个点当然不够,那就所有的点,就是我
4、预测的点和所有真实的点的差值。通过上面的分析,问题变化为寻找一个最接近真实规律的 0和 1,最终我们就得到了下面的 cost 函数。cost 函数有很多种表现方式,这个是一个比较好理解的方式。h 是预测函数 x 面积 feature(特征)0和 1两个特征 y 房价 lable(标签)真实的数据,(预测值-真实值)2求平均,误差比较小,放大这个误差,做了个平方。求和函数把所有差值的平方求和最后求平均。m 就是真实样本数据的数。那我们有 100 条数据,那 m=100.这个函数用来衡量预测值 h(xi)与真实样本数据 yi之间的差距 假设我们给定了 0和 1,那这个结果值是大了好,还是小了好呢?
5、当然是越小越好。1.2.4.怎么找到最新值?就是前面给出的预测函数 就是前面给出的 Cost 函数 x 是 features 特征 x0h(x)运行的结果,假定都是 1,x1 就是样本的数据。当然我们要考虑的不只是一个点,要考虑所有点,所有有多条,减去真实的房价 预测值-真实值,然后平方,最后求和 求和完了求平均。22 就是我们的样本数量。上面计算完,是不就剩下 0和 1了。那我们针对每一组 0和 1来计算它的运算结果。不断改变 0和 1的值,看哪个结果最小。怎么看呢,笨办法也是最有效的办法,就是不断尝试改变 0和 1。如0和 1都是 0 先试一遍计算的结果是多少,然后 0改成 1,看刚才结果
6、谁小。如果是小当然好,如果是大,就把这组结果排除,再试下一组。反正不断的尝试 0和 1,让计算的结果最小。上面这套规则,以及样本数据就称之为模型。不断尝试改变 0和 1的值来求最合适的 的过程称之为训练模型。1.2.5.向量化 经过我们讲解,看似很复杂的数学公司,其实也不过带几个参数就可以计算出来,但这样编程还是有些太麻烦。我们将它转化下,它将用到数学中的向量和乘法。把 x0 和 x1 先变成线性代数中的矩阵,也就相当于 java 中的数组。m 行样本数,n 列特征数(有几列就是看有多少个特征,x0 是常量,x1 就是建筑面积)将所有的房价抽取成y。像x0,x1的我们称为矩阵,只有一维的 y
7、称为向量。把 0和 1也各自抽取成一个矩阵。这样表示起来也更清晰些,最后编程运算也方便。java 也提供了很多这样的函数,方便矩阵和向量的运算。这样就将上面的过程表示为一个公式:X 就是 x0 和 x1 的矩阵 就是 0和 1的向量 y 就是房价的向量 最终平方,sum 求和,除以 2 乘样本数 一般数学上大写的是表示矩阵,小写的表示向量。矩阵*向量时的规则要求:矩阵的列数=向量的行数,如上面的矩阵有 2 列,向量的个数有 2 个。最后的结果成为一个向量。加减时,它们的维度都要一样。上面的结果成为一列,房价是一列,这样才可以加减。1.3.Octave 开源分析工具 1.3.1.介绍 Octav
8、e是一个旨在提供与Matlab语法兼容的开放源代码科学计算及数值分析的工具,是 Matlab 商业软件的一个强有力的竞争产品。Octave 比较小,安装程序只有几十兆;而Matlab 非常庞大,最新版的安装程序大约 8G,即使只安装最基本的系统,至少也要几百兆以上。Matlab 之所以那么庞大,是因为有大量的面向各种应用领域的工具箱,Octave 无法相比的。安装完 Octave 提供两个方式,一种图形化界面 GUI,一种命令行 CLI。1.3.2.如何定义矩阵和向量 规则:用分号分割行,用逗号或者空格分隔列 定义一个矩阵 x=1,2;3,4 x=1 2 3 4 定义一个向量 x=1;2;3
9、y=1 2 3 技巧:不想回显结果在行尾加个分号,结果是相同的。如果想看 X 是什么,敲入 X 就回显。1.4.常用的函数 1.4.1.ones 都是 1 的矩阵 ones(3,4)ans=1 1 1 1 1 1 1 1 1 1 1 1 1.4.2.zeros 都是 0 的矩阵 zeros(3,4)ans=0 0 0 0 0 0 0 0 0 0 0 0 1.4.3.产生数列 1:2:100#规则:起始值:步长:结束值 1:2:100#1 到 100 之间的奇数 0:2:100#0 到 100 之间的偶数-100:10:100#-100 到 100 之间 q=1,0.2,2#产生 1 到 2 之
10、间,步长为 0.2 注意:写不写两边的都可以 1.4.4.rand 随机数矩阵 rand(3,4)产生 0 到 1 的随机数 ans=0.897313 0.379222 0.674536 0.272163 0.893621 0.284886 0.729541 0.479639 0.267196 0.095435 0.867866 0.800308 1.4.5.*矩阵转置 r=rand(3,4)r#单撇就代表转置,将之前 m*n 的矩阵转换为 n*m 的矩阵 r=rand(3,4)r=0.412314 0.810688 0.753933 0.607234 0.766888 0.888016 0.
11、017703 0.380165 0.107094 0.571571 0.234939 0.030356 r ans=0.412314 0.766888 0.107094 0.810688 0.888016 0.571571 0.753933 0.017703 0.234939 0.607234 0.380165 0.030356 1.4.6.获取矩阵行数和列数 size(r)#得到矩阵大小 ans=3 4 size(r,1)#得到矩阵的行数 ans=3 size(r,2)#得到矩阵的列数 ans=4 y=1,2,3,4,5 y=1 2 3 4 5 length(y)#向量的长度 ans=5 1
12、.4.7.读取外部文本文件形成矩阵 文件是按 tab 键分割,load 函数可以自动识别。也支持逗号和空格分隔。cd c:#进入 c 盘 cd txt#进入目录 ls#列出当前目录下文件 F=load(prices.txt)#也可以直接指定路径 c:/prices/txt 文件内容多,它会出现这个提示,f 向后翻页,d 先前翻页,q 退出 1.4.8.求矩阵子集 F(:,:)#所有行,所有列 F(:,1)#所有行,第一列 F(:,2)#所有行,第二列 F(:,2:4)#所有行,第二列到第四列,总共三列 F(3,:)#第三行,所有列 F(1:10,1:2)#1 到 10 行,1 到 2 列 注意
13、:下标从 1 开始 现有矩阵和信的矩阵合并 例子 1:x=1,2,3 y=4,5,6 x,y 结果:1,2,3,4,5,6 x=1;2;3 y=4;5;6 x,y 结果:1 4 2 5 3 6 x=1,2,3 y=4;5;6 x,y 结果:error:horizontal dimensions mismatch(1x3 vs 3x1)例子 2:F=load(c:/prices.txt)读取 prices.txt 文件,有 22 行数据 C=ones(22,1)定义一个 22 行 1 列的向量 C,其值都为 1 D=F(:,2)获取房屋面积列,第二列 E=C,D 将两个向量和在一起,前提行数必须
14、相同,这里都是 22 行 1.5.集合的运算 1.5.1.矩阵相加 x=1;2;3 y=4;5;6 x+y 结果:5 7 9 1.5.2.矩阵相减 x-y 结果:-3-3-3 1.5.3.矩阵相乘 要求:坐边为 m*n,右边为 n*1 的矩阵,结果为 m*1 的矩阵。第一个的列数=第二个的行数 x=1 2;3 4;5 6 y=2;4 x*y 过程:x 中行的元素*对应 y 列中元素,一行的结果相加 1*2+2*4 3*2+4*4 4*2+4*6 结果:10 22 34 实际之前讲的预测函数就是矩阵相乘的每一行的结果 A=o,1;o,1;o,1 三行数据 B=Xo;X1 第一行:o*Xo+1*X
15、1 第二行:o*Xo+1*X1 第三行:o*Xo+1*X1 1.5.4.矩阵每个元素求平方 y=1;2;3 y.2#平方 y.3#立方 注:在 otcave 中.代表应用于每个元素之上 1.5.5.矩阵求和 y=1 1;2 2;3 3 sum(y)结果:6 6 sum(y.2)平方后求和 14 14 1.6.房价预测案例 1.6.1.计算预测 X 是矩阵,是向量,y 是向量;2 就是求平方,在 sum 求和,最后在除以 2 倍的 m 行数,求平均值。如果使用octave去机器学习,这些公司数学家已经提供,你不需要懂这个公式是什么,你只要明白这种向量格式,然后往里一套就可以了。会运算就可以,你是
16、要编程,不是要做数学家,要去弄明白公式怎么来的。只要会套公式就可以。分析拆解过程:A=load(c:/prices.txt)ones(22,1)x0 A(:,2)x1 取 prices.txt 中的第二列,房屋面积 ones(22,1),A(:,2)X 的矩阵,m 行,2 列 0;6 0=0,1=6,随便指定的预测值 执行过程:A=load(c:/prices.txt);X=ones(22,1),A(:,2);h=X*0;6;预测值的向量,矩阵相乘结果是 22 行,1 列 y=A(:,1);真实房价 sum(h-y).2/(2*22)计算误差值 代码:A=load(c:/prices.txt)
17、;X=ones(22,1),A(:,2);h=X*0;6;y=A(:,1);sum(h-y).2/(2*22)不断的重复上面的过程,不断的改变的值 1=6,计算结果 92498,1=7,计算结果 5221,1=8,计算结果 101290 h=X*0;7;sum(h-y).2/(2*22)可以看到结果差异巨大,那0 和1 可以有无穷组合,那问题就变成怎么找一个0,1 的误差数最小?1.6.2.调用公式 那我们就要解决如何利用算法得到01 的值,之前所学的 COST 函数就发挥作用了。Z 返回值=函数名(参数)z0 就是0,给个范围-100100 z1 就是1,给个范围-100100 X 就是之前
18、的矩阵 y 就是之前的向量 function Z=costFunction(z0,z1,X,y)Z=zeros(length(z0),length(z1);/初始 0 的集合 m=length(y);/样本个数 /通过两层循环不断的尝试0 和1 的组合 for i=1:length(z0)for j=1:length(z1)/运算公式,构建了 100*100 的组合都放在 Z(i,j)中 Z(i,j)=sum(X*z0(i);z1(j)-y).2)/(2*m);end end Z=Z;/单撇矩阵转置 end 注意:octave 对中文支持不好,所以源码中不要出现中文 function Z=co
19、stFunction(z0,z1,X,y)Z=zeros(length(z0),length(z1);m=length(y);for i=1:length(z0)for j=1:length(z1)Z(i,j)=sum(X*z0(i);z1(j)-y).2)/(2*m);end end Z=Z;end 保存成文件放在磁盘上,注意文件名和函数名相同,后缀必须为 m。costFunction.m。检查代码是否正确,可以先直接到文件所在目录,然后只敲入函数名。如果语法正确就会提示缺失参数。调用:z0=-200:10:300;/由-200 开始,步长为 10,到 300 结束 z1=2:0.2:12;
20、/由 2 到 12,步长为 0.2 J=costFunction(z0,z1,X,y)执行结果:这么多结果,如果找到最小的误差呢?z=costFunction()这有非常多的结果,怎么看呢?octave 中可以很方便的把它画成图。x0,x1 两个自变量 y 因变量。这有 3 个动态的值,很容易就画成立体图。plot3(J)/生成 3D 的图,速度比较慢,稍加等待 1.6.3.梯度下降 如果同时变化0 和1,问题变得复杂,不妨固定其中一个,变化另一个。比如说,先让0=0 z0=zeros(1,51);z1=2:0.2:12;J=costFunction(z0,z1,X,y);plot(z1,J(
21、:,1)z0 都为 0 了,所以展示时可以看二维的更加直观,忽略 z0 的值,所以只取 J 结果集中的第一列。变成了二维的比三维看着简单多了,可还是老问题,如何求最小值值呢?求最小值有很多方法,其中一个方法就是梯度下降方法。这里我们不去探讨数学的问题,也无需弄懂它的推导过程,我们只需记住结论就可以。我们要找到每次应该减少多少呢?下次的1 就应该是 可以乘以一个常量,来调整下降的速率,这个在机器学习中称为学习速率。不能太大。可能就一下跳过最小值了。太小也不行,找到最小值的速度就会变慢。这种找到最小值的方法,就称为梯度下降法,非常形象。除了这种方法,还可以使用解方程的方式,两种各有优缺点。解方程的
22、方法,当数据量少的时候,速度快。解方程就无需了。单用梯度下降法就适用数据量特别大的时候。编程时梯度下降法更加方便。用 java 解方程很麻烦。1.6.4.代码实现 伪代码:循环很多次,不断的改变1,观察 J(1)的值如果值 3000,然后继续循环,还是 3000,继续循环,还是 3000,不变化了。就可以认为找到最小值了。function z,J_his=descentFunction(z,X,y,a,iters)J_his=zeros(iters,1);m=length(y);%样本数 n=length(z);t=zeros(n,1);for iter=1:iters for i=1:n%变
23、化率 t(i)=(a/m)*(X*z-y)*X(:,i);end;for i=1:n z(i)=z(i)-t(i);end;J_his(iter)=sum(X*z-y).2)/(2*m);end end function z,J_his 定义了两个返回值,J_his 就是 costFunction 的结果。descentFunction(z,X,y,a,iters)z 就是,初始值,多少无所谓。X 还是之前的矩阵,y 还是之前的向量,a 就是学习速率,决定了计算的速度,越大循环的次数就越小,太小就会造成运算速度太慢,太大就可能造成跳过最小值。iters 就代表循环次数,循环多少次去求解。t(i
24、)=(a/m)*(X*z-y)*X(:,i);t 就代表变化量,X*z 就是 h 预测函数。因为两边的维度的对上,得跟后面的维度对上,所以做了个转置。z(i)=z(i)-t(i);上次的 z(i)再减去 t(i)变化率,就得到下一次的值。J_his(iter)=sum(X*z-y).2)/(2*m);在套入 costFunction,看它的结果是不是跟上次结果相比不在变化了。只要不在变化或者变化微小。这时我就找到最小值了。这个算法大家大致明白它的含义,会应用就可以。因为在 spark 中它已经替我们实现了。执行代码:descentFunction(z,X,y,a,iters)A=load(c:
25、/prices.txt);X=ones(22,1),A(:,2);y=A(:,1);a=0.0001%事先测试好,这个值比较好 iters=1%循环一次 z,J=descentFunction(z,X,y,a,iters)可以看到 J 不断在减小,那什么时候是头呢?观察 J 的变化越来越微乎其微了,只小数点后发生变化。1.7.Spark 中机器学习 1.7.1.Vector 向量 首先要解决怎么把原始数据包装成向量,这些向量其实也是 RDD 封装实现,在把这些向量分布到集群上然后做运算。创建向量就要用到 Vector。Vector.dense()创建稠密向量 Vector.dense(1,2,
26、3);就创建了 123 的向量。在 spark 中向量都是横着放的。123,而 Octave 中是纵的。什么叫稠密向量呢?每个元素都要给定。相对稀疏向量,稀疏向量只给定向量元素不为 0 的值。为 0 的值默认就不给它。导包 import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors 算法 基于线性回归思想解决线性回归的算法 SGD import org.apache.spark.mllib.regression.LinearRegressionModel i
27、mport org.apache.spark.mllib.regression.LinearRegressionWithSGD scala Vector.dense(1,2,3)res0:org.apache.spark.mllib.linalg.Vector=1.0,2.0,3.0 稀疏向量用的比较少,它主要为了节约空间。scala Vector.sparse(5,Array(0,1),Array(2,3)第一个参数是元素个数,第二个参数是非零的坐标,第三个参数是非零的值 res2:org.apache.spark.mllib.linalg.Vector=(5,0,1,2.0,3.0)1.7
28、.2.LabeledPoint 只有先把数据转变成 LabeledPoint 才能使用 spark 提供的函数。结构:LabledPoint lable y featrues x1,x2,x3 定义:LabeledPoint 必须先有有个 Vector。val v=Vector.dense(1,2,3);val y=4 val l=LabeledPoint(y,v)l:org.apache.spark.mllib.regression.LabeledPoint=(4.0,1.0,2.0,3.0)数据:prices.data 825.0 135.00 3 2 997.5 133.00 3 2 1
29、005.0 134.00 3 2 第一列就是 y,后面三列就是一个 Vector。val rdd=sc.textFile(file:/root/prcies.data);rdd.first/获取到第一条记录 res6:String=825.0 1 135.00 3 2 目的就是先要把文本数据变成 LabledPoint,它才能进行下一步运算。val rdd2=rdd.map x=x.split(t)res7:ArrayString=Array(825.0,1,135.00,3,2)val rdd3=rdd2.map x=LabledPoint(x(0).toDouble,Vectors.den
30、se(x(1).toDouble,x(2).toDouble)rdd3.first rdd3:org.apache.spark.mllib.regession.LabledPoint=(825.0,1.0,135.0)1.0 就相当于0,135.0 就相当于1 1.7.3.train 训练 接着把原始数据,将这个 RDD 做训练,求,把任务交给 LinearRegressionWithSGD这个函数就行了。中间这个函数会计算梯度下降,去求那些方程。就调用 train 方法,去训练模型,从而解出的值。参数:数据、迭代次数、学习速率 val model=LinearRegressionWithSG
31、D.train(rdd3,1000,0.0001)它得到的结果就叫一个模型,这个结果就包括了的值。这看起来多简单,我们只需准备数据,spark 把算法已经准备好了,我们给它要的数据、迭代次数和学习速率,它就把结果反馈给我们了。model:org.apache.spark.mllib.regression.LinearRegressionModel=org.apache.spark.mllib.regression.LinearRegressionModel:intercept=0.0,numFeatures=2 model.weights res11:org.apache.spark.mlli
32、b.linalg.Vector=0.04801929334645109,6.94251586995129 0=0.04801929334645109 1=6.94251586995129 可以看到和 octave 中差不了多少,差异主要是这里的0也在变化。求到0、1 都无需带入方程,predict 方法就可以实现预测,model.predict()X0=1 常量值,X1=80 平米大 model.predict(Vectors.dense(1,80)res12:Double=555.4492888894501 上面是 spark 中使用机器学习的例子,使用起来多么的简单。当然这里没有 cost
33、Function,必须自己写一个。0、1 已经计算出来了,我怎么知道这就是最优的呢?你的自己评估误差函数。这个spark 没有提供独立的方法,只能自己实现。1.7.4.验证模型的值是否是最小 均方差,误差函数 w 就是,X 就是 x 集合,y 就是之前的 y。rdd3.mapx=math.pow(x.lable-model.predict(x.features),2).mean()/2 x.lable 就是 y 值 model.predict(x.features),2)预测函数 math.pow 求平方 再求平均值:mean()就对集合中每个元素求平均值 res16:Double=3275.
34、48735910783 在0、1 下计算出来的损失函数,就是 3275。跟我们在 octave 下的计算结果差不多。通过这个就验证了这个模型是不是最小了。得足够小才行。这个在数学中也有个叫法,就叫均方差。在这里叫损失函数 lossFunction 或者 costFunction。1.7.5.其他 cost 函数 如 logistic loss 逻辑回归时,使用的公式就不一样,使用另外一个函数。解决不同的问题 cost 函数也不一样。最后的目的就是监测和真实函数的差异。它们设计出来就是一个碗状的曲线,不然就没有最小值了。1.7.6.小结 1、先要将数据转换为 RDD,然后将 RDD 转换成 La
35、bledPoint 类型 2、训练模型 LinearRegressionWithSGD.train,选择一个算法,包括数据、迭代次数、学习速率 3、训练结果就可以预测了 LinearRegressionWithSGD.predict 4、怎么看这个模型是否好,最后一步,写个误差函数,估算下这个值是否是最小值 1.8.Spark 的实际应用 1.8.1.Spark 在美团的应用 美团是数据驱动的互联网服务,用户每天在美团上的点击、浏览、下单支付行为都会产生海量的日志,这些日志数据将被汇总处理、分析、挖掘与学习,为美团的各种推荐、搜索系统甚至公司战略目标制定提供数据支持。大数据处理渗透到了美团各业
36、务线的各种应用场景,选择合适、高效的数据处理引擎能够大大提高数据生产的效率,进而间接或直接提升相关团队的工作效率。美团最初的数据处理以 Hive SQL 为主,底层计算引擎为 MapReduce,部分相对复杂的业务会由工程师编写 MapReduce 程序实现。随着业务的发展,单纯的 Hive SQL 查询或者 MapReduce 程序已经越来越难以满足数据处理和分析的需求。一方面,MapReduce 计算模型对多轮迭代的 DAG 作业支持不给力,每轮迭代都需要将数据落盘,极大地影响了作业执行效率,另外只提供 Map 和 Reduce 这两种计算因子,使得用户在实现迭代式计算(比如:机器学习算法
37、)时成本高且效率低。另一方面,在数据仓库的按天生产中,由于某些原始日志是半结构化或者非结构化数据,因此,对其进行清洗和转换操作时,需要结合 SQL 查询以及复杂的过程式逻辑处理,这部分工作之前是由 Hive SQL 结合 Python 脚本来完成。这种方式存在效率问题,当数据量比较大的时候,流程的运行时间较长,这些 ETL 流程通常处于比较上游的位置,会直接影响到一系列下游的完成时间以及各种重要数据报表的生成。基于以上原因,美团在 2014 年的时候引入了 Spark。为了充分利用现有 Hadoop 集群的资源,我们采用了 Spark on Yarn 模式,所有的 Spark app 以及 M
38、apReduce 作业会通过 Yarn 统一调度执行。经过近两年的推广和发展,从最开始只有少数团队尝试用 Spark 解决数据处理、机器学习等问题,到现在已经覆盖了美团各大业务线的各种应用场景。从上游的 ETL 生产,到下游的 SQL 查询分析以及机器学习等,Spark 正在逐步替代 MapReduce 作业,成为美团大数据处理的主流计算引擎。目前美团 Hadoop集群用户每天提交的 Spark作业数和 MapReduce作业数比例为 4:1,对于一些上游的 Hive ETL 流程,迁移到 Spark 之后,在相同的资源使用情况下,作业执行速度提升了十倍,极大地提升了业务方的生产效率。Spar
39、k 交互式开发平台 在推广如何使用 Spark 的过程中,我们总结了用户开发应用的主要需求:数据调研:在正式开发程序之前,首先需要认识待处理的业务数据,包括:数据格式,类型(若以表结构存储则对应到字段类型)、存储方式、有无脏数据,甚至分析根据业务逻辑实现是否可能存在数据倾斜等等。这个需求十分基础且重要,只有对数据有充分的掌控,才能写出高效的 Spark 代码;代码调试:业务的编码实现很难保证一蹴而就,可能需要不断地调试;如果每次少量的修改,测试代码都需要经过编译、打包、提交线上,会对用户的开发效率影响是非常大的;联合开发:对于一整个业务的实现,一般会有多方的协作,这时候需要能有一个方便的代码和
40、执行结果共享的途径,用于分享各自的想法和试验结论。基于这些需求,我们调研了现有的开源系统,最终选择了 Apache 的孵化项目 Zeppelin,将其作为基于 Spark 的交互式开发平台。Zeppelin 整合了 Spark,Markdown,Shell,Angular 等引擎,集成了数据分析和可视化等功能。我们在原生的 Zeppelin 上增加了用户登陆认证、用户行为日志审计、权限管理以及执行 Spark 作业资源隔离,打造了一个美团的 Spark 的交互式开发平台,不同的用户可以在该平台上调研数据、调试程序、共享代码和结论。集成在 Zeppelin 的 Spark 提供了三种解释器:Sp
41、ark、Pyspark、SQL,分别适用于编写 Scala、Python、SQL 代码。对于上述的数据调研需求,无论是程序设计之初,还是编码实现过程中,当需要检索数据信息时,通过 Zeppelin 提供的 SQL 接口可以很便利的获取到分析结果;另外,Zeppelin 中 Scala 和 Python 解释器自身的交互式特性满足了用户对Spark 和 Pyspark 分步调试的需求,同时由于 Zeppelin 可以直接连接线上集群,因此可以满足用户对线上数据的读写处理请求;最后,Zeppelin 使用 Web Socket 通信,用户只需要简单地发送要分享内容所在的 http 链接,所有接受者
42、就可以同步感知代码修改,运行结果等,实现多个开发者协同工作。1.8.2.优酷土豆应用 Spark 完善大数据分析案例 大数据,一个似乎已经被媒体传播的过于泛滥的词汇,的的确确又在逐渐影响和改变着我们的生活。也许有人认为大数据在中国仍然只是噱头,但在当前中国互联网领域,大数据以及大数据所催生出来的生产力正在潜移默化地推动业务发展,并为广大中国网民提供更加优秀的服务。优酷土豆作为国内最大的视频网站,和国内其他互联网巨头一样,率先看到大数据对公司业务的价值,早在 2009 年就开始使用 Hadoop 集群,随着这些年业务迅猛发展,优酷土豆又率先尝试了仍处于大数据前沿领域的 Spark/Shark 内
43、存计算框架,很好地解决了机器学习和图计算多次迭代的瓶颈问题,使得公司大数据分析更加完善。MapReduce 之痛 提到大数据,自然不能不提 Hadoop。HDFS 已然成为大数据公认的存储,而 MapReduce作为其搭配的数据处理框架在大数据发展的早期表现出了重大的价值。可由于其设计上的约束 MapReduce 只适合处理离线计算,其在实时性上仍有较大的不足,随着业务的发展,业界对实时性和准确性有更多的需求,很明显单纯依靠 MapReduce 框架已经不能满足业务的需求了。优酷土豆集团大数据团队技术总监卢学裕就表示:“现在我们使用 Hadoop 处理一些问题诸如迭代式计算,每次对磁盘和网络的
44、开销相当大。尤其每一次迭代计算都将结果要写到磁盘再读回来,另外计算的中间结果还需要三个备份,这其实是浪费。”图一:Hadoop 中的数据传送与共享,串行方式、复制以及磁盘 IO 等因素使得 Hadoop集群在低延迟、实时计算方面表现有待改进。据悉,优酷土豆的 Hadoop 大数据平台是从 2009 年开始采用,最初只有 10 多个节点,2012 年集群节点达到 150 个,2013 年更是达到 300 个,每天处理数据量达到 200TB。优酷土豆鉴于 Hadoop 集群已经逐渐胜任不了一些应用,于是决定引入 Spark/Shark 内存计算框架,以此来满足图计算迭代等的需求。Spark 是一个
45、通用的并行计算框架,由伯克利大学的 AMP 实验室开发,Spark 已经成为继 Hadoop 之后又一大热门开源项目,目前已经有英特尔等企业加入到该开源项目。图二:Spark 内存计算框架使得数据共享比网络和磁盘快 10 倍到 100 倍。“我们大数据平台对快速需求的响应延时,尤其是在商业智能 BI 以及产品研究分析等需要多次对大数据做 Drill Down 与 Drill Up 时,等待成了效率杀手。”优酷土豆集团大数据团队技术总监卢学裕表示。用 Spark/Shark 完善大数据分析 目前大数据在互联网公司主要应用在广告、报表、推荐系统等业务上。在广告业务方面需要大数据做应用分析、效果分析
46、、定向优化等,在推荐系统方面则需要大数据优化相关排名、个性化推荐以及热点点击分析等。优酷土豆属于典型的互联网公司,目前运用大数据分析平台的主要工作是运营分析、机器学习、广告定向优化、搜索优化等方面。优酷土豆集团大数据团队技术总监卢学裕表示:“优酷土豆的大数据平台已经用了很多年,突出问题主要包括:第一是商业智能 BI 方面,公司的分析师提交任务之后需要等待很久才得到结果;第二就是大数据量计算,比如进行一些模拟广告投放之时,计算量非常大的同时对效率要求也比较高,用 Hadoop 消耗资源非常大而且响应比较慢;最后就是机器学习和图计算的迭代运算也是需要耗费大量资源且速度很慢。”因此,面对复杂任务、交
47、互式查询以及流在线处理时,Hadoop 与 MapReduce 并不适用。Spark/Shark 这种内存型计算框架则比较适合各种迭代算法和交互式数据分析,可每次将弹性分布式数据集(RDD)操作之后的结果存入内存中,下次操作可直接从内存中读取,省去了大量的磁盘 IO,效率也随之大幅提升。优酷土豆集团大数据团队大数据平台架构师傅杰表示:“一些应用场景并不适合在 MapReduce 里面去处理。通过对比,我们发现 Spark性能比 MapReduce 提升很多。”图三:Spark/Shark 内存计算框架实时日志聚合处理。“比如在图计算方面,视频与视频之间存在的相似关系,这就构成了一个图谱,通过图
48、谱来做聚类,再给用户做视频推荐。”优酷土豆集团大数据团队技术总监卢学裕表示。图四:图计算分析 N 度关联算法示意图。优酷土豆集团大数据团队技术总监卢学裕表示:“我们进行过图计算方面的测试,在 4台节点的 Spark 集群上用时只有 5.6 分钟,而同规模的数据量,单机实现需要 80 多分钟,并且内存吃满,单机无法实现 Scale-Out,不能计算更大规模数据。”“在今天,数据处理要求非常快。比如优酷土豆的一些客户、广告商往往临时就需要看一下投放效果。所以在前端应用不变的情况下,如果能更快的响应市场的需要就变得很有竞争力。市场是瞬息万变的,有一些分析结果也需要快速响应成一个产品,Spark 集成
49、到数据平台正能发挥这样的效果。”优酷土豆集团大数据团队大数据平台架构师傅杰补充道。据了解,优酷土豆采用 Spark/Shark 大数据计算框架得到了英特尔公司的帮助,起初优酷土豆并不熟悉 Spark 以及 Scala 语言,英特尔帮助优酷土豆设计出具体符合业务需求的解决方案,并协助优酷土豆实现了该方案。此外,英特尔还给优酷土豆的大数据团队进行了 Scala 语言、Spark 的培训等。“优酷土豆作为国内视频行业第一家商用部署 Spark/Shark 方案的公司,从视频行业的多样化分析角度来看是个非常好的方案。未来,英特尔将会继续与优酷土豆在Spark/Shark 进行合作,包括硬件配置的优化以
50、及整体方案的优化等”英特尔(中国)有限公司销售市场部互联网及媒体行业企业客户经理李志辉介绍道。1.8.3.分布式计算 Mesos 框架和应用 Mesos2009 年诞生于伯克利 AMPlab,用于构建和运行其他分布式系统的分布式系统,可以毫不费力得支撑 3 万台服务器。Mesos 广泛应用于 Twitter、eBay、Vimeo 等公司,支持 Hadoop、Spark、Storm、Jenkins、Marathon、Chronos 等多种框架,同时支持自行构建架构,例如视频转码。为什么 Mesos 强大?1.Protocol buffer Protocal Buffer 是 google 开源的