MapReduce is an old technology in nowadays, but its concept is still valid in data processing world, in previous article I've talked about the Map part, today I'm going to talk about the Reduce part in Flink.

flink

In Flink Streaming Application we could put data into window, after we put the data into window, the data will have a boundary, we could calcualte aggregate result based on the limited data in the window

Reduce Function

Reduce Function combine groups of elements to a single value, by taking always two elements and combining them into one. for example we could use Reduce Function to calcualte the min/max value in the window.

the code below shows that we first group the students into group by their year, then calcualte the min score of the year within the window

object ReduceFn {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.fromElements(
      Student("Amy", 2, 100),
      Student("Emily", 3, 60),
      Student("Kelly", 2, 80),
      Student("Mia", 3, 70),
      Student("Selina", 2, 75))
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Student] {
        override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())

        override def extractTimestamp(t: Student, l: Long): Long = System.currentTimeMillis()
      })
      .keyBy(_.year)
      .timeWindow(Time.seconds(1))
      .reduce((s1, s2) => {
        if (s1.score > s2.score) {
          s2
        } else {
          s1
        }
      })

    stream.print()
    env.execute()
  }
}

Aggregate Function

Aggregate Function has more flexibility than Reduce function, it could have an intermediate aggregate state called accumulator, Values are added to the accumulator, and final aggregates are obtained by finalizing the accumulator state. This supports aggregation functions where the intermediate state needs to be different than the aggregated values and the final result type

the code below shows that we first group the students into group by their year, then calcualte the average score of the year within the window, the Accumulator could hold the intermediate values for the whole window before getResult is called

object AggregateFn {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.fromElements(
      Student("Amy", 2, 100),
      Student("Emily", 3, 60),
      Student("Kelly", 2, 80),
      Student("Mia", 3, 70),
      Student("Selina", 2, 75))
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Student] {
        override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())

        override def extractTimestamp(t: Student, l: Long): Long = System.currentTimeMillis()
      })
      .keyBy(_.year)
      .timeWindow(Time.seconds(1))
      .aggregate(new AvgScoreFunction())

    stream.print()
    env.execute()
  }
}

class AvgScoreFunction extends AggregateFunction[Student, (Int, Int, Int), (Int, Double)] {

  override def createAccumulator(): (Int, Int, Int) = (0, 0, 0) //(year,studentCount, totalScore)

  override def add(in: Student, acc: (Int, Int, Int)): (Int, Int, Int) = (in.year, acc._2 + 1, acc._3 + in.score)

  override def getResult(acc: (Int, Int, Int)): (Int, Double) = (acc._1, acc._3 / acc._2)

  override def merge(acc1: (Int, Int, Int), acc2: (Int, Int, Int)): (Int, Int, Int) = (acc1._1, acc1._2 + acc2._2, acc1._3 +
                                                                                                                   acc2._3)
}

ProcessWindowFunction

ProcessWindowFunction has the most flexibility , but it is also the most complex reduce/aggregate function to implement, in process method, it could access an iterable type of input elements and do any logic you want by iterating the elements and use Collector to collect any amout of output you want. compare with ReduceFunction and AggregateFunction's state, ProcessWindowFunction need more space to hold the intermediate state because it need to store all the input elements. so we may need to be careful when using ProcessWindowFunction.

the code below shows that we first group the students into group by their year, then calcualte both the max score and the min score of the year within the window

object ProcessWindowFn {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.fromElements(
      Student("Amy", 2, 100),
      Student("Emily", 3, 60),
      Student("Kelly", 2, 80),
      Student("Mia", 3, 70),
      Student("Selina", 2, 75))
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Student] {
        override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())

        override def extractTimestamp(t: Student, l: Long): Long = System.currentTimeMillis()
      })
      .keyBy(_.year)
      .timeWindow(Time.seconds(1))
      .process(new MinMaxScoreOfYearFunction())

    stream.print()
    env.execute()
  }
}

case class MinMaxScoreOfYear(year: Int, min: Int, max: Int, endTime: Long)

class MinMaxScoreOfYearFunction extends ProcessWindowFunction[Student, MinMaxScoreOfYear, Int, TimeWindow] {

  override def process(key: Int, context: Context,
                       elements: Iterable[Student],
                       out: Collector[MinMaxScoreOfYear]): Unit = {
    val scores = elements.map(_.score)
    val windowEnd = context.window.getEnd
    out.collect(MinMaxScoreOfYear(key, scores.min, scores.max, windowEnd))
  }
}