当前位置 博文首页 > ?:Spark大数据分析与实战:RDD编程初级实践

    ?:Spark大数据分析与实战:RDD编程初级实践

    作者:[db:作者] 时间:2021-07-17 19:05

    Spark大数据分析与实战:RDD编程初级实践

    一、安装Hadoop和Spark

    具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作:

    Hadoop的安装:https://blog.csdn.net/weixin_47580081/article/details/108647420
    Scala及Spark的安装:https://blog.csdn.net/weixin_47580081/article/details/114250894

    提示:如果IDEA未构建Spark项目,可以转接到以下的博客:

    IDEA使用Maven构建Spark项目:https://blog.csdn.net/weixin_47580081/article/details/115435536

    二、启动Hadoop与Spark

    查看3个节点的进程

    master在这里插入图片描述
    slave1
    在这里插入图片描述
    slave2
    在这里插入图片描述

    Spark shell命令界面与端口页面

    在这里插入图片描述
    在这里插入图片描述

    三、spark-shell交互式编程

    请到教程官网的“下载专区”的“数据集”中下载chapter5-data1.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示: Tom,DataBase,80
    Tom,Algorithm,50
    Tom,DataStructure,60
    Jim,DataBase,90
    Jim,Algorithm,60
    Jim,DataStructure,80
    ……
    请根据给定的实验数据,在spark-shell中通过编程来计算以下内容:

    如果找不到数据可以从这下载:
    数据集链接:https://pan.baidu.com/s/19lYqDZD2u745TbmVdU5jSQ
    提取码:z49l

    (1)该系总共有多少学生;

    shell命令:

    val lines = sc.textFile("file:///opt/software/Data01.txt")
    lines.map(row=>row.split(",")(0)).distinct().count
    

    运行截图:
    在这里插入图片描述

    (2)该系共开设来多少门课程;

    shell命令:

    lines.map(row=>row.split(",")(1)).distinct().count
    

    运行截图:
    在这里插入图片描述

    (3)Tom同学的总成绩平均分是多少;

    shell命令:

    lines.filter(row=>row.split(",")(0)=="Tom").map(row=>(row.split(",")(0),row.split(",")(2).toInt))
    .mapValues(x=>(x,1))
    .reduceByKey((x,y) => (x._1+y._1,x._2 + y._2))
    .mapValues(x => (x._1 / x._2))
    .collect()
    

    运行截图:
    在这里插入图片描述

    (4)求每名同学的选修的课程门数;

    shell命令:

    lines.map(row=>(row.split(",")(0),1))
    .reduceByKey((x,y)=>x+y)
    .collect
    

    运行截图:
    在这里插入图片描述

    (5)该系DataBase课程共有多少人选修;

    shell命令:

    lines.filter(row=>row.split(",")(1)=="DataBase").count
    

    运行截图:
    在这里插入图片描述

    (6)各门课程的平均分是多少;

    shell命令:

    lines.map(row=>(row.split(",")(1),row.split(",")(2).toInt))
    .mapValues(x=>(x,1))
    .reduceByKey((x,y) => (x._1+y._1,x._2 + y._2))
    .mapValues(x => (x._1 / x._2))
    .collect()
    

    运行截图:
    在这里插入图片描述

    (7)使用累加器计算共有多少人选了DataBase这门课。

    shell命令:

    val accum = sc.longAccumulator("My Accumulator")
    lines.filter(row=>row.split(",")(1)=="DataBase")
    .map(row=>(row.split(",")(1),1))
    .values
    .foreach(x => accum.add(x))
    accum.value
    

    运行截图:
    在这里插入图片描述

    四、编写独立应用程序实现数据去重

    对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。
    输入文件A的样例如下:
    20170101 x
    20170102 y
    20170103 x
    20170104 y
    20170105 z
    20170106 z
    输入文件B的样例如下:
    20170101 y
    20170102 y
    20170103 x
    20170104 z
    20170105 y
    根据输入的文件A和B合并得到的输出文件C的样例如下:
    20170101 x
    20170101 y
    20170102 y
    20170103 x
    20170104 y
    20170104 z
    20170105 y
    20170105 z
    20170106 z
    源代码:

    package com.John.Sparkstudy.SparkTest.Test03
    
    import java.io.FileWriter
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @author John
     * @Date 2021/4/1 12:43
     */
    object distinctTest {
      def main(args: Array[String]): Unit = {
    
        // 配置信息
        val conf = new SparkConf().setAppName("distinctTest")
          .set("spark.executor.memory", "512m")
          .setMaster("local")
        val sc = new SparkContext(conf)
    
        // 读取文件A.txt
        val A = sc.textFile("D://bigdata//Spark分布式计算框架//data//A.txt")
        // 读取文件B.txt"
        val B = sc.textFile("D://bigdata//Spark分布式计算框架//data//B.txt")
        // 对两个文件进行合并
        val all = A++B
        // 1 用distinct()去重
        // 2 以空格切割 例如:(20170101 x)
        // 3 根据key排序
        val distinct_lines = all.distinct().map(row=>(row.split(" ")(0),row.split(" ")(1))).sortByKey()
        // 将RDD类型的数据转化为数组
        val results = distinct_lines.collect()
        // 将结果输出到C.txt中
        val out = new FileWriter("D://bigdata//Spark分布式计算框架//data//C.txt",true)
        for(item<-results){
          out.write(item+"\n")
          println(item)
        }
        out.close()
      }
    }
    

    运行截图:
    在这里插入图片描述

    在这里插入图片描述

    五、编写独立应用程序实现求平均值问题

    每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。
    Algorithm成绩:
    小明 92
    小红 87
    小新 82
    小丽 90
    Database成绩:
    小明 95
    小红 81
    小新 89
    小丽 85
    Python成绩:
    小明 82
    小红 83
    小新 94
    小丽 91
    平均成绩如下:
    (小红,83.67)
    (小新,88.33)
    (小明,89.67)
    (小丽,88.67)
    源代码:

    package com.John.Sparkstudy.SparkTest.Test03
    
    import java.io.FileWriter
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @author John
     * @Date 2021/4/1 12:57
     */
    object averageTest {
      def main(args: Array[String]) {
    
        // 配置信息
        val conf = new SparkConf().setAppName("averageTest")
          .set("spark.executor.memory", "512m")
          .setMaster("local")
        val sc = new SparkContext(conf)
    
    
        // 读取文件Algorithm.txt
        val Algorithm = sc.textFile("D://bigdata//Spark分布式计算框架//data//Algorithm.txt")
        // 读取文件Database.txt"
        val Database = sc.textFile("D://bigdata//Spark分布式计算框架//data//Database.txt")
        // 读取文件Python.txt
        val Python = sc.textFile("D://bigdata//Spark分布式计算框架//data//Python.txt")
        // 对三个文件进行整合
        val all = Algorithm ++ Database ++ Python
        // 以空格切割将每个名字作为键,值为一个键值对,该键值对的键为成绩,值为1(用于后面计算平均值计数用)
        // 例如:(小明, (90,1))
        val student_grade = all.map(row=>(row.split(" ")(0),(row.split(" ")(1).toInt,1)))
        // 对上述RDD做聚合,值的聚合返回一个二元组,第一个元素是该学生所有课的成绩求和,第二个元素是该学生选修课的数目,然后再做一个映射
        // 将人名作为第一个元素,所有课的总成绩除以选修课程的数目得到该学生的平均成绩作为第二个元素
        val student_ave = student_grade.reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).map(x=>(x._1,x._2._1/x._2._2))
        // 将RDD类型的数据转化为数组
        val results = student_ave.collect()
        // 将结果输出到output.txt中
        val out = new FileWriter("D://bigdata//Spark分布式计算框架//data//output.txt",true)
        for(item<-results){
          out.write(item+"\n")
          println(item)
        }
        out.close()
      }