MapReduce is an old technology by now, but its concepts are still valid in the data processing world. In the previous article I talked about the Map part; today I'm going to talk about the Reduce part in Flink.
In a Flink streaming application we can put data into windows. Once the data is in a window it has a boundary, so we can calculate an aggregate result over the limited data in that window.
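To make the idea of a boundary concrete, here is a minimal sketch in plain Scala, without a Flink runtime, of how timestamps can be bucketed into fixed-size (tumbling) windows and aggregated per bucket. The `Event` type, the `TumblingWindowSketch` object and its helpers are hypothetical names used only for this illustration, and the sketch assumes non-negative millisecond timestamps and no window offset.

```scala
// Hypothetical event type for this sketch only: a value plus an event-time timestamp in ms.
final case class Event(value: Int, timestampMs: Long)

object TumblingWindowSketch {
  // Start of the fixed-size window containing the timestamp
  // (assumes no offset and non-negative timestamps).
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  // Group events by window start, then compute one aggregate (here the sum) per window.
  def sumPerWindow(events: Seq[Event], sizeMs: Long): Map[Long, Int] =
    events
      .groupBy(e => windowStart(e.timestampMs, sizeMs))
      .map { case (start, es) => start -> es.map(_.value).sum }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(1, 100L), Event(2, 900L), Event(3, 1200L), Event(4, 2500L))
    // One line per 1-second window: (windowStart, sum)
    sumPerWindow(events, sizeMs = 1000L).toSeq.sortBy(_._1).foreach(println)
  }
}
```

With 1-second windows the four events fall into the windows starting at 0, 1000 and 2000, so each sum is computed over a finite set of elements even though the stream itself is unbounded.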
Reduce Function
A Reduce Function combines a group of elements into a single value by always taking two elements and combining them into one, so the result has the same type as the inputs. For example, we could use a Reduce Function to calculate the min/max value in a window.
The code below first groups the students by their year, then calculates the minimum score of each year within the window.
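import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

// Student is not defined in this post; the code assumes something like
// case class Student(name: String, year: Int, score: Int)
object ReduceFn {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.fromElements(
        Student("Amy", 2, 100),
        Student("Emily", 3, 60),
        Student("Kelly", 2, 80),
        Student("Mia", 3, 70),
        Student("Selina", 2, 75))
      // Student carries no timestamp, so the wall clock stands in for event time
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Student] {
        override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())
        override def extractTimestamp(t: Student, l: Long): Long = System.currentTimeMillis()
      })
      .keyBy(_.year)
      .timeWindow(Time.seconds(1))
      // Keep the student with the lower score; on a tie keep the first one
      .reduce((s1, s2) => {
        if (s1.score > s2.score) {
          s2
        } else {
          s1
        }
      })
    stream.print()
    env.execute()
  }
}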
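The pairwise combining can also be tried without a Flink runtime. The following is a minimal sketch in plain Scala that applies the same "keep the lower score" rule per year, holding only one `Student` per key as state. The `Student` definition is an assumption inferred from how the code above uses it, and `ReduceSketch`, `lowerScore` and `minPerYear` are hypothetical names for this illustration.

```scala
// Assumed shape of Student, inferred from how the post's code uses it.
final case class Student(name: String, year: Int, score: Int)

object ReduceSketch {
  // Same combining rule as the reduce lambda: keep the student with the lower score.
  def lowerScore(s1: Student, s2: Student): Student =
    if (s1.score > s2.score) s2 else s1

  // Per key, combine the stored value with each new element; the state per key
  // is always a single Student, never the whole group.
  def minPerYear(students: Seq[Student]): Map[Int, Student] =
    students.foldLeft(Map.empty[Int, Student]) { (state, s) =>
      val combined = state.get(s.year).map(lowerScore(_, s)).getOrElse(s)
      state.updated(s.year, combined)
    }

  def main(args: Array[String]): Unit = {
    val students = Seq(
      Student("Amy", 2, 100),
      Student("Emily", 3, 60),
      Student("Kelly", 2, 80),
      Student("Mia", 3, 70),
      Student("Selina", 2, 75))
    minPerYear(students).toSeq.sortBy(_._1).foreach(println)
  }
}
```

For the five students used in this post, this yields Selina (75) for year 2 and Emily (60) for year 3.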
Aggregate Function
An Aggregate Function is more flexible than a Reduce Function because it has an intermediate aggregate state called the accumulator. Values are added to the accumulator, and the final aggregate is obtained by finalizing the accumulator state. This supports aggregations where the intermediate state needs to have a different type than both the input values and the final result.
The code below first groups the students by their year, then calculates the average score of each year within the window. The accumulator holds the intermediate values for the whole window until getResult is called.
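import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

object AggregateFn {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.fromElements(
        Student("Amy", 2, 100),
        Student("Emily", 3, 60),
        Student("Kelly", 2, 80),
        Student("Mia", 3, 70),
        Student("Selina", 2, 75))
      // Student carries no timestamp, so the wall clock stands in for event time
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Student] {
        override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())
        override def extractTimestamp(t: Student, l: Long): Long = System.currentTimeMillis()
      })
      .keyBy(_.year)
      .timeWindow(Time.seconds(1))
      // Emits one (year, averageScore) pair per key and window
      .aggregate(new AvgScoreFunction())
    stream.print()
    env.execute()
  }
}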
class AvgScoreFunction extends AggregateFunction[Student, (Int, Int, Int), (Int, Double)] {
  // Accumulator layout: (year, studentCount, totalScore)
  override def createAccumulator(): (Int, Int, Int) = (0, 0, 0)
  override def add(in: Student, acc: (Int, Int, Int)): (Int, Int, Int) =
    (in.year, acc._2 + 1, acc._3 + in.score)
  // Convert to Double before dividing; Int / Int would drop the fractional part
  override def getResult(acc: (Int, Int, Int)): (Int, Double) =
    (acc._1, acc._3.toDouble / acc._2)
  // Take the year from whichever accumulator has seen elements (an empty one still holds 0)
  override def merge(acc1: (Int, Int, Int), acc2: (Int, Int, Int)): (Int, Int, Int) =
    (if (acc1._2 > 0) acc1._1 else acc2._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}
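The accumulator lifecycle described above (create, add, merge, get the result) can be sketched in plain Scala without a Flink runtime. This is only an illustration under assumptions: `Student` is again the assumed shape inferred from the post's code, and `AvgAcc` and `AccumulatorSketch` are hypothetical names. The four methods mirror the names of Flink's `AggregateFunction` methods but do not implement that interface.

```scala
// Assumed shape of Student, inferred from how the post's code uses it.
final case class Student(name: String, year: Int, score: Int)

// The accumulator type differs from both the input (Student) and the result (Double).
final case class AvgAcc(count: Int, total: Int)

object AccumulatorSketch {
  def createAccumulator(): AvgAcc = AvgAcc(0, 0)

  // Fold one more element into the intermediate state.
  def add(s: Student, acc: AvgAcc): AvgAcc = AvgAcc(acc.count + 1, acc.total + s.score)

  // Combine two partial states, e.g. computed over two halves of the input.
  def merge(a: AvgAcc, b: AvgAcc): AvgAcc = AvgAcc(a.count + b.count, a.total + b.total)

  // Finalize: convert to Double before dividing so the fractional part is kept.
  def getResult(acc: AvgAcc): Double = acc.total.toDouble / acc.count

  def main(args: Array[String]): Unit = {
    val year2 = Seq(Student("Amy", 2, 100), Student("Kelly", 2, 80), Student("Selina", 2, 75))
    val acc = year2.foldLeft(createAccumulator())((a, s) => add(s, a))
    println(getResult(acc)) // prints 85.0
  }
}
```

Merging two partial accumulators gives the same average as adding all elements to a single one, which is what lets the state stay small: two integers per key, regardless of how many students fall into the window.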
ProcessWindowFunction
ProcessWindowFunction has the most flexibility, but it is also the most complex reduce/aggregate function to implement. In its process method it gets an Iterable of the input elements, so you can apply any logic by iterating over them and use the Collector to emit any number of outputs. Compared with the state of ReduceFunction and AggregateFunction, ProcessWindowFunction needs more space for its intermediate state because it has to store all the input elements of the window, so we need to be careful when using ProcessWindowFunction.
The code below first groups the students by their year, then calculates both the max score and the min score of each year within the window.
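import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object ProcessWindowFn {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.fromElements(
        Student("Amy", 2, 100),
        Student("Emily", 3, 60),
        Student("Kelly", 2, 80),
        Student("Mia", 3, 70),
        Student("Selina", 2, 75))
      // Student carries no timestamp, so the wall clock stands in for event time
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Student] {
        override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())
        override def extractTimestamp(t: Student, l: Long): Long = System.currentTimeMillis()
      })
      .keyBy(_.year)
      .timeWindow(Time.seconds(1))
      // Called once per key and window with all buffered elements
      .process(new MinMaxScoreOfYearFunction())
    stream.print()
    env.execute()
  }
}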
case class MinMaxScoreOfYear(year: Int, min: Int, max: Int, endTime: Long)

class MinMaxScoreOfYearFunction extends ProcessWindowFunction[Student, MinMaxScoreOfYear, Int, TimeWindow] {
  override def process(key: Int, context: Context,
                       elements: Iterable[Student],
                       out: Collector[MinMaxScoreOfYear]): Unit = {
    val scores = elements.map(_.score)
    val windowEnd = context.window.getEnd
    out.collect(MinMaxScoreOfYear(key, scores.min, scores.max, windowEnd))
  }
}
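The difference in state size can be illustrated in plain Scala without a Flink runtime. The sketch below computes the same min/max result in two ways: once over a fully buffered collection, as ProcessWindowFunction does, and once incrementally with a single `(min, max)` pair as state, as a reduce or aggregate function would. `Student` is the assumed shape inferred from the post's code, and `WindowStateSketch` and its two methods are hypothetical names for this illustration.

```scala
// Assumed shape of Student, inferred from how the post's code uses it.
final case class Student(name: String, year: Int, score: Int)

object WindowStateSketch {
  // ProcessWindowFunction style: the whole window content is buffered first,
  // so the state grows with the number of elements. Requires a non-empty buffer.
  def minMaxBuffered(buffer: Seq[Student]): (Int, Int) = {
    val scores = buffer.map(_.score)
    (scores.min, scores.max)
  }

  // Reduce/aggregate style: the state is one (min, max) pair, updated per element.
  def minMaxIncremental(students: Iterator[Student]): Option[(Int, Int)] =
    students.foldLeft(Option.empty[(Int, Int)]) {
      case (None, s)           => Some((s.score, s.score))
      case (Some((lo, hi)), s) => Some((math.min(lo, s.score), math.max(hi, s.score)))
    }

  def main(args: Array[String]): Unit = {
    val year2 = Seq(Student("Amy", 2, 100), Student("Kelly", 2, 80), Student("Selina", 2, 75))
    println(minMaxBuffered(year2))             // prints (75,100)
    println(minMaxIncremental(year2.iterator)) // prints Some((75,100))
  }
}
```

Both variants return the same values; only the amount of data held per key differs. Flink's window API also accepts a ProcessWindowFunction as a second argument to reduce or aggregate, so that the incremental result is handed to it together with the window metadata, which may be worth considering when the window end time is needed but buffering every element is not.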