当前位置 博文首页 > ?:Spark大数据分析与实战:RDD编程初级实践
具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作:
Hadoop的安装:https://blog.csdn.net/weixin_47580081/article/details/108647420
Scala及Spark的安装:https://blog.csdn.net/weixin_47580081/article/details/114250894
提示:如果IDEA未构建Spark项目,可以转接到以下的博客:
IDEA使用Maven构建Spark项目:https://blog.csdn.net/weixin_47580081/article/details/115435536
master
slave1
slave2
请到教程官网的“下载专区”的“数据集”中下载chapter5-data1.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示: Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
……
请根据给定的实验数据,在spark-shell中通过编程来计算以下内容:
如果找不到数据可以从这下载:
数据集链接:https://pan.baidu.com/s/19lYqDZD2u745TbmVdU5jSQ
提取码:z49l
shell命令:
val lines = sc.textFile("file:///opt/software/Data01.txt")
lines.map(row=>row.split(",")(0)).distinct().count
运行截图:
shell命令:
lines.map(row=>row.split(",")(1)).distinct().count
运行截图:
shell命令:
lines.filter(row=>row.split(",")(0)=="Tom").map(row=>(row.split(",")(0),row.split(",")(2).toInt))
.mapValues(x=>(x,1))
.reduceByKey((x,y) => (x._1+y._1,x._2 + y._2))
.mapValues(x => (x._1 / x._2))
.collect()
运行截图:
shell命令:
lines.map(row=>(row.split(",")(0),1))
.reduceByKey((x,y)=>x+y)
.collect
运行截图:
shell命令:
lines.filter(row=>row.split(",")(1)=="DataBase").count
运行截图:
shell命令:
lines.map(row=>(row.split(",")(1),row.split(",")(2).toInt))
.mapValues(x=>(x,1))
.reduceByKey((x,y) => (x._1+y._1,x._2 + y._2))
.mapValues(x => (x._1 / x._2))
.collect()
运行截图:
shell命令:
val accum = sc.longAccumulator("My Accumulator")
lines.filter(row=>row.split(",")(1)=="DataBase")
.map(row=>(row.split(",")(1),1))
.values
.foreach(x => accum.add(x))
accum.value
运行截图:
对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
输入文件B的样例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根据输入的文件A和B合并得到的输出文件C的样例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z
源代码:
package com.John.Sparkstudy.SparkTest.Test03
import java.io.FileWriter
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author John
* @Date 2021/4/1 12:43
*/
object distinctTest {
def main(args: Array[String]): Unit = {
// 配置信息
val conf = new SparkConf().setAppName("distinctTest")
.set("spark.executor.memory", "512m")
.setMaster("local")
val sc = new SparkContext(conf)
// 读取文件A.txt
val A = sc.textFile("D://bigdata//Spark分布式计算框架//data//A.txt")
// 读取文件B.txt"
val B = sc.textFile("D://bigdata//Spark分布式计算框架//data//B.txt")
// 对两个文件进行合并
val all = A++B
// 1 用distinct()去重
// 2 以空格切割 例如:(20170101 x)
// 3 根据key排序
val distinct_lines = all.distinct().map(row=>(row.split(" ")(0),row.split(" ")(1))).sortByKey()
// 将RDD类型的数据转化为数组
val results = distinct_lines.collect()
// 将结果输出到C.txt中
val out = new FileWriter("D://bigdata//Spark分布式计算框架//data//C.txt",true)
for(item<-results){
out.write(item+"\n")
println(item)
}
out.close()
}
}
运行截图:
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)
源代码:
package com.John.Sparkstudy.SparkTest.Test03
import java.io.FileWriter
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author John
* @Date 2021/4/1 12:57
*/
object averageTest {
def main(args: Array[String]) {
// 配置信息
val conf = new SparkConf().setAppName("averageTest")
.set("spark.executor.memory", "512m")
.setMaster("local")
val sc = new SparkContext(conf)
// 读取文件Algorithm.txt
val Algorithm = sc.textFile("D://bigdata//Spark分布式计算框架//data//Algorithm.txt")
// 读取文件Database.txt"
val Database = sc.textFile("D://bigdata//Spark分布式计算框架//data//Database.txt")
// 读取文件Python.txt
val Python = sc.textFile("D://bigdata//Spark分布式计算框架//data//Python.txt")
// 对三个文件进行整合
val all = Algorithm ++ Database ++ Python
// 以空格切割将每个名字作为键,值为一个键值对,该键值对的键为成绩,值为1(用于后面计算平均值计数用)
// 例如:(小明, (90,1))
val student_grade = all.map(row=>(row.split(" ")(0),(row.split(" ")(1).toInt,1)))
// 对上述RDD做聚合,值的聚合返回一个二元组,第一个元素是该学生所有课的成绩求和,第二个元素是该学生选修课的数目,然后再做一个映射
// 将人名作为第一个元素,所有课的总成绩除以选修课程的数目得到该学生的平均成绩作为第二个元素
val student_ave = student_grade.reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).map(x=>(x._1,x._2._1/x._2._2))
// 将RDD类型的数据转化为数组
val results = student_ave.collect()
// 将结果输出到output.txt中
val out = new FileWriter("D://bigdata//Spark分布式计算框架//data//output.txt",true)
for(item<-results){
out.write(item+"\n")
println(item)
}
out.close()
}