Introduction
1. Function Classes
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._

// Event type used throughout this tutorial (assumed definition; the field name `user` is an assumption)
case class Event(user: String, url: String, timestamp: Long)

object TransFunctionUDFTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val clicks = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L)
    )

    // Filter by passing in an instance of a custom FilterFunction
    val stream1 = clicks.filter(new FlinkFilter)
    stream1.print()

    env.execute()
  }

  // Custom FilterFunction class: keeps only events whose URL contains "home"
  class FlinkFilter extends FilterFunction[Event] {
    override def filter(value: Event): Boolean = value.url.contains("home")
  }
}
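A function class is an ordinary Scala class. The same pattern therefore works for Flink's other function interfaces, and you can check the logic by calling the method directly without starting a Flink job.

The sketch below is illustrative only. It assumes the Event case class has the shape Event(user, url, timestamp); the user field name is a guess. UrlExtractor and HomeFilter are hypothetical names, while MapFunction and FilterFunction are Flink's own interfaces.

```scala
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}

// Assumed shape of the tutorial's Event type (the field name `user` is a guess)
case class Event(user: String, url: String, timestamp: Long)

// Hypothetical function class: MapFunction[IN, OUT] declares map(value: IN): OUT
class UrlExtractor extends MapFunction[Event, String] {
  override def map(value: Event): String = value.url
}

// Hypothetical function class with the same logic as FlinkFilter above
class HomeFilter extends FilterFunction[Event] {
  override def filter(value: Event): Boolean = value.url.contains("home")
}

object FunctionClassCheck {
  def main(args: Array[String]): Unit = {
    val events = List(Event("Mary", "./home", 1000L), Event("Bob", "./cart", 2000L))
    val filter = new HomeFilter
    val mapper = new UrlExtractor
    // Invoke the UDF methods directly, one record at a time, without a Flink runtime
    val urls = events.filter(e => filter.filter(e)).map(e => mapper.map(e))
    println(urls) // List(./home)
  }
}
```

Calling filter() and map() by hand like this is a cheap way to unit-test stateless function classes before wiring them into a job.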
Of course, the FilterFunction interface can also be implemented with an anonymous class:
val filteredStream = stream.filter(new FilterFunction[Event] {
  override def filter(value: Event): Boolean = value.url.contains("home")
})
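To make the class more reusable, the keyword to filter on can be passed in as a constructor parameter instead of being hard-coded:

stream.filter(new KeywordFilter("home")).print()

// Custom FilterFunction class that takes the filter keyword as a constructor parameter
class KeywordFilter(keyword: String) extends FilterFunction[Event] {
  override def filter(value: Event): Boolean = value.url.contains(keyword)
}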
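Constructor parameters that a method uses become fields of the function object. Flink serializes that object when the job is submitted and ships it to the parallel tasks, so every such parameter must itself be serializable.

Here is a minimal sketch under the same assumed Event definition:
- The hypothetical AnyKeywordFilter accepts several keywords.
- The hypothetical BadFilter holds a plain java.lang.Object.
- Plain Java serialization, standing in for what Flink does at submission, shows which of the two can be shipped.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import org.apache.flink.api.common.functions.FilterFunction

// Assumed shape of the tutorial's Event type (the field name `user` is a guess)
case class Event(user: String, url: String, timestamp: Long)

// Hypothetical: keeps an event if its URL contains any of the given keywords.
// `keywords` is used in filter(), so it becomes a field; List[String] is serializable.
class AnyKeywordFilter(keywords: List[String]) extends FilterFunction[Event] {
  override def filter(value: Event): Boolean = keywords.exists(k => value.url.contains(k))
}

// Hypothetical counter-example: the java.lang.Object parameter becomes a field
// of the function, and java.lang.Object is not serializable
class BadFilter(lock: AnyRef) extends FilterFunction[Event] {
  override def filter(value: Event): Boolean = lock.synchronized(value.url.nonEmpty)
}

object KeywordFilterCheck {
  // Java serialization, standing in for what happens to a function at job submission
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val keywords = new AnyKeywordFilter(List("home", "cart"))
    println(isSerializable(keywords))                               // true
    println(isSerializable(new BadFilter(new Object)))              // false
    println(keywords.filter(Event("Bob", "./cart", 2000L)))         // true
    println(keywords.filter(Event("Alice", "./prod?id=1", 5000L)))  // false
  }
}
```

If a parameter genuinely cannot be serialized (a client, a lock, a parser), it is usually better to create it inside the task at runtime than to pass it through the constructor. Rich functions, covered next, provide exactly that hook.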
For a functional language like Scala, an even simpler approach is to pass a lambda expression directly:

stream.filter(_.url.contains("home")).print()
This does the job in a single line of code, which is more concise and easier to read.
2. Rich Function Classes
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

// Uses the same Event case class as the previous example
object RichFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    env
      .fromElements(
        Event("Mary", "./home", 1000L),
        Event("Bob", "./cart", 2000L),
        Event("Alice", "./prod?id=1", 5 * 1000L),
        Event("Cary", "./home", 60 * 1000L)
      )
      .map(new RichMapFunction[Event, Long]() {
        // open() runs once when each parallel task starts; print a message to the console
        override def open(parameters: Configuration): Unit = {
          println("Task with index " + getRuntimeContext.getIndexOfThisSubtask + " started")
        }

        // Convert each click event into its Long timestamp
        override def map(value: Event): Long = value.timestamp

        // close() runs once when each parallel task ends; print a message to the console
        override def close(): Unit = {
          println("Task with index " + getRuntimeContext.getIndexOfThisSubtask + " finished")
        }
      })
      .print()

    env.execute()
  }
}
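When the job runs, open() and close() are each called once per parallel subtask rather than once per record. With a parallelism of 2, subtasks 0 and 1 each print their start message once before mapping any events. Each prints its end message once after its input is exhausted. The four timestamps are emitted by the print sink.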
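A common reason to use a rich function is a resource that cannot, or should not, be created in the constructor. The sketch below uses a hypothetical UrlHasher, the same assumed Event definition, and the Flink 1.x open(Configuration) signature used in the example above.

It does three things:
- It creates a java.security.MessageDigest, which is not serializable, in open() on each subtask.
- It marks the field @transient so the digest is never part of the serialized function.
- It drops the digest in close().

Calling open(), map() and close() by hand mimics the order in which Flink invokes them.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Assumed shape of the tutorial's Event type (the field name `user` is a guess)
case class Event(user: String, url: String, timestamp: Long)

// Hypothetical rich function: maps each event to the SHA-256 hex digest of its URL
class UrlHasher extends RichMapFunction[Event, String] {
  // MessageDigest is not serializable, so it is created per subtask in open();
  // @transient keeps the field out of the serialized function object
  @transient private var digest: MessageDigest = _

  override def open(parameters: Configuration): Unit = {
    digest = MessageDigest.getInstance("SHA-256")
  }

  override def map(value: Event): String = {
    // digest(bytes) hashes the input and resets the MessageDigest for the next record
    val hash = digest.digest(value.url.getBytes(StandardCharsets.UTF_8))
    hash.map(b => "%02x".format(b)).mkString
  }

  override def close(): Unit = {
    digest = null // drop the per-subtask resource when the task ends
  }
}

object UrlHasherCheck {
  def main(args: Array[String]): Unit = {
    val hasher = new UrlHasher
    hasher.open(new Configuration()) // Flink calls open() before the first map()
    val hex = hasher.map(Event("Mary", "./home", 1000L))
    println(hex.length) // 64
    hasher.close() // Flink calls close() after the last record
  }
}
```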
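A typical use of these lifecycle methods is working with an external system such as a database. Opening a connection inside flatMap() would reconnect for every record. Instead, the connection is created once in open(), used in flatMap(), and closed in close(). A skeleton of this pattern:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class MyFlatMap[IN, OUT] extends RichFlatMapFunction[IN, OUT] {
  override def open(parameters: Configuration): Unit = {
    // Initialization work, e.g. establishing a connection to MySQL
  }

  override def flatMap(value: IN, out: Collector[OUT]): Unit = {
    // Read from and write to the database
  }

  override def close(): Unit = {
    // Cleanup work: close the connection to the MySQL database
  }
}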
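To make the MyFlatMap skeleton concrete without a real database, the sketch below replaces the MySQL connection with a hypothetical in-memory UserDirectory client. In a real job, open() would create the JDBC connection and close() would close it. EnrichWithRegion is likewise a hypothetical name, and the Event shape is the same assumption as before.

The sketch also shows that flatMap() may emit zero or more records through the Collector: events for users the directory does not know produce no output.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer

// Assumed shape of the tutorial's Event type (the field name `user` is a guess)
case class Event(user: String, url: String, timestamp: Long)

// Hypothetical stand-in for an external connection such as a MySQL client
class UserDirectory extends AutoCloseable {
  private val regions = Map("Mary" -> "EU", "Bob" -> "US")
  def lookup(user: String): Option[String] = regions.get(user)
  override def close(): Unit = () // a real client would release its connection here
}

// Hypothetical rich function: tags each event with the user's region and
// emits nothing for users the directory does not know
class EnrichWithRegion extends RichFlatMapFunction[Event, String] {
  @transient private var directory: UserDirectory = _

  override def open(parameters: Configuration): Unit = {
    directory = new UserDirectory // connect once per subtask, not once per record
  }

  override def flatMap(value: Event, out: Collector[String]): Unit = {
    directory.lookup(value.user).foreach(region => out.collect(s"${value.user}@$region ${value.url}"))
  }

  override def close(): Unit = {
    if (directory != null) directory.close()
  }
}

object EnrichCheck {
  def main(args: Array[String]): Unit = {
    val results = ArrayBuffer.empty[String]
    // Minimal Collector that buffers emitted records in place of Flink's downstream output
    val out = new Collector[String] {
      override def collect(record: String): Unit = results += record
      override def close(): Unit = ()
    }
    val enrich = new EnrichWithRegion
    enrich.open(new Configuration())
    enrich.flatMap(Event("Mary", "./home", 1000L), out)
    enrich.flatMap(Event("Alice", "./prod?id=1", 5000L), out) // unknown user: no output
    enrich.close()
    println(results) // ArrayBuffer(Mary@EU ./home)
  }
}
```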