Spark 2.0 Basic Exercises
Spark Core Basic Exercises

Initializing Spark

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

IntelliJ IDEA

val conf = new SparkConf()
val sc = new SparkContext(conf)
val lines = sc.textFile("file:/home/training/test.txt")
val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
words.collect().foreach(print)

Using the Shell

$ ./bin/spark-shell --master local[4]
$ ./bin/spark-shell --master local[4] --jars code.jar

Resilient Distributed Datasets (RDDs)

val distFile = sc.textFile("data.txt")

RDD Operations

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

WordCount

val rdd = sc.textFile("/home/training/software/spark/examples/src/main/resources/people.txt")
val rdd = sc.textFile("hdfs://localhost:9000/testspark.txt")   // or read the input from HDFS
rdd.cache()
val wordcount = rdd.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)

Spark-Submit in Practice

spark-submit --class org.apache.spark.examples.SparkPi --executor-memory 256m --total-executor-cores 2 /home/training/software/spark/examples/jars/spark-examples_2.11-2.0.1.jar 200

Accumulators

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10
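The word-count logic above can also be packaged as a standalone application and launched with spark-submit, as in the Spark-Submit example. A minimal sketch; the object name WordCountApp, the jar name, and the input/output paths are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    // The master is supplied by spark-submit, so only the application name is set here
    val conf = new SparkConf().setAppName("WordCountApp")
    val sc = new SparkContext(conf)

    val counts = sc.textFile(args(0))          // input path
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.saveAsTextFile(args(1))             // output directory
    sc.stop()
  }
}

Example launch (paths illustrative):

spark-submit --class WordCountApp wordcount.jar file:/home/training/test.txt /tmp/wordcount-out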
Spark SQL Basics

DataFrame Operations

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()

import spark.implicits._

val df = spark.read.json("file:/home/training/software/spark/examples/src/main/resources/people.json")
df.show()

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
spark.sql("SELECT * FROM global_temp.people").show()
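Unlike a session-scoped temporary view, a global temporary view is tied to the global_temp database and can be queried from another session. A small sketch reusing the view registered above:

// Global temporary views remain visible when a new session is created on the same SparkContext
spark.newSession().sql("SELECT * FROM global_temp.people").show()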
DataSets

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
Data Sources

Generic Load/Save Functions

val usersDF = spark.read.load("file:/home/training/software/spark/examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Manually Specifying Options

val peopleDF = spark.read.format("json").load("file:/home/training/software/spark/examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

Run SQL on files directly

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Bucketing, Sorting and Partitioning

peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

CREATE TABLE users_bucketed_by_name(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
CLUSTERED BY(name) INTO 42 BUCKETS;

peopleDF.write.partitionBy("favorite_color").bucketBy(42, "name")
  .saveAsTable("people_partitioned_bucketed")

CREATE TABLE users_bucketed_and_partitioned(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;
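The save calls above fail if the target path already exists; a save mode can be specified, and other built-in formats such as CSV take per-format options. A brief sketch; the people.csv path is an assumption, not a file referenced elsewhere in these exercises:

import org.apache.spark.sql.SaveMode

// Overwrite the target instead of failing when it already exists
usersDF.select("name", "favorite_color")
  .write
  .mode(SaveMode.Overwrite)
  .save("namesAndFavColors.parquet")

// Reading CSV with a header row and schema inference
val csvDF = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("examples/src/main/resources/people.csv")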
Parquet Files

// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("/home/training/software/spark/examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()

CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable
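Parquet sources can also merge compatible schemas spread across partition directories when the mergeSchema option is enabled. A short sketch modeled on the schema-merging example in the Spark SQL guide; the data/test_table output path is illustrative:

// This is used to implicitly convert an RDD to a DataFrame
import spark.implicits._

// Write a simple DataFrame into one partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Write a second DataFrame, with a different second column, into another partition directory
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table; mergeSchema reconciles the two schemas
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()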
JSON Datasets

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "/home/training/software/spark/examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

Spark SQL with a Hive Data Source
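A minimal sketch of reading a Hive table through a Hive-enabled SparkSession, modeled on the Hive example in the Spark SQL programming guide; the spark-warehouse directory and the kv1.txt sample file (shipped under examples/src/main/resources) are assumptions about the local setup:

import org.apache.spark.sql.SparkSession

// A SparkSession with Hive support can read and write Hive tables directly
val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", "spark-warehouse")
  .enableHiveSupport()
  .getOrCreate()

import spark.sql

// Create a Hive table and load the sample key/value file into it
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
sql("SELECT COUNT(*) FROM src").show()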
Spark Streaming Basics

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val ssc = new StreamingContext(sc, Seconds(1))   // or build it from an existing SparkContext

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("elephant", 9999)

// Split each line into words
val words = lines.flatMap(_.split(" "))

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

$ nc -lk 9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
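The pairs DStream above can also be aggregated over a sliding window with reduceByKeyAndWindow. A minimal sketch, counting words seen in the last 30 seconds every 10 seconds (both durations are multiples of the 1-second batch interval):

// Word counts over a 30-second window, recomputed every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedWordCounts.print()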
Spark MLlib Basics

KMeans

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Suppress unnecessary log output in the terminal
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

// Load the dataset
val data = sc.textFile("/home/training/software/spark/data/mllib/kmeans_data.txt", 1)
val parsedData = data.map(s => Vectors.dense(s.split(" ").map(_.toDouble)))

// Cluster the data into 2 classes with 20 iterations to train the model
val numClusters = 2
val numIterations = 20
val model = KMeans.train(parsedData, numClusters, numIterations)

// Print the cluster centers of the model
println("Cluster centers:")
for (c <- model.clusterCenters) {
  println("  " + c.toString)
}
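A common follow-up is to score the clustering with KMeansModel.computeCost. A brief sketch reusing the model and parsedData from above:

// Evaluate the clustering by computing the Within Set Sum of Squared Errors (WSSSE)
val WSSSE = model.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)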
Linear Regression

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data (each line: label,feature1 feature2 ...); lpsa.data ships with Spark
val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

// Building the model
val numIterations = 100
val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val MSE = valuesAndPreds.map { case (v, p) => math.pow(v - p, 2) }.reduce(_ + _) / valuesAndPreds.count()
println("training Mean Squared Error = " + MSE)

spark-defaults.conf

spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.dynamicAllocation.minExecutors 0
spark.dynamicAllocation.maxExecutors 20

yarn-site.xml (NodeManager)

yarn.nodemanager.aux-services mapreduce_shuffle,spark_shuffle
yarn.nodemanager.aux-services.spark_shuffle.class org.apache.spark.network.yarn.YarnShuffleService
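The two NodeManager properties above are normally written into yarn-site.xml on every NodeManager host so the Spark external shuffle service runs alongside dynamic allocation. A minimal sketch of the XML form, using exactly the names and values listed above:

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>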