jjzjj

Note_Spark_Day02:Spark 基础环境

Maynor学长 2023-03-28 原文
Spark Day02:Spark 基础环境(二) Hadoop3.0-HDFS https://www.bilibili.com/video/BV1yX4y1K7Lq Hadoop3.0-MapReduce https://www.bilibili.com/video/BV1Tf4y167U8 Hadoop3.0-yarn https://www.bilibili.com/video/BV1wh411S76Z

01-[了解]-上次课程内容回顾

主要讲解2个方面的内容:Spark 框架概述和Spark 快速入门。

1、Spark 框架概述 - Spark 框架诞生背景 加州大学、伯克利分校、APMLab实验室、2009年 - Spark 框架功能(官方定义),类似MapReduce框架,分析处理数据 Apache Spark™ is a unified analytics engine for large-scale data processing. 分析引擎、统一的(任意类型分析基本都可以完成)、大规模数据集(海量数据) - Spark 发展史 2009年、2010年发布论文(RDD)、2014年(1.0)、2016年(2.0)、2020年(3.0) - Spark 官方四个特性 快Speed,与MapReduce相比较,2个方面比较 统一 支持多语言,Scala、Java、Python、R、SQL - 框架模块 Core、SQL、Streaming(StructuredStreaming)、MLlib及GraphX、PySpark和SparkR等 - 运行方式 本地模型运行(1JVM进程,运行Task,线程方式)、集群模式运行和容器(云端):K8s 2、Spark 快速入门 - 环境准备 导入虚拟机、基本配置 Spark 框架基本配置(设置):解压、设置JAVA和Scala环境变量 - spark-shell 本地模式运行交互式命令行 $SPARK_HOME/bin/spark-shell --master local[2] - 经典案例:词频统计WordCount map\flatMap reduceByKey 数据结构:RDD,认为就是一个集合,比如列表List,存储很多数据,调用高价函数处理数据 - 圆周率PI 使用提交命令:spark-submit --class xxx --master yyyy xxx.jar parameter

02-[了解]-今日课程内容提纲

讲解2个方面的内容:Standalone集群模式和使用IDEA开发应用程序。

1、Standalone 集群 Spark框架自身提供类似Hadoop YARN分布式集群资源管理集群Standalone功能,管理集群资源和分配资源运行Spark应用程序。 集群架构组成,类似Hadoop YARN集群架构 配置、部署、启动和测试 Spark应用运行在集群上架构组成 Spark 应用运行WEB UI监控 2、IDEA应用开发,编写入门案例词频统计 创建Maven Project SparkContext实例创建 WordCount代码编写 使用spark-submit提交应用执行

03-[掌握]-Standalone集群【架构组成】

​ Spark Stanadlone集群类似Hadoop YARN集群功能,管理整个集群中资源(CUP Core核数、内存Memory、磁盘Disk、网络带宽等)

Standalone集群使用了分布式计算中的master-slave模型,master是集群中含有Master进程的节点,slave是集群中的Worker节点含有Executor进程。

  • Standalone集群主从架构:Master-Slave
  • 主节点:老大,管理者,Master
  • 从节点:小弟,干活的,Workers
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-q6rRP9ve-1618964047915)(/img/image-20210420150910411.png)]

Spark Standalone集群,类似Hadoop YARN,管理集群资源和调度资源:

  • Master,管理整个集群资源,接收提交应用,分配资源给每个应用,运行Task任务
  • Worker,管理每个机器的资源,分配对应的资源来运行Task;每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数
  • HistoryServer,Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查
    看应用运行相关信息。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9tbOm9N1-1618964047916)(/img/image-20210420151117271.png)]

04-[掌握]-Standalone 集群【配置和部署】

Standalone集群安装服务规划与资源配置:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b11aPtDw-1618964047918)(/img/image-20210420151348915.png)]

需要将三台虚拟机,全部恢复到【04、分布式集群环境】快照。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MtgU5XoS-1618964047919)(/img/image-20210420151536109.png)]

按照讲义上步骤进行配置即可,具体步骤如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E26MVOim-1618964047920)(/img/image-20210420151622616.png)]

05-[掌握]-Standalone 集群【服务启动和运行应用】

​ 在Master节点node1.itcast.cn上启动,进入$SPARK_HOME,必须配置主节点到所有从节点的SSH无密钥登录,集群各个机器时间同步

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jLfxx50Q-1618964047923)(/img/image-20210420153805851.png)]

  • 主节点Master启动命令
[root@node1 ~]# /export/server/spark/sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /export/server/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-node1.itcast.cn.out [root@node1 ~]# [root@node1 ~]# jps 15076 DataNode 15497 Master 15545 Jps 14973 NameNode WEB UI页面地址:http://node1.itcast.cn:8080

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3q5oRn5F-1618964047924)(/img/image-20210420154048592.png)]

  • 从节点Workers启动命令
/export/server/spark/sbin/start-slaves.sh [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uNOavcbM-1618964047924)(/img/image-20210420154139476.png)]

  • 历史服务器HistoryServer
/export/server/spark/sbin/start-history-server.sh WEB UI页面地址:http://node1.itcast.cn:18080

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5yIllLKz-1618964047925)(/img/image-20210420154316602.png)]

​ 将上述运行在Local Mode的圆周率PI程序,运行在Standalone集群上,修改【--master】地址为Standalone集群地址:spark://node1.itcast.cn:7077,具体命令如下:

SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077 \ --class org.apache.spark.examples.SparkPi \ ${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \ 10 查看Master主节点WEB UI界面:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PCqcO9vX-1618964047925)(/img/image-20210420154550512.png)]

06-[掌握]-Spark 应用架构组成

登录到Spark HistoryServer历史服务器WEB UI界面,点击刚刚运行圆周率PI程序:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-h1kn71uP-1618964047926)(/img/image-20210420154704019.png)]

切换到【Executors】Tab页面:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sSpL3blj-1618964047926)(/img/image-20210420154832210.png)]

从图中可以看到Spark Application运行到集群上时,由两部分组成:Driver Program和Executors

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-B6mJ97Zo-1618964047927)(/img/image-20210420155259800.png)]

每个Executor相当于线程池,每个线程运行Task任务,需要1Core CPU。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rYMFd4wh-1618964047927)(/img/image-20210420155515175.png)]

  • 第一、Driver Program
    • 相当于AppMaster,整个应用管理者,负责应用中所有Job的调度执行;
    • 运行JVM Process,运行程序的MAIN函数,必须创建SparkContext上下文对象;
    • 一个SparkApplication仅有一个;
  • 第二、Executors
    • 相当于一个线程池,运行JVM Process,其中有很多线程,每个线程运行一个Task任务,
      一个Task运行需要1 Core CPU,所有可以认为Executor中线程数就等于CPU Core核数;
    • 一个Spark Application可以有多个,可以设置个数和资源信息;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fvMXnl6y-1618964047928)(/img/image-20210420155723115.png)]

07-[掌握]-Spark 应用WEB UI 监控

Spark 提供了多个监控界面,当运行Spark任务后可以直接在网页对各种信息进行监控查看。

运行spark-shell交互式命令在Standalone集群上,命令如下:

/export/server/spark/bin/spark-shell --master spark://node1.itcast.cn:7077 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-teZExosk-1618964047928)(/img/image-20210420155940596.png)]

在spark-shell中执行词频统计WordCount程序代码,运行如下:

val inputRDD = sc.textFile("/datas/wordcount.data") val wordcountsRDD = inputRDD.flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey((tmp, item) => tmp +item) wordcountsRDD.take(5) 截图如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5J1ZwOuk-1618964047929)(/img/image-20210420160308792.png)]

可以发现在一个Spark Application中,包含多个Job,每个Job有多个Stage组成,每个Job执行按照DAG图进行的。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RjQsM5wG-1618964047929)(/img/image-20210420160356244.png)]

其中每个Stage中包含多个Task任务,每个Task以线程Thread方式执行,需要1Core CPU。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HAoZO8qn-1618964047929)(/img/image-20210420160508614.png)]

Spark Application程序运行时三个核心概念:Job、Stage、Task,说明如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kH5wyJxM-1618964047930)(/img/image-20210420160752870.png)]

Job和Stage及Task之间关系:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SYn7JPZB-1618964047930)(/img/image-20210420160808463.png)]

08-[理解]-Standalone 集群【Standalone HA】

Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障(SPOF:single Point of Failover)的问题。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dsbv7BV8-1618964047931)(/img/image-20210420161334636.png)]

ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。

使用Zookeeper集群:选举leader、监控leader

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-emuvLbgT-1618964047931)(/img/image-20210420161458094.png)]

基于Zookeeper实现HA:http://spark.apache.org/docs/2.4.5/spark-standalone.html#high-availability

09-[掌握]-IDEA 应用开发【构建Maven Project】

Spark课程代码,创建一个Maven Project工程,每天创建Maven Module模块,方便复习。

创建Maven Project工程【bigdata-spark_2.11】,设置GAV三要素的值如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AVlOJseH-1618964047931)(/img/image-20210420164518995.png)]

创建Maven Module模块【spark-chapter01_2.11】,对应的GAV三要素值如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xXLlDdD1-1618964047932)(/img/image-20210420165049405.png)]

至此,将Maven Module模块创建完成,可以开始编写第一个Spark程序。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xYiOoMVl-1618964047932)(/img/image-20210420165439243.png)]

10-[掌握]-IDEA 应用开发【应用入口SparkContext】

Spark Application程序入口为:SparkContext,任何一个应用首先需要构建SparkContext对象,如下两步构建:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-l1A1WJMC-1618964047933)(/img/image-20210420165942766.png)]

11-[掌握]-IDEA 应用开发【编程实现:WordCount】

​ 从HDFS上读取数据,所以需要将HDFS Client配置文件放入到Maven Module资源目录下,同时设置应用运行时日志信息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-d9teejcw-1618964047933)(/img/image-20210420170446831.png)]

完整代码如下:

package cn.itcast.spark.start import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 使用Spark实现词频统计WordCount程序 */ object SparkWordCount { def main(args: Array[String]): Unit = { // TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息 val sc: SparkContext = { // 其一、构建SparkConf对象,设置应用名称和master val sparkConf: SparkConf = new SparkConf() .setAppName("SparkWordCount") .setMaster("local[2]") // 其二、创建SparkContext实例,传递sparkConf对象 new SparkContext(sparkConf) } // TODO: 第一步、从HDFS读取文件数据,sc.textFile方法,将数据封装到RDD中 val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data") // TODO: 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey /* mapreduce spark spark hive | flatMap() = map + flatten mapreduce spark spark hive |map mapreduce,1 spark,1 spark,1 hive,1 | reduceByKey spark, 2 mapreduce, 1 hive, 1 */ val resultRDD: RDD[(String, Int)] = inputRDD // 按照分隔符分割单词 .flatMap(line => line.split("\\s+")) // 转换单词为二元组,表示每个单词出现一次 .map(word => word -> 1) // 按照单词分组,对组内执进行聚合reduce操作,求和 .reduceByKey((tmp, item) => tmp + item) // TODO: 第三步、将最终处理结果RDD保存到HDFS或打印控制台 resultRDD.saveAsTextFile("/datas/spark-wordcount") resultRDD.foreach(tuple => println(tuple)) // 为了查看应用监控,可以让进程休眠 Thread.sleep(100000) // 应用结束,关闭资源 sc.stop() } }

12-[掌握]-IDEA 应用开发【编程实现:TopKey】

​ 在上述词频统计WordCount代码基础上,对统计出的每个单词的词频Count,按照降序排序,获取词频次数最多Top3单词

数据结构RDD中关于排序函数有如下三个:

  • 1)、sortByKey:针对RDD中数据类型key/value对时,按照Key进行排序
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2PmCdUQ6-1618964047934)(/img/image-20210420172648581.png)]

  • 2)、sortBy:针对RDD中数据指定排序规则
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nWcp7xo4-1618964047934)(/img/image-20210420172724243.png)]

  • 3)、top:按照RDD中数据采用降序方式排序,如果是Key/Value对,按照Key降序排序
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0F0ldmgf-1618964047934)(/img/image-20210420172810137.png)]

具体演示代码如下,建议使用sortByKey函数进行数据排序操作,慎用top函数。

package cn.itcast.spark.top import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 使用Spark实现词频统计WordCount程序,按照词频降序排序 */ object SparkTopKey { def main(args: Array[String]): Unit = { // TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息 val sc: SparkContext = { // 其一、构建SparkConf对象,设置应用名称和master val sparkConf: SparkConf = new SparkConf() .setAppName("SparkWordCount") .setMaster("local[2]") // 其二、创建SparkContext实例,传递sparkConf对象 new SparkContext(sparkConf) } // TODO: 第一步、从HDFS读取文件数据,sc.textFile方法,将数据封装到RDD中 val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data") // TODO: 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey /* mapreduce spark spark hive | flatMap() = map + flatten mapreduce spark spark hive |map mapreduce,1 spark,1 spark,1 hive,1 | reduceByKey spark, 2 mapreduce, 1 hive, 1 */ val resultRDD: RDD[(String, Int)] = inputRDD // 按照分隔符分割单词 .flatMap(line => line.split("\\s+")) // 转换单词为二元组,表示每个单词出现一次 .map(word => word -> 1) // 按照单词分组,对组内执进行聚合reduce操作,求和 .reduceByKey((tmp, item) => tmp + item) // TODO: 第三步、将最终处理结果RDD保存到HDFS或打印控制台 /* (spark,11) (hadoop,3) (hive,6) (hdfs,2) (mapreduce,4) (sql,2) */ resultRDD.foreach(tuple => println(tuple)) println("===========================") // =========================== sortByKey ========================= resultRDD // 将单词和词频互换 .map(tuple => tuple.swap) // (tuple => (tuple._2, tuple._1)) // 调用sortByKey安装,按照Key进行排序,设置降序排序 .sortByKey(ascending = false) // 打印结果 .take(3) .foreach(tuple => println(tuple)) println("===========================") // =========================== sortBy ========================= /* def sortBy[K]( f: (T) => K, // 指定排序规则 ascending: Boolean = true, numPartitions: Int = this.partitions.length ) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] */ resultRDD .sortBy(tuple => tuple._2, ascending = false) // 打印结果 .take(3) .foreach(tuple => println(tuple)) println("===========================") // =========================== top ========================= /* def top(num: Int)(implicit ord: Ordering[T]): Array[T] */ resultRDD .top(3)(Ordering.by(tuple => - tuple._2)) .foreach(tuple => println(tuple)) // 为了查看应用监控,可以让进程休眠 Thread.sleep(100000) // 应用结束,关闭资源 sc.stop() } }

13-[理解]-Spark 应用提交命令【spark-submit】

​ 使用IDEA集成开发工具开发测试Spark Application程序以后,类似MapReduce程序一样,打成jar包,使用命令【spark-submit】提交应用的执行,提交命令帮助文档:

[root@node1 ~]# /export/server/spark/bin/spark-submit --help Usage: spark-submit [options] <app jar | python file | R file> [app arguments] Usage: spark-submit --kill [submission ID] --master [spark://...] Usage: spark-submit --status [submission ID] --master [spark://...] Usage: spark-submit run-example [options] example-class [example args] Options: --master MASTER_URL spark://host:port, mesos://host:port, yarn, k8s://https://host:port, or local (Default: local[*]). --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). --class CLASS_NAME Your application's main class (for Java / Scala apps). --name NAME A name of your application. --jars JARS Comma-separated list of jars to include on the driver and executor classpaths. --packages Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. Will search the local maven repo, then maven central and any additional remote repositories given by --repositories. The format for the coordinates should be groupId:artifactId:version. --exclude-packages Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies provided in --packages to avoid dependency conflicts. --repositories Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. --files FILES Comma-separated list of files to be placed in the working directory of each executor. File paths of these files in executors can be accessed via SparkFiles.get(fileName). --conf PROP=VALUE Arbitrary Spark configuration property. --properties-file FILE Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf. --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M). --driver-java-options Extra Java options to pass to the driver. --driver-library-path Extra library path entries to pass to the driver. --driver-class-path Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). --proxy-user NAME User to impersonate when submitting the application. This argument does not work with --principal / --keytab. --help, -h Show this help message and exit. --verbose, -v Print additional debug output. --version, Print the version of current Spark. Cluster deploy mode only: --driver-cores NUM Number of cores used by the driver, only in cluster mode (Default: 1). Spark standalone or Mesos with cluster deploy mode only: --supervise If given, restarts the driver on failure. --kill SUBMISSION_ID If given, kills the driver specified. --status SUBMISSION_ID If given, requests the status of the driver specified. Spark standalone and Mesos only: --total-executor-cores NUM Total cores for all executors. Spark standalone and YARN only: --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode) YARN-only: --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). --num-executors NUM Number of executors to launch (Default: 2). If dynamic allocation is enabled, the initial number of executors will be at least NUM. --archives ARCHIVES Comma separated list of archives to be extracted into the working directory of each executor. --principal PRINCIPAL Principal to be used to login to KDC, while running on secure HDFS. --keytab KEYTAB The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically.
提交一个应用命令:

Usage: spark-submit [options] <app jar | python file | R file> [app arguments]

  • 第一种:基本参数配置
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9SF8ch3i-1618964047935)(/img/image-20210420175129024.png)]

  • 第二种:Driver Program 参数配置
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-g0Z7eBAV-1618964047935)(/img/image-20210420175259412.png)]

  • 第三种:Executor 参数配置
​ 每个Spark Application运行时,需要启动Executor运行任务Task,需要指定Executor个数及每个Executor资源信息(内存Memory和CPU Core核数)。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sgMSmgG3-1618964047936)(/img/image-20210420175359934.png)]

官方案例,提交Spark应用运行设置

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3Acouk57-1618964047936)(/img/image-20210420175618529.png)]

14-[掌握]-IDEA应用开发【应用打包运行】

​ 将开发测试完成的WordCount程序打成jar保存,使用【spark-submit】分别提交运行在本地模式LocalMode和集群模式Standalone集群。

先修改代码,通过master设置运行模式及传递处理数据路径

package cn.itcast.spark.submit import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 使用Spark实现词频统计WordCount程序 */ object SparkSubmit { def main(args: Array[String]): Unit = { //判断是否传递2个参数,如果不是,直接抛出异常 if(args.length < 2){ println("Usage: SparkSubmit <input> <output> ...................") System.exit(-1) } // TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息 val sc: SparkContext = { // 其一、构建SparkConf对象,设置应用名称和master val sparkConf: SparkConf = new SparkConf() .setAppName("SparkWordCount") //.setMaster("local[2]") // 其二、创建SparkContext实例,传递sparkConf对象 new SparkContext(sparkConf) } // TODO: 第一步、从HDFS读取文件数据,sc.textFile方法,将数据封装到RDD中 val inputRDD: RDD[String] = sc.textFile(args(0)) // TODO: 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey /* mapreduce spark spark hive | flatMap() = map + flatten mapreduce spark spark hive |map mapreduce,1 spark,1 spark,1 hive,1 | reduceByKey spark, 2 mapreduce, 1 hive, 1 */ val resultRDD: RDD[(String, Int)] = inputRDD // 按照分隔符分割单词 .flatMap(line => line.split("\\s+")) // 转换单词为二元组,表示每个单词出现一次 .map(word => word -> 1) // 按照单词分组,对组内执进行聚合reduce操作,求和 .reduceByKey((tmp, item) => tmp + item) // TODO: 第三步、将最终处理结果RDD保存到HDFS或打印控制台 resultRDD.saveAsTextFile(s"${args(1)}-${System.currentTimeMillis()}") // 应用结束,关闭资源 sc.stop() } }
打成jar包,上传至HDFS文件系统:/spark/apps

SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master local[2] \ --class cn.itcast.spark.submit.SparkSubmit \ hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar \ /datas/wordcount.data /datas/swc-output SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 \ --class cn.itcast.spark.submit.SparkSubmit \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ --total-executor-cores 2 \ hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar \ /datas/wordcount.data /datas/swc-output

附录一、创建Maven模块

1)、Maven 工程结构

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qDZki2jH-1618964047936)(img/1595891933073.png)]

​ MAVEN工程GAV三要素:

<parent> <artifactId>bigdata-spark_2.11</artifactId> <groupId>cn.itcast.spark</groupId> <version>1.0.0</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>spark-chapter01_2.11</artifactId>

2)、POM 文件内容

​ Maven 工程POM文件中内容(依赖包):

<repositories> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </repository> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> <repository> <id>jboss</id> <url>http://repository.jboss.com/nexus/content/groups/public</url> </repository> </repositories> <properties> <scala.version>2.11.12</scala.version> <scala.binary.version>2.11</scala.binary.version> <spark.version>2.4.5</spark.version> <hadoop.version>2.6.0-cdh5.16.2</hadoop.version> </properties> <dependencies> <!-- 依赖Scala语言 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- Spark Core 依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- Hadoop Client 依赖 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> <build> <outputDirectory>target/classes</outputDirectory> <testOutputDirectory>target/test-classes</testOutputDirectory> <resources> <resource> <directory>${project.basedir}/src/main/resources</directory> </resource> </resources> <!-- Maven 编译的插件 --> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.0</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
IDEA中配置远程连接服务器

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XLVeLsw0-1618964047937)(/img/1605688796757.png)]

.0

1.8
1.8
UTF-8



net.alchim31.maven
scala-maven-plugin
3.2.0



compile
testCompile





> IDEA中配置远程连接服务器 [外链图片转存中...(img-XLVeLsw0-1618964047937)]

有关Note_Spark_Day02:Spark 基础环境的更多相关文章

  1. ruby-on-rails - 在 Rails 开发环境中为 .ogv 文件设置 Mime 类型 - 2

    我正在玩HTML5视频并且在ERB中有以下片段:mp4视频从在我的开发环境中运行的服务器很好地流式传输到chrome。然而firefox显示带有海报图像的视频播放器,但带有一个大X。问题似乎是mongrel不确定ogv扩展的mime类型,并且只返回text/plain,如curl所示:$curl-Ihttp://0.0.0.0:3000/pr6.ogvHTTP/1.1200OKConnection:closeDate:Mon,19Apr201012:33:50GMTLast-Modified:Sun,18Apr201012:46:07GMTContent-Type:text/plain

  2. Vscode+Cmake配置并运行opencv环境(Windows和Ubuntu大同小异) - 2

    之前在培训新生的时候,windows环境下配置opencv环境一直教的都是网上主流的vsstudio配置属性表,但是这个似乎对新生来说难度略高(虽然个人觉得完全是他们自己的问题),加之暑假之后对cmake实在是爱不释手,且这样配置确实十分简单(其实都不需要配置),故斗胆妄言vscode下配置CV之法。其实极为简单,图比较多所以很长。如果你看此文还配不好,你应该思考一下是不是自己的问题。闲话少说,直接开始。0.CMkae简介有的人到大二了都不知道cmake是什么,我不说是谁。CMake是一个开源免费并且跨平台的构建工具,可以用简单的语句来描述所有平台的编译过程。它能够根据当前所在平台输出对应的m

  3. postman接口测试工具-基础使用教程 - 2

    1.postman介绍Postman一款非常流行的API调试工具。其实,开发人员用的更多。因为测试人员做接口测试会有更多选择,例如Jmeter、soapUI等。不过,对于开发过程中去调试接口,Postman确实足够的简单方便,而且功能强大。2.下载安装官网地址:https://www.postman.com/下载完成后双击安装吧,安装过程极其简单,无需任何操作3.使用教程这里以百度为例,工具使用简单,填写URL地址即可发送请求,在下方查看响应结果和响应状态码常用方法都有支持请求方法:getpostputdeleteGet、Post、Put与Delete的作用get:请求方法一般是用于数据查询,

  4. postman——集合——执行集合——测试脚本——pm对象简单示例02 - 2

    //1.验证返回状态码是否是200pm.test("Statuscodeis200",function(){pm.response.to.have.status(200);});//2.验证返回body内是否含有某个值pm.test("Bodymatchesstring",function(){pm.expect(pm.response.text()).to.include("string_you_want_to_search");});//3.验证某个返回值是否是100pm.test("Yourtestname",function(){varjsonData=pm.response.json

  5. 软件测试基础 - 2

    Ⅰ软件测试基础一、软件测试基础理论1、软件测试的必要性所有的产品或者服务上线都需要测试2、测试的发展过程3、什么是软件测试找bug,发现缺陷4、测试的定义使用人工或自动的手段来运行或者测试某个系统的过程。目的在于检测它是否满足规定的需求。弄清预期结果和实际结果的差别。5、测试的目的以最小的人力、物力和时间找出软件中潜在的错误和缺陷6、测试的原则28原则:20%的主要功能要重点测(eg:支付宝的支付功能,其他功能都是次要的)80%的错误存在于20%的代码中7、测试标准8、测试的基本要求功能测试性能测试安全性测试兼容性测试易用性测试外观界面测试可靠性测试二、质量模型衡量一个优秀软件的维度①功能性功

  6. ES基础入门 - 2

    ES一、简介1、ElasticStackES技术栈:ElasticSearch:存数据+搜索;QL;Kibana:Web可视化平台,分析。LogStash:日志收集,Log4j:产生日志;log.info(xxx)。。。。使用场景:metrics:指标监控…2、基本概念Index(索引)动词:保存(插入)名词:类似MySQL数据库,给数据Type(类型)已废弃,以前类似MySQL的表现在用索引对数据分类Document(文档)真正要保存的一个JSON数据{name:"tcx"}二、入门实战{"name":"DESKTOP-1TSVGKG","cluster_name":"elasticsear

  7. 牛客网专项练习30天Pytnon篇第02天 - 2

    1.在Python3中,下列关于数学运算结果正确的是:(B)a=10b=3print(a//b)print(a%b)print(a/b)A.3,3,3.3333...B.3,1,3.3333...C.3.3333...,3.3333...,3D.3.3333...,1,3.3333...解析:    在Python中,//表示地板除(向下取整),%表示取余,/表示除(Python2向下取整返回3)2.如下程序Python2会打印多少个数:(D)k=1000whilek>1:    print(k)k=k/2A.1000 B.10C.11D.9解析:    按照题意每次循环K/2,直到K值小于等

  8. ruby-on-rails - ruby gem如何在rails环境下工作 - 2

    我试图在rails中了解rubygems是如何变得可以自动使用的,而不是在使用required的文件中gem? 最佳答案 这是通过bundler/setup完成的:http://bundler.io/v1.3/bundler_setup.html.它在您的config/boot.rb文件中是必需的。简而言之,它首先将环境变量设置为指向您的Gemfile:ENV['BUNDLE_GEMFILE']||=File.expand_path('../../Gemfile',__FILE__)然后它通过要求bundler/setup将所有ge

  9. ruby-on-rails - 我需要一个真正的 UNIX RoR 开发环境 - 2

    从一开始,我就是一个Windows高手。我从MS-DOS开始。我安装了Windows2.1以及此后的所有Windows。现在,我家里有10台不同的Windows机器在运行,从Windows7Ultimate到各种版本的WindowsServer。我还没有完成Windows8,也不想去那里。我在服务器和各种软件方面都有UNIX经验,但它并不是我的首选环境。但是,我想我正在转换。我试图假装使用Cygwin和MSYS在Windows下运行UNIX。我的目的是搭建一个开发环境。两者都让我失望了。我花了比开发更多的时间来解决一系列技术问题。这是NotAcceptable。到目前为止,我的Ruby

  10. ruby-on-rails - 如果特定语言环境中缺少翻译,如何配置 i18n 以使用 en 语言环境? - 2

    如果特定语言环境中缺少翻译,如何配置i18n以使用en语言环境翻译?当前已插入翻译缺失消息。我正在使用RoR3.1。 最佳答案 找到相似的question这里是答案:#application.rb#railswillfallbacktoconfig.i18n.default_localetranslationconfig.i18n.fallbacks=true#railswillfallbacktoen,nomatterwhatissetasconfig.i18n.default_localeconfig.i18n.fallback

随机推荐