如何创建网站的八个步骤?

摘要:建网站的八个步骤,游戏网页链接,漂亮网站,网站页面设计报价模板spark笔记 1. 概述 Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎;Spark提供内存计算,
建网站的八个步骤,游戏网页链接,漂亮网站,网站页面设计报价模板spark笔记 1. 概述 Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎#xff1b;Spark提供内存计算#xff0c;将计算结果直接放在内存中#xff0c;减少了迭代计算的IO开销#xff0c;有更高效的运算效率。 1.1 Spark核心模块 Spark Core#xff1a;提供S…spark笔记 1. 概述 Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎Spark提供内存计算将计算结果直接放在内存中减少了迭代计算的IO开销有更高效的运算效率。 1.1 Spark核心模块 Spark Core提供Spark最基础与最核心的功能Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL用户可以使用SQL或者Apache Hive 版本的SQL 方言HQL来查询数据Spark Streaming是Spark平台上针对实时数据进行流式计算的组件提供了丰富的处理数据流的API 1.2 基本概念 RDD弹性分布式数据集的简称分布式内存的一个抽象概念提供了一种高度受限的共享内存模 (可以看作一个不可变的分布式对象集合) DAG有向无环图的简称 反映RDD之间的依赖关系 Executor是运行在工作节点WorkerNode的一个进程负责运行Task 应用Application用户编写的Spark应用程序 任务 Task 运行在Executor上的工作单元 作业 Job 一个作业包含多个RDD及作用于相应RDD上的各种操作 阶段 Stage 是作业的基本调度单位一个作业会分为多组任务每组任务被称为阶段或者也被称为任务集合代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集 2. spark工作架构 Cluster Manager集群资源管理器Worker Node运行作业任务的工作节点运行在集群中的一台服务器上Cluster Manager每个应用的任务控制节点Driver每个应用的任务控制节点Executor每个工作节点上负责具体任务的执行进程   一个应用由一个Driver和若干个作业构成一个作业由多个阶段构成一个阶段由多个没有Shuffle关系的任务组成   当执行一个应用时Driver会向集群管理器申请资源(即由Driver创建一个SparkContext进行资源的申请、任务的分配和监控) 启动Executor并向Executor发送应用程序代码和文件然后在Executor上执行任务运行结束后执行结果会返回给Driver或者写到HDFS或者其他数据库中。 3. RDD 弹性分布式数据集spark最基本的数据处理模型代码中是一个抽象类它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 3.1 特性 弹性 内存与磁盘的自动切换、数据丢失可以自动恢复、计算出错重试机制、可根据需要重新分片 分布式 数据存储在大数据集群不同节点上 数据集 RDD封装计算逻辑不保存数据 不可变 RDD封装计算逻辑是不可以改变的想要改变只能产生新的RDD在新的RDD里面封装计算逻辑 可分区 并行计算 3.2 RDD 创建 RDD的创建可以从从文件系统中加载数据创建得到或者通过并行集合数组创建RDD。 从文件系统中加载数据 val lines sc.textFile(C:\Users\26414\Downloads\word.txt) 从HDFS中加载数据 val lines sc.textFile(hdfs://localhost:9000/user/hadoop/word.txt) 从已经存在的集合数组上创建 val list List(1,2,3,4,5)val rdd sc.parallelize(list) 3.3 RDD 操作 3.3.1 转换算子(产生新的RDD算子不触发计算返回新的RDD)   单值类型 方法描述map()一对一的转换操作对每一条数据计算默认分区数不变mapPartitions()以分区为单位进行计算一次性获取分区的所有数据参数为迭代器返回也是迭代器mapPartitionsWithIndex()参数为分区号和迭代器返回值是迭代器flatMap()将map结果扁平化glom()将分区内数据转为数组ArraygroupBy(函数)用于执行分组操作返回值为元组第一个元素为分组的key第二个元素为相同key的可迭代集合将数据进行分组操作但是分区是不会改变的也可以传入参数改变分区数shufflefilter(函数)过滤参数为返回值类型为Boolean的函数将数据根据指定的规则进行筛选sample()抽取样本也可以给定种子distinct()去重也可以传入参数num改变分区数shufflecoalesce(num)缩减分区默认shufflerepartition(num)扩大分区底层是coalesce(num, true)sortBy()排序之前可以使用函数进行处理默认升序可以传入false降序 map和mapPartitions区别 map每次处理一条数据mapPartitions每次将一个分区当成一个整体进行处理如果一个分区没有处理完那么所有的数据都不会释放容易出现内存溢出 双值类型 方法描述union(rdd)并集分区合并intersection(rdd)交集保留最大分区shufflesubstring(rdd)差集分区数为前面的shufflekey-value类型 方法描述groupByKey()根据key进行分组将value合并为一个迭代器(列表)reduceByKey()根据key进行分组将value合并为一个迭代器列表然后根据传入的自定义函数对每组中value数据进行聚合处理。sortByKey()根据key进行排序操作。默认升序若想倒序排列设置参数ascending False可直接写False省略 ascending countByValue()对value进行count的数量统计combineByKey()对 key-value 型 rdd 进行聚集操作的聚集函数aggregation function。类似于aggregate()combineByKey()允许用户返回值的类型与输入不一致 reduceByKey和 groupByKey的区别   从 shuffle 的角度: reduceByKey 和 groupByKey 都存在 shuffle 的操作但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合combine功能这样会减少落盘的数据量而 groupByKey 只是进行分组不存在数据量减少的问题reduceByKey 性能比较高。   从功能的角度 reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组不能聚合所以在分组聚合的场合下推荐使用 reduceByKey如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey 3.3.2 行动算子(不产生新的RDD算子触发计算返回具体值) 方法描述reduce()聚集 RDD 中的所有元素先聚合分区内数据再聚合分区间数据collect()以数组 Array 的形式返回数据集的所有元素count()返回 RDD 中元素的个数take()返回一个由 RDD 的前 n 个元素组成的数组aggregate()分区的数据通过初始值和分区内的数据进行聚合然后再和初始值进行分区间的数据聚合fold()折叠操作aggregate 的简化版操作countByKey()统计每种 key 的个数save相关算子saveAsTextFile()、saveAsObjectFile()、saveAsSequenceFile()foreach()分布式遍历 RDD 中的每一个元素调用指定函数 3.4 Shuffle与依赖 Shuffle数据混洗 Spark中的两种依赖 宽依赖一个父RDD的一个分区对应一个子RDD的多个分区会引起Shuffle窄依赖一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区。   宽窄依赖通俗理解参考https://www.likecs.com/show-203912211.html 阶段的划分作业分成几段落盘完才能进行下一阶段取决于Shuffle的个数阶段的个数 Shuffle的个数 1Shuffle将阶段一分为二 3.5 RDD 持久化 RDD是不保存数据的所以如果多个RDD需要共享其中一个RDD的数据必须重头执行效率非常低所以需要将一些重复性比较高比较耗时的操作的结果缓存起来提高效率。 cache()缓存RDD数据在执行行动算子之后会在血缘关系中添加缓存相关的依赖一旦发生错误可以重新执行persist()可以设置不同的级别存储在磁盘内存cache默认只在内存中缓存checkpoint()执行前需要设置检查点目录为了保证数据的准确性执行时会启动新的job所以一般与cache结合使用这样checkpoint的job就可以从cache缓存中读数据checkpoint()会切断血缘关系因为它将数据保存在分布式文件系统中当成了一个数据源数据相对安全。   血缘关系 A的操作行为依赖于BB的操作行为依赖于C然而A的操作行为间接依赖于C推导于相邻的两个RDD的关系称之为依赖关系,新的RDD依赖于旧的RDD,多个连续的RDD的依赖关系称之为血缘关系。每个RDD会保存血缘关系,但每个RDD不会保存数据如果在reduceByKey过程中出现错误时由于RDD不会保存数据但可以根据血缘关系将数据源重新读取进行计算。 参考https://www.likecs.com/show-203912211.html 4. Spark 共享变量 4.1 累加器 使用场景 主要用于多个节点对一个变量进行共享性的操作在分布式运行时每个task运行的只是原始变量的一个副本并不能改变原始变量的值但是当这个变量被声明为累加器后该变量就会有分布式计数的功能。 object Accumulator_scala {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(Accumulator_scala).setMaster(local)val sc new SparkContext(conf)val sum sc.accumulator(0);val numberArray Array(1,2,3,4,5)val numbers sc.parallelize(numberArray)numbers.foreach( num sum num)println(sum)} }注 累加器在Driver端定义赋初始值累加器只能在Driver端读取最后的值在Excutor端更新。 4.2 广播变量 使用场景 假如在spark程序里需要用到大对象比如字典黑白名单等这个都会由Driver端进行分发一般来讲如果这个变量不是广播变量那么每个task就会分发一份这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈而且会大量消耗Executor服务器上的资源如果将这个变量声明为广播变量那么只是每个Executor拥有一份这个Executor启动的task会共享这个变量节省了通信的成本和服务器的资源。 object BroadcastVariable_scala {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(BroadcastVariable_scala).setMaster(local)val sc new SparkContext(conf)val factor 3val factorBroadcast sc.broadcast(factor)val numberArray Array(1,2,3,4,5)val numbers sc.parallelize(numberArray,1)val multipleNumber numbers.map{num num * factorBroadcast.value}multipleNumber.foreach(num println(num))} }注 因为RDD是不存储数据的。可以将RDD的结果广播出去。广播变量只能在Driver端定义不能在Executor端定义。在Driver端可以修改广播变量的值在Executor端无法修改广播变量的值。 - 如果executor端用到了Driver的变量如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。如果Executor端用到了Driver的变量如果使用广播变量在每个Executor中只有一份Driver端的变量副本 参考https://blog.csdn.net/chanyue123/article/details/123175776(累加器介绍)            https://zhuanlan.zhihu.com/p/158894710(广播变量原理) 5. Spark SQL Spark SQL 是Spark 用于结构化数据(structured data)处理的Spark 模块兼容Hive数据既可以来自RDD也可以是Hive、HDFS、Cassandra等外部数据源还可以是JSON格式的数据提高开发效率提供两个编程抽象DataFrameDataSet。 RDD是数据DF是结构DS是类型RDD只包含数据没有结构DataFrame添加了结构DataSet添加了类型DataFrame就是在RDD基础上加入了列处理数据就像处理二维表一样             5.1 DataFrame 5.1.1 DataFrame的组成 在结构层面: StructType对象描述整个DataFrame的表结构 StructField对象描述一个列的信息 在数据层面 Row对象记录一行数据 Column对象记录一列数据并包含列的信息 5.1.2 DataFrame之DSL agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合alias: 它是Column对象的API, 可以针对一个列 进行改名withColumnRenamed: 它是DataFrame的API, 可以对DF中的列进行改名, 一次改一个列, 改多个列 可以链式调用orderBy: DataFrame的API, 进行排序, 参数1是被排序的列, 参数2是 升序(True) 或 降序 Falsefirst: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象.(Row对象 就是一个数组, 你可以通过row[‘列名’] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等))       常用方法 show方法 功能:展示DataFrame中的数据, 默认展示20条printSchema方法 功能:打印输出df的schema信息        DSL语法 df.select(username).show()查看某一列数据df.select($username,$age 1).show列数据进行运算df.select(username, age 1).show()df.select(username, age 1 as newage).show()起别名df.filter($age30).show()过滤数据df.groupBy(age).count.show()分组5.1.2 DataFrame之SQL        SQL语法 df.createOrReplaceTempView(people)创建一个临时表val sqlDF spark.sql(SELECT * FROM people)查询sqlDF.show()显示结果df.createGlobalTempView(people)创建一个全局表spark.sql(SELECT * FROM global_temp.people).show()查询全局表必须加global_tempspark.newSession().sql(SELECT * FROM global_temp.people).show()在新的Session里可以查询全局表5.1.3 DataFrame转换 RDD转为DF RDD没有结构指定结构idRDD.toDF(“id”).show使用toDF(“列名”) 通过样例类将数据转换成样例类的对象使用idRDD.toDF使用样例类中的列名也可以重命名.toDF(“id”,“name”) DF转为RDD df.rdd返回的是Row对象 注DataFrame创建参考https://blog.csdn.net/feizuiku0116/article/details/121523791 5.2 DataSet 是具有强类型的数据集合需要提供对应的类型信息查询之后使用方便可以看做DataFrame的拓展 DataFrame 一行记录中没有指定特定的数据类型Dataset 一行记录中每个对象有明确类型 6. Spark Streaming SparkStreaming是一套框架。 SparkStreaming是Spark核心API的一个扩展可以实现高吞吐量的具备容错机制的实时流数据处理。 实时 数据处理的时延在毫秒内响应 离线 数据处理的时延在小时、天 数据处理的方式 批处理多条处理流式一条一条处理 微批次准实时的数据处理引擎 支持多种数据源获取数据 Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据进行处理后处理结构保存在HDFS、DataBase等各种地方。 *使用的最多的是kafkaSpark Streaming 内部工作原理 Spark Streaming从实时数据流接入数据再将其划分为一个个小批量供后续Spark engine处理所以实际上Spark Streaming是按一个个小批量来处理数据流的。 Spark Streaming为这种持续的数据流提供了的一个高级抽象即discretized[dɪˈskriːtaizd] stream离散数据流或者叫DStream。DStream既可以从输入数据源创建得来如Kafka、Flume或者Kinesis也可以从其他DStream经一些算子操作得到。其实在内部一个DStream就是包含了一系列RDDs。 6.1 StreamingContext StreamingContext定义之后做的操作如下 通过创建输入DStream来创建输入数据源。通过对DStream定义transformation和output算子操作来定义实时计算逻辑。调用StreamingContext的start()方法来开始实时处理数据。调用StreamingContext的awaitTermination()方法来等待应用程序的终止。可以使 用CTRLC手动停止或者就是让它持续不断的运行进行计算。也可以通过调用StreamingContext的stop()方法来停止应用程序。   StreamingContext创建方式 SparkConf创建 val conf new SparkConf().setAppName(appName).setMaster(master); val ssc new StreamingContext(conf, Seconds(1));appName是用来在Spark UI上显示的应用名称。master是Spark、Mesos或Yarn集群的URL或者是local[*]。batch interval可以根据应用程序的延迟要求以及可用的集群资源情况来设置。SparkContext创建 val sc new SparkContext(conf) val ssc new StreamingContext(sc, Seconds(1))6.2 DStream 简介 Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream 假如外部数据不断涌入按照一分钟切片每个一分钟内部的数据是连续的连续数据流而一分钟与一分钟的切片却是相互独立的离散流。 DStream是Spark Streaming特有的数据类型。 Dstream可以看做一组RDDs即RDD的一个序列 Spark的RDD可以理解为空间维度Dstream的RDD理解为在空间维度上又加了个时间维度。 将连续的数据持久化离散化然后进行批量处理。 持久化接收到的数据暂存(持久化原因做容错的当数据流出错了因为没有得到计算需要把数据从源头进行回溯暂存的数据可以进行恢复。) 离散化按时间分片形成处理单元。 分片处理分批处理。 6.3 DStream 转换操作 方法描述Scala示例map()对DStream中的每个元素应用指定函数并返回各元素输出元素组成的DStreamds.map(xx1)flatMap()对DStream中的每个元素应用指定函数并返回各元素输出迭代器组成的DStreamds.flatMap(xx.split( ))filter()筛选过滤ds.filter(xx ! 1)repartition()改变DStream分区数ds.repartition(10)reduceByKey()将每个批次中键相同的记录归约ds.reduceByKey((x,y)xy)groupByKey()将每个批次中的记录按键分组ds.groupByKey() 6.4 DStream 输出操作 方法描述print()在Driver中打印前10个元素saveAsTextFiles(prefix, [suffix])以文本形式保存为文件。其中每次批处理产生文件以prefix-TIME_IN_MS[.suffix]命名saveAsObjectFiles(prefix, [suffix])将DStream中内容按对象序列化并以SequenceFile格式保存。其中每次批处理产生文件以prefix-TIME_IN_MS[.suffix]命名saveAsHadoopFiles(prefix, [suffix])以文本形式保存为hadoop文件。其中每次批处理产生文件以prefix-TIME_IN_MS[.suffix]命名foreachRDD(func)最基本输出操作将func函数应用于DStream中的RDD上这个操作会输出数据到外部系统 6.5 Spark Streaming 窗口操作 Spark提供了一组窗口操作通过滑动窗口技术对大规模数据的增量更新进行统计分析 Window Operation定时进行一定时间段内的数据处理 任何基于窗口操作需要指定两个参数 窗口总长度你想计算多长时间的数据滑动时间间隔你每多长时间去更新一次     参考文档 https://www.cnblogs.com/fishperson/p/10447033.html https://blog.csdn.net/u012369535/article/details/93042905 7. Spark 调优 7.1 常规性能调优 常规性能调优 增加executor个数 增加每个executor的cpu个数 增加每个executor的内存1缓存更多的数据减少磁盘IO2为shuffle提供更多内存减少写入磁盘数据3内存较小会频繁GC增加内存可以避免频繁GC提升整体性能RDD优化 避免在相同的算子和计算逻辑下对RDD进行重复的计算 对多次使用的RDD进行持久化可以序列化的对于可靠性要求高的数据可以使用副本 RDD尽可能早的过滤操作减少内存占用并行度调节 在资源允许的情况下应尽可能让并行度与资源匹配官方推荐task数量应该是总cpu核心数的2-3倍因为有的任务执行的快执行完毕后可以立马执行下一个task广播大的变量 如果task算子中使用了外部的变量每个task会有一个变量的副本这就造成了内存的极大消耗如果使用广播变量的话每个executor保存一个副本可以很大程度上减少内存的使用 序列化使用Kryo序列化java的序列化太重慢字节多效率不高 调整本地化等待时长 7.2 算子调优 使用mapPartitionsmap对分区中每一个元素操作mapPartitions对真个分区操作如果使用jdbc写入数据那么每条数据都要建立一个连接资源消耗大如果使用mapPartitions只需要建立一个连接 缺点如果数据量很大的话使用mapPartitions容易OOM如果资源允许可以考虑使用mapPartitions代替map使用foreachPartition优化数据库操作与mapPartitions类似foreachPartition将RDD的每个分区作为遍历对象filter与coalesce的配合使用过滤后数据量变小再用原来的去执行浪费资源有可能导致数据倾斜可以进行合并分区或者扩大分区宽依赖需要设置为true即开启shuffle否则不起作用使用repartition提高SparkSQL的并行度默认是hive表的文件的切片个数spark SQL的并行度没法手动更改可以读取到数据后立马进行扩大分区数提高并行度使用reduceByKey预聚合groupByKey 不会进行map 端的聚合减少磁盘IO减少对磁盘空间的占用reduce端的数据也变少 7.3 Shuffle 调优 增加map端缓冲区的大小避免频繁溢写到磁盘增加reduce端拉取数据缓冲区大小减少拉取次数减少网络传输增加reduce端拉取数据的重试次数避免因为JVM的full GC或者网络原因导致数据拉取失败进而导致作业执行失败增加reduce端拉取数据等待间隔拉取失败会等一段时间后重试以增加shuffle - 操作的稳定性增加SortShuffle 排序操作阈值 7.4 JVM调优 降低cache 操作的内存占比1静态内存管理机制需要调节2统一内存管理机制堆内存被划分为了两块Storage 和Execution增大Executor 堆外内存有时去拉取Executor数据时Executor可能已经由于内存溢出挂掉了增大连接等待时长Executor优先从本地关联的BlockManager获取数据获取不到会去其他节点上获取数据反复拉取不到会导致DAGScheduler反复提交几次stageTaskScheduler反复提交几次Task大大延长作业运行时间 参考文档 https://zhuanlan.zhihu.com/p/463537198 https://www.bbsmax.com/A/q4zVE2aWdK/ https://www.cnblogs.com/kdy11/p/10943547.html 《spark官方文档》 《Spark快速大数据分析》 《Apache Spark 2.0.2 中文文档》