用户自定义函数(UDF)

用户自定义函数(UDF)本文介绍了 ApacheFlink 中 DataStreamAP 的编程风格 重点讲解了用户自定义函数 UDF 的使用 包括函数类 匿名类和 Lambda 表达式

大家好,欢迎来到IT知识分享网。

介绍

1. 函数类(Function Classes)

import org.apache.flink.api.common.functions.FilterFunction import org.apache.flink.streaming.api.scala._ object TransFunctionUDFTest { 
    def main(args: Array[String]): Unit = { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val clicks = env .fromElements( Event("Mary", "./home", 1000L), Event("Bob", "./cart", 2000L) ) //通过传入自定义FilterFunction实现过滤  val stream1 = clicks.filter(new FlinkFilter) stream1.print() env.execute() } //自定义FilterFunction函数类  class FlinkFilter extends FilterFunction[Event] { 
    override def filter(value: Event): Boolean = value.url.contains("home") } } 

当然还可以通过匿名类来实现FilterFunction接口:

val filterdStream = stream.filter(new FilterFunction[Event] { 
    override def filter(value: Event): Boolean = value.url.contains("home") }) 
stream.filter(new KeywordFilter("home")).print() //自定义FilterFunction函数类,将需要用到的过滤参数作为类的构造参数传入  class KeywordFilter(keyword: String) extends FilterFunction[Event] { 
    override def filter(value: Event): Boolean = value.url.contains(keyword) } 

对于Scala这样的函数式编程语言,更为简单的写法是直接传入一个Lambda表达式: stream.filter(_.url.contains("home")).print()

这样我们用一行代码就可以搞定,显得更加简洁明晰。

2. 富函数类(Rich Function Classes)

import org.apache.flink.api.common.functions.RichMapFunction 64 65 import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala._ object RichFunctionTest{ 
    def main(args: Array[String]): Unit = { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(2) env.fromElements( Event("Mary", "./home", 1000L), Event("Bob", "./cart", 2000L), Event("Alice", "./prod?id=1", 5 * 1000L), Event("Cary", "./home", 60 * 1000L) ) .map(new RichMapFunction[Event, Long]() { 
    //在任务生命周期开始时会执行open方法,在控制台打印对应语句  override def open(parameters: Configuration): Unit = { 
    println("索引为 " + getRuntimeContext.getIndexOfThisSubtask + " 的任务开 始") } // 将点击事件转换成长整型的时间戳输出  override def map(value: Event): Long = value.timestamp //在任务声明周期结束时会执行close方法,在控制台打印对应语句  override def close(): Unit = { 
    println("索引为 " + getRuntimeContext.getIndexOfThisSubtask + " 的任务结 束") } }) .print() env.execute() } } 

输出结果是:

class MyFlatMap extends RichFlatMapFunction[IN,OUT]{ 
    override def open(parameters: Configuration): Unit = { 
    // 做一些初始化工作  // 例如建立一个和MySQL的连接  } override def flatMap(value: IN, out: Collector[OUT]): Unit = { 
    // 对数据库进行读写  } override def close(): Unit = { 
    // 清理工作,关闭和MySQL数据库的连接。  } } 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/125921.html

(0)
上一篇 2025-09-22 22:20
下一篇 2025-09-22 22:26

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信