本文共 5968 字,大约阅读时间需要 19 分钟。
DataSet 是一个强类型,并且类型安全的数据容器,并且提供了结构化查询API和类似RDD一样的命令式API
{ // 创建SparkSessionval spark = new sql.SparkSession.Builder() .appName("hello") .master("local[6]") .getOrCreate()// 导入隐式转换,这里的spark是上面创建的对象import spark.implicits._// 演示创建DataSetval sourceRDD = spark.sparkContext.parallelize(Seq(person("a", 18, 98.0), person("b", 20, 97.0), person("c", 18, 100.0)))val dataset = sourceRDD.toDS()}// 创建样例类case class person(name:String,age:Int,score:Double)
// 1.dataset支持RDD似的强类型API // 每一个item就是一个person类型的对象,直接通过对象获取 dataset.filter(item => item.age < 20 ).show()
// 2.dataset支持弱类型的API // 直接通过结构中的字段名取获取 dataset.filter('age < 20 ).show() dataset.filter($"age" < 20 ).show()
// 3.dataset 支持直接通过SQL表达式查询 dataset.filter("age < 20").show()
dataset.explain(true)
无论是否执行sql,dataset都会被优化器优化
![]()
DataSet最底层处理的是对象的序列化形式。通过查看DataSet生成的物理执行计划,也就是最终处理的RDD,就可以判定DataSet底层处理的是什么形式的数据。
val sourceRDD = spark.sparkContext.parallelize(Seq(person("a", 18, 98.0), person("b", 20, 97.0), person("c", 18, 100.0)))val dataset = sourceRDD.toDS()val execution = dataset.queryExecution.toRdd
dataset.queryExecution.toRdd
这个API可以看到DataSet底层执行的RDD,这个RDD中的泛型是InternalRow// 演示创建DataSet//val sourceRDD = spark.sparkContext.parallelize(Seq(person("a", 18, 98.0), person("b", 20, 97.0), person("c", 18, 100.0))) //val dataset: Dataset[person] = sourceRDD.toDS() val dataset: Dataset[person] = spark.createDataset(Seq(person("a", 18, 98.0), person("b", 20, 97.0), person("c", 18, 100.0)))
// 直接获取到已经分析和解析过的 DataSet 的执行计划,从中拿到 RDD val execution: RDD[InternalRow] = dataset.queryExecution.toRdd
// 将 DataSet 底层的 rdd[InternalRow] 通过 Decoder 转成了和 DataSet 一样类型的 RDD 。 val typeRDD: RDD[person] = dataset.rdd
// toDebugString 查看执行过程 println(execution.toDebugString) println("*****************************") println(typeRDD.toDebugString) dataset.show() typeRDD.collect().foreach(println(_))
DataFrame是SparkSQL中一个表示关系型数据库中 表 的函数式抽象,其作用是让Spark处理大规模结构化数据的时候更加容易:
- 一般DataFrame可以处理结构化数据,或者是半结构化数据,因为这两类数据中都可以获取到Schema信息,也就是说DataFrame中有Schema信息
- 也可以像操作表一样操作DataFrame
- 一个是row的集合,每个row对象表示一个行
- 二是描述DataFrame结构的Schema
// 演示创建DataFrame val personList: Seq[person] = Seq(person("a", 18, 98.0), person("b", 20, 97.0), person("c", 18, 100.0))
// 导入隐式转换,这里的spark是上面创建的对象 import spark.implicits._ // 1.toDF() val dataFrame = personList.toDF() val dataFrame1 = spark.sparkContext.parallelize(personList).toDF()
import spark.implicits._
导入隐式转换的作用:
- 通过隐式转换的内置函数将对应的RDD数据类型,调用toDF()后转为DataFrame类型。
// 2.createDataFrame() val dataFrame2 = spark.createDataFrame(personList)
createDataFrame()方法支持多种类型的数据转换:
// 3.DataFrameReader val dataFrame3 = spark.read.csv("dataset/BeijingPM20100101_20151231.csv")
案例:查看北京PM值每月的统计数量
// 配置环境 val spark = SparkSession.builder() .master("local[6]") .appName("test") .getOrCreate() // 导入隐式转换、sql函数 import spark.implicits._ import org.apache.spark.sql.functions._ // 读取数据 val df = spark.read .option("header",value = true) .csv("dataset/BeijingPM20100101_20151231.csv") df.show() // 提取数据 // 统计每年每月聚合 // 方式一 --- 命令式API df.select('year,'month,'PM_Dongsi) // 查询字段 .where('PM_Dongsi =!= "NA") // 空值处理 .groupBy('year,'month) // 分组 .agg(count("month") as "count") // 聚合 .sort('count desc) // 排序 .show() // 方式二 --- sql语句 df.createOrReplaceTempView("pm") // 创建临时表 spark.sql("select year,month,count(PM_Dongsi) as count from pm where PM_Dongsi!='NA' group by year,month order by count desc") .show() spark.close()
总结:
1.DataFrame表达的含义是一个支持函数式操作的表,而DataSet表达的是一个类似RDD的东西,DataSet可以处理任何对象
2.DataFrame中存放的是Row对象,而DataSet中可以存放任何类型的对象
type DataFrame = Dataset[Row]
// 演示创建DataFrame val personList = Seq(person("a", 18, 98.0), person("b", 20, 97.0), person("c", 18, 100.0)) // 1.DataFrame是弱类型的,DataSet是强类型的 // type DataFrame = Dataset[Row]val DataFrame: DataFrame = personList.toDF()val DataSet: Dataset[person] = personList.toDS()
3.DataFrame的操作方式和DataSet是一样的,但是对于强类型操作而言,他们处理的类型不一样
总结:
Row 对象表示的是一个 行
Row 的操作类似于 Scala 中的 Map 数据类型 Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息都用Row来表示。// 样例类case class People(name:String,age:Integer)// Row对象必须配合Schema对象才会有列名val p: People = People("Jack",20)val row: Row = Row("Tom",18)
// 类似于数组val name: String = row.getString(0)val age: Int = row.getInt(1)println(name)println(age)val name1: String = p.nameval age1: Int = p.ageprintln(name1)println(age1)
row match { case Row(name,age) => println(name,age)}
case class Person(name:String,age:Int)val spark: SparkSession = new sql.SparkSession.Builder() .appName("hello") .master("local[6]") .getOrCreate()import spark.implicits._val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()val ds_fdf: Dataset[People] = df.as[People]val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()val df_fds: DataFrame = ds.toDF()
转载地址:http://lweq.baihongyu.com/