Apache Beam can be used as an API layer on top of Apache Flink, and functions are the fundamental operation in both frameworks: a stream or a bundle of data is sent to functions for processing in parallel across many different nodes. Beam uses an at-least-once strategy to process data in functions, which means that if a function fails for a given stream or bundle of data, that stream or bundle is sent to the same function on other nodes to be retried. Flink, on the other hand, provides exactly-once support through its state management, which means Flink guarantees that any record in the stream or bundle of data is processed only once, and this is a very hard goal to achieve. In this article, I will talk about how to use functions in both Beam and Flink to process data.

flink

Most of the Flink API supports lambda functions in both the Scala and Java versions. If there is no complex logic in the function, it is really neat to use a lambda function, like below. Assume we have a case class named Student:

case class Student(name: String, year: Int, score: Int)

import org.apache.flink.streaming.api.scala._

object BasicTransform {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(
      Student("Amy", 2, 100),
      Student("Emily", 3, 60),
      Student("Kelly", 2, 80),
      Student("Mia", 3, 70),
      Student("Selina", 2, 75))
      .map(s => Student(s.name.toLowerCase, s.year, s.score))
      .filter(s => s.score > 70)
      .flatMap(s => List(s, s))
      .print()

    env.execute("basic stream")
  }
}

In the code above, we map each student's name to lower case, keep only the students whose score is above 70, and double the output with the flatMap function, so emily and mia are dropped and each remaining student is printed twice.

Every Flink lambda function has a corresponding rich function (e.g. RichMapFunction, RichFlatMapFunction). The difference between lambda functions and rich functions is that rich functions have open and close methods to override. The open method is called before the actual working methods (like map or join) and is thus suitable for one-time setup work; for functions that are part of an iteration, it is invoked at the beginning of each iteration superstep. The close method is called after the last call to the main working methods (e.g. map or join); for functions that are part of an iteration, it is invoked after each iteration superstep.

The code below shows how to use a RichFlatMapFunction. It has a private field named subTaskIndex that is initialized in the open method, while the close method is the place for any teardown work. The function class is serialized and sent across different worker nodes at runtime, so it is critical to make sure the function class is serializable, and to mark private fields as transient when needed to avoid unnecessary serialization.

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object RichFlatMapFn {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements(
      Student("Amy", 2, 100),
      Student("Emily", 3, 60),
      Student("Kelly", 2, 80),
      Student("Mia", 3, 70),
      Student("Selina", 2, 75))
      .flatMap(new StudentFlatMapFn)

    stream.print()
    env.execute()
  }
}

class StudentFlatMapFn extends RichFlatMapFunction[Student, (Int, Student)] {

  // not serialized with the function; each parallel instance sets it in open()
  @transient
  var subTaskIndex = 0

  override def open(config: Configuration): Unit = subTaskIndex = getRuntimeContext.getIndexOfThisSubtask

  override def close(): Unit = super.close()

  override def flatMap(in: Student,
                       collector: Collector[(Int, Student)]): Unit = {
    // emit the student once for every index up to this subtask's index
    for (idx <- 0 to subTaskIndex) {
      collector.collect((idx, in))
    }
  }
}

beam functions

Beam functions are more like Flink rich functions. A basic Beam function looks like the code below. The method annotated with @DoFn.Setup is called before any element is processed, which is similar to Flink's open method, and the method annotated with @DoFn.Teardown is called to clean up the instance before it is discarded; no other method is called after it, which is similar to Flink's close method.

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author ivan
 */
public class XXFn<I, O> extends DoFn<I, O> {

  private static final Logger log = LoggerFactory.getLogger(XXFn.class); //NOPMD

  @Setup
  public final void setup() throws Exception {
    log.debug("Setup called");
  }

  @Teardown
  public final void teardown() throws Exception {
    log.info("Teardown complete");
  }

  @ProcessElement
  public final void processElement(final ProcessContext context) throws Exception {
    // process context.element() here and emit results with context.output(...)
  }
}

Beam also provides annotations like @DoFn.StartBundle and @DoFn.FinishBundle, which mark methods to be called before and after a bundle of data has been processed. On the other hand, Flink offers some finer-grained controls; for example, we can set Flink's parallelism for each operator separately (via setParallelism), but the Beam API does not provide such a capability.
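As a rough sketch of those bundle hooks (the class name BundleAwareFn and its per-bundle counter are made up for this example, not taken from the code above), a DoFn could look like this:

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hypothetical DoFn showing the bundle lifecycle hooks.
 */
public class BundleAwareFn<I> extends DoFn<I, I> {

  private static final Logger log = LoggerFactory.getLogger(BundleAwareFn.class);

  // per-bundle counter, reset at the start of every bundle
  private transient long elementsInBundle;

  @StartBundle
  public void startBundle() {
    elementsInBundle = 0;
  }

  @ProcessElement
  public void processElement(final ProcessContext context) {
    elementsInBundle++;
    // pass the element through unchanged
    context.output(context.element());
  }

  @FinishBundle
  public void finishBundle() {
    log.debug("processed {} elements in this bundle", elementsInBundle);
  }
}

The counter here is only for illustration; the point is that startBundle and finishBundle run once per bundle, while processElement runs once per element.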