Spark Learning Notes: Basic RDD Operators

2019/07/27

Tags: Spark

Abstract: Notes taken while learning Spark, covering the basic concepts of RDDs and the basic RDD operators.


Spark transformation operations

Source code: https://github.com/YaoQi17/sparkLearning/tree/master/sparkRDD


Summary

An RDD (Resilient Distributed Dataset) is a distributed collection of elements that can be partitioned across the cluster and processed in parallel. RDDs also let users explicitly cache a working set in memory across multiple queries, for example with persist().
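
A minimal sketch of caching a working set (assuming a SparkSession named sparkSession is already in scope):

import org.apache.spark.storage.StorageLevel

val words = sparkSession.sparkContext
  .textFile("sparkRDD/src/main/resources/data.txt")
  .flatMap(_.split(" "))

// Keep the partitions in memory so that both actions below reuse them
// instead of re-reading and re-splitting the file
words.persist(StorageLevel.MEMORY_ONLY)   // cache() is shorthand for this level

println(words.count())
println(words.distinct().count())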

Creating RDDs

There are two ways to create an RDD: by reading an external dataset, or by parallelizing an existing collection in the driver program.

The external dataset is not limited to the local file system; it can also come from HDFS, HBase, and other supported sources:

val sparkSession = getDefaultSparkSession
// Create an RDD from an external file
sparkSession.sparkContext.textFile("sparkRDD/src/main/resources/data.txt")
// Create an RDD by parallelizing a local collection
val dataArray = Array(1, 2, 3, 4, 5, 6)
val rdd = sparkSession.sparkContext.parallelize(dataArray)

When creating an RDD with parallelize, you can also specify the number of partitions (the numSlices parameter):

/** Distribute a local Scala collection to form an RDD.
   *
   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
   * modified collection. Pass a copy of the argument to avoid this.
   * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
   * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
   * @param seq Scala collection to distribute
   * @param numSlices number of partitions to divide the collection into
   * @return RDD representing distributed collection
   */
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
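
For example (a minimal sketch reusing the getDefaultSparkSession helper from above), an explicit numSlices distributes the collection across that many partitions:

val sparkSession = getDefaultSparkSession
val rdd = sparkSession.sparkContext.parallelize(Array(1, 2, 3, 4, 5, 6), numSlices = 3)
println(rdd.getNumPartitions)   // 3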

RDD Programming

RDDs support two kinds of operators: transformations and actions.

Transformation operators

A transformation turns one RDD into another, but it is lazily evaluated: transformation operators do not run immediately; the actual computation happens only when an action is invoked on the RDD.
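
A small sketch of this lazy behaviour (again assuming the getDefaultSparkSession helper):

val sparkSession = getDefaultSparkSession
val rdd = sparkSession.sparkContext.parallelize(1 to 10)
// Nothing runs yet: map only records the transformation in the RDD lineage
val doubled = rdd.map(_ * 2)
// The action triggers the actual computation of the whole lineage
println(doubled.reduce(_ + _))   // 110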

Common transformation operators:

  /**
    * flatMap example: flatMap maps each row to a sequence of elements and
    * flattens the result, whereas map returns exactly one element per row
    */
  def flatMapDemo(): Unit = {
    val sparkSession = getDefaultSparkSession
    val textData = sparkSession.sparkContext.textFile("sparkRDD/src/main/resources/data.txt")
    val flatData = textData.flatMap(row => row.split(" "))
    //    val arrayData = textData.map(row => row.split(" ")) // would give an RDD[Array[String]]
    flatData.collect().foreach(println(_))
  }
  /**
    * mapPartitionsWithIndex example: tag every element with the id of the partition it lives in
    */
  def mapPartitionsWithIndexDemo(): Unit = {
    val sparkSession = getDefaultSparkSession
    val rddData = sparkSession.sparkContext.parallelize(Array("a", "b", "c", "d", "e"))
    val data = rddData.mapPartitionsWithIndex((index: Int, row: Iterator[String]) => {
      row.toList.map(x => "[partID:" + index + ":" + x + "]").iterator
    })
    data.collect().foreach(println(_))
  }
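
  /*
   * Variant of the demo above (a sketch): an explicit partition count makes the
   * partition layout visible; the exact element-to-partition assignment depends
   * on how parallelize slices the collection.
   */
  def mapPartitionsWithIndexThreePartitionsDemo(): Unit = {
    val sparkSession = getDefaultSparkSession
    val rddData = sparkSession.sparkContext.parallelize(Array("a", "b", "c", "d", "e"), 3)
    rddData.mapPartitionsWithIndex((index: Int, row: Iterator[String]) => {
      row.map(x => "[partID:" + index + ":" + x + "]")
    }).collect().foreach(println(_))
    // prints something like [partID:0:a] [partID:1:b] [partID:1:c] [partID:2:d] [partID:2:e]
  }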

  /**
    * union / intersection / distinct example
    */
  def unionRDD(): Unit = {
    val sparkSession = getDefaultSparkSession
    val rddData1 = sparkSession.sparkContext.parallelize(List("s", "d", "f", "a"))
    val rddData2 = rddData1.map(row => {
      if (row.equals("s")) {
        row + "s"
      } else {
        row
      }
    })
    println("求并集:")
    rddData1.union(rddData2).foreach(println(_))
    println("求交集:")
    rddData1.intersection(rddData2).foreach(println(_))

    // union keeps duplicates, so de-duplicate explicitly with distinct()
    val rddData3 = rddData1.union(rddData2)
    println("Distinct:")
    rddData3.distinct().collect().foreach(println(_))
  }
  /**
    * groupByKey vs reduceByKey: both produce the same word counts for this file
    */
  def groupByKeyDemo(): Unit = {
    val sparkSession = getDefaultSparkSession
    val textFileRDD = sparkSession.sparkContext.textFile("sparkRDD/src/main/resources/data.txt")
    val rddData = textFileRDD.flatMap(_.split(" ")).map(row => (row, 1))
    rddData.groupByKey().map(row => {
      val count = row._2.sum
      (row._1, count)
    }).collect().foreach(println(_))
    rddData.reduceByKey((x, y) => x + y).collect().foreach(println(_))
    //    rddData.groupByKey().collect().foreach(println(_))
  }

Both versions produce the same word counts, but reduceByKey combines the values for each key on every partition before the shuffle, while groupByKey first ships all of the values across the network, so reduceByKey is usually preferable.

With reduceByKey in mind, the source of distinct() turns out to be quite interesting:

  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }

It first maps every element x of the RDD to a pair (x, null), then reduces by key so that each key survives exactly once (the reduce function simply keeps one of the two null values), and finally maps back to the keys, which yields the de-duplicated RDD.

  /**
    * aggregateByKey example: with zero value 0 and _ + _ as both the seq and comb functions, this is a word count equivalent to reduceByKey(_ + _)
    */
  def aggregateByKeyDemo(): Unit = {
    val sparkSession = getDefaultSparkSession
    val textFileRDD = sparkSession.sparkContext.textFile("sparkRDD/src/main/resources/data.txt")
    val rddData = textFileRDD.flatMap(_.split(" ")).map(row => (row, 1))
    val aggregateRDD = rddData.aggregateByKey(0)(_ + _, _ + _)
    aggregateRDD.collect().foreach(println(_))
  }
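
  /*
   * aggregateByKey(zeroValue)(seqOp, combOp): each key's accumulator starts at
   * zeroValue, seqOp folds values into the accumulator within a partition, and
   * combOp merges the per-partition accumulators. A sketch (reusing the
   * getDefaultSparkSession helper) where the two functions differ: take the
   * maximum per key inside each partition, then sum those maxima.
   */
  def aggregateByKeyMaxDemo(): Unit = {
    val sparkSession = getDefaultSparkSession
    val pairs = sparkSession.sparkContext.parallelize(
      List(("a", 1), ("a", 3), ("b", 2), ("a", 5), ("b", 4)), 2)
    val result = pairs.aggregateByKey(0)((acc, v) => math.max(acc, v), _ + _)
    // with this 2-partition split the result should be ("a", 8) and ("b", 4)
    result.collect().foreach(println(_))
  }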
  /**
    * sortByKey example: zip the strings with an index to build a pair RDD, then sort by the string key
    */
  def sortByKeyDemo(): Unit = {
    val sparkSession = getDefaultSparkSession
    val rddData2 = sparkSession.sparkContext.parallelize(List("CSDN", "ITEYE", "CNBLOG", "OSCHINA", "GITHUB"))
    val rddData3 = sparkSession.sparkContext.parallelize(1 to rddData2.count().toInt)
    val rddData4 = rddData2.zip(rddData3)
    rddData4.sortByKey().collect().foreach(println(_))
  }
  /**
    * sortBy example: sort the (partitionId, value) pairs by the value in descending order
    */
  def sortByDemo(): Unit = {
    val sparkSession = getDefaultSparkSession
    val rddData = sparkSession.sparkContext.parallelize(List(3, 23, 4, 6, 234, 87))
    val newRdd = rddData.mapPartitionsWithIndex((index: Int, row: Iterator[Int]) => {
      row.toList.map(r => (index, r)).iterator
    })
    newRdd.sortBy(_._2, ascending = false).collect().foreach(println(_))
  }
  /**
    * zip and join example: zip builds pair RDDs element by element; join matches two pair RDDs by key
    */
  def joinDemo(): Unit = {
    val sparkSession = getDefaultSparkSession
    val rddDataName = sparkSession.sparkContext.parallelize(List("Tom", "Rose", "Jack", "Jerry"))
    val rddDataId = sparkSession.sparkContext.parallelize(List("1001", "1002", "1003", "1004"))
    val rddDataAge = sparkSession.sparkContext.parallelize(List(12, 22, 13, 20))
    val rddIdAndName = rddDataId.zip(rddDataName)
    rddIdAndName.collect().foreach(println(_))
    val rddIdAndAge = rddDataId.zip(rddDataAge)
    rddIdAndAge.collect().foreach(println(_))
    val fullRdd = rddIdAndName.join(rddIdAndAge)
    fullRdd.collect().foreach(println(_))
  }
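
join matches two pair RDDs by key and yields an RDD[(K, (V, W))] containing only the keys present on both sides. As a minimal sketch (reusing the getDefaultSparkSession helper, with made-up ids), leftOuterJoin keeps every key of the left RDD and wraps missing right-hand values in an Option:

  def leftOuterJoinDemo(): Unit = {
    val sparkSession = getDefaultSparkSession
    val rddIdAndName = sparkSession.sparkContext.parallelize(
      List(("1001", "Tom"), ("1002", "Rose"), ("1003", "Jack")))
    val rddIdAndAge = sparkSession.sparkContext.parallelize(
      List(("1001", 12), ("1002", 22), ("1004", 20)))
    // join keeps only keys present in both RDDs: ("1001", ("Tom", 12)), ("1002", ("Rose", 22))
    rddIdAndName.join(rddIdAndAge).collect().foreach(println(_))
    // leftOuterJoin keeps every left key; missing ages become None, e.g. ("1003", ("Jack", None))
    rddIdAndName.leftOuterJoin(rddIdAndAge).collect().foreach(println(_))
  }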

Detail 1: Referencing member variables

In the following code, the function passed to map references a member variable of the enclosing class:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

But this is equivalent to rdd.map(x => this.field + x), which references, and therefore serializes, the entire this object. The correct approach is:

def doStuff(rdd: RDD[String]): RDD[String] = {
  // copy the field into a local variable
  val field_ = this.field
  rdd.map(x => field_ + x)
}

When a member variable is needed inside map, copy it into a local variable before the transformation and reference the local copy, so the closure does not drag the whole enclosing object along.

Detail 2: Summing values

You cannot sum an RDD by mutating a driver-side variable inside foreach: Spark serializes the closure and ships it to the executors, so each task increments its own copy of counter, and the counter on the driver is never updated (in cluster mode it stays 0).

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)