In a streaming application, it is very common to split a stream into multiple streams and apply different logic to each of them. In Flink there are two ways to do that:
SideOutput Stream
This is the preferred way to split a Flink stream into multiple streams. The advantage of side outputs is that a side output does not need to have the same type as the main stream, and the main stream is not affected by its side outputs. In the example below, we have a stream of Student records that mixes year 2 and year 3 students. We apply a ProcessFunction to the main stream that emits each student's name into one of two side outputs, tagged "year2 students" and "year3 students". The side output streams have type String, which is different from the main stream type Student, and the main stream keeps carrying every Student without any side effect from the side output streams.
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Sample record type used in this post.
case class Student(name: String, year: Int, score: Int)

object SideOutputStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Main stream: every Student passes through StudentYearProcessor.
    val stream = env.fromElements(
        Student("Amy", 2, 100),
        Student("Emily", 3, 60),
        Student("Kelly", 2, 80),
        Student("Mia", 3, 70),
        Student("Selina", 2, 75))
      .process(new StudentYearProcessor())

    // Side outputs are looked up by tag id and type; both are DataStream[String].
    val year2StudentStream = stream.getSideOutput(new OutputTag[String]("year2 students"))
      .map(_.toLowerCase)
    val year3StudentStream = stream.getSideOutput(new OutputTag[String]("year3 students"))
      .map(_.toUpperCase)

    year2StudentStream.print()
    year3StudentStream.print()
    stream.print()
    env.execute()
  }
}

class StudentYearProcessor extends ProcessFunction[Student, Student] {
  lazy val year2StudentOutput: OutputTag[String] = new OutputTag[String]("year2 students")
  lazy val year3StudentOutput: OutputTag[String] = new OutputTag[String]("year3 students")

  override def processElement(student: Student,
                              context: ProcessFunction[Student, Student]#Context,
                              collector: Collector[Student]): Unit = {
    // Emit the student's name (a String) to the side output for their year.
    if (student.year == 2) {
      context.output(year2StudentOutput, student.name)
    }
    if (student.year == 3) {
      context.output(year3StudentOutput, student.name)
    }
    // Every student is still emitted on the main output.
    collector.collect(student)
  }
}
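In the example above, the processor and main() each create their own OutputTag and rely on the id strings matching. A minimal sketch of keeping the tags in one place is to declare them once in an object and reuse the same instances when emitting and when calling getSideOutput. It also gives the two side outputs different types (String names and Int scores) to show that side outputs need not share a type with each other or with the main stream. StudentTags, SharedTagProcessor, SharedTagSideOutput and the "year3 scores" tag are hypothetical names, and the block repeats the Student case class so it compiles on its own.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Same shape as the Student used above (name, year, score).
case class Student(name: String, year: Int, score: Int)

// Hypothetical holder: each tag is created once and shared, so the processor
// and the getSideOutput calls cannot drift apart on id or type.
object StudentTags {
  val Year2Names: OutputTag[String] = new OutputTag[String]("year2 students")
  val Year3Scores: OutputTag[Int] = new OutputTag[Int]("year3 scores")
}

// Hypothetical processor: year 2 names go to a String side output, year 3
// scores go to an Int side output, and every Student stays on the main output.
class SharedTagProcessor extends ProcessFunction[Student, Student] {
  override def processElement(student: Student,
                              ctx: ProcessFunction[Student, Student]#Context,
                              out: Collector[Student]): Unit = {
    student.year match {
      case 2 => ctx.output(StudentTags.Year2Names, student.name)
      case 3 => ctx.output(StudentTags.Year3Scores, student.score)
      case _ => // other years only appear on the main output
    }
    out.collect(student)
  }
}

object SharedTagSideOutput {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val students = env.fromElements(
        Student("Amy", 2, 100),
        Student("Emily", 3, 60),
        Student("Mia", 3, 70))
      .process(new SharedTagProcessor())

    // The same tag instances are used to read the side outputs back.
    val year2Names: DataStream[String] = students.getSideOutput(StudentTags.Year2Names)
    val year3Scores: DataStream[Int] = students.getSideOutput(StudentTags.Year3Scores)

    year2Names.print()
    year3Scores.print()
    students.print()
    env.execute("Shared side output tags")
  }
}
```

Sharing the tag objects is only a convenience; creating a new OutputTag with the same id and type, as the original example does, also works.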
SplitStream
The other way to achieve the same thing is to call the split method on the DataStream API, which returns a SplitStream. This method has been deprecated since Flink 1.6. Compared with side outputs, the streams selected from a SplitStream must have the same type as the main stream, and once the main stream has been split you consume the results through select() instead of keeping a separate main output alongside them. See the example below:
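import org.apache.flink.streaming.api.scala._

// Uses the same record type as the side output example:
// case class Student(name: String, year: Int, score: Int)

object SplitStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val splitStream = env.fromElements(
        Student("Amy", 2, 100),
        Student("Emily", 3, 60),
        Student("Kelly", 2, 80),
        Student("Mia", 3, 70),
        Student("Selina", 2, 75))
      // split() is deprecated since Flink 1.6; use side outputs instead.
      // Each element is labeled with the names of the streams it belongs to.
      .split(s => s.year match {
        case 2 => Seq("y2")
        case 3 => Seq("y3")
        case _ => Seq.empty // other years are not picked up by any select()
      })

    // select() returns a DataStream[Student]: the same type as the input stream.
    val y2Stream = splitStream.select("y2")
      .map(s => Student(s.name.toLowerCase, s.year, s.score)).setParallelism(2)
    val y3Stream = splitStream.select("y3")
      .map(s => Student(s.name.toUpperCase, s.year, s.score)).setParallelism(3)

    y2Stream.print().setParallelism(2)
    y3Stream.print().setParallelism(1)
    env.execute("Split stream")
  }
}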
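Since split is deprecated in favor of side outputs, here is a sketch of the same year 2 / year 3 routing rewritten with a ProcessFunction and one OutputTag[Student] per former split name. It assumes the Student case class from the side output example (repeated here so the block compiles on its own); YearTags, tagFor, YearRouter and SideOutputInsteadOfSplit are hypothetical names, and the setParallelism calls from the split example are left out for brevity.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Student(name: String, year: Int, score: Int)

// Hypothetical replacement for split(...)/select("y2"): one tag per former
// split name, typed as Student to mirror the SplitStream example.
object YearTags {
  val Y2: OutputTag[Student] = new OutputTag[Student]("y2")
  val Y3: OutputTag[Student] = new OutputTag[Student]("y3")

  // Plays the role of the selector function passed to split().
  def tagFor(year: Int): Option[OutputTag[Student]] = year match {
    case 2 => Some(Y2)
    case 3 => Some(Y3)
    case _ => None
  }
}

class YearRouter extends ProcessFunction[Student, Student] {
  override def processElement(student: Student,
                              ctx: ProcessFunction[Student, Student]#Context,
                              out: Collector[Student]): Unit = {
    // Send the student to the side output for their year, if any. Nothing is
    // emitted on the main output, mirroring split/select where only the
    // selected streams are consumed.
    YearTags.tagFor(student.year).foreach(tag => ctx.output(tag, student))
  }
}

object SideOutputInsteadOfSplit {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val routed = env.fromElements(
        Student("Amy", 2, 100),
        Student("Emily", 3, 60),
        Student("Kelly", 2, 80),
        Student("Mia", 3, 70),
        Student("Selina", 2, 75))
      .process(new YearRouter())

    routed.getSideOutput(YearTags.Y2)
      .map(s => s.copy(name = s.name.toLowerCase))
      .print()
    routed.getSideOutput(YearTags.Y3)
      .map(s => s.copy(name = s.name.toUpperCase))
      .print()
    env.execute("Side outputs instead of split")
  }
}
```

Unlike split, nothing prevents YearRouter from also calling out.collect(student) if a main stream is still needed next to the per-year streams.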