0

0

2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二)

看不見的法師

看不見的法師

发布时间:2025-09-17 08:09:01

|

347人浏览过

|

来源于php中文网

原创

今天我们将深入探讨flink的流处理概念和流批一体的api,了解如何使用flink进行数据处理。以下是今天的学习目标:

  • 流处理概念(理解):我们将学习流处理的基本概念,包括数据的时效性以及流处理和批处理的区别。
  • 程序结构之数据源Source(掌握):掌握如何从不同数据源读取数据,包括文件和数据库。
  • 程序结构之数据转换Transformation(掌握):学习如何对数据进行转换操作,如过滤、映射和聚合。
  • 程序结构之数据落地Sink(掌握):了解如何将处理后的数据输出到不同的存储介质。
  • Flink连接器Connectors(理解):了解Flink如何与外部系统如Kafka和Redis进行集成。

流处理概念

流处理强调数据处理的及时性,适合处理如网站数据访问、被爬虫爬取等实时数据。流处理数据是无界的,通过窗口操作可以划分数据的边界进行计算。相比之下,批处理数据是有界的。Flink在1.12版本时开始支持流批一体,既可以处理流数据也可以处理批数据。

2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二)

程序结构

Flink的编程模型包括三个主要部分:Source、Transformation和Sink。

2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二)

  • Source

    Source是数据的入口,Flink支持从文件、数据库等多种数据源读取数据。以下是基于文件和数据集合的Source示例:

    package cn.itcast.sz22.day02;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    /** 
     * Author itcast 
     * Date 2021/5/5 9:50 
     * env.readTextFile(本地/HDFS文件/文件夹);//压缩文件也可以 
     */
    public class FileSourceDemo {
        public static void main(String[] args) throws Exception {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //读取文件 hdfs://node1:8020/user/root/xxx.txt
            //读取通过 gzip 压缩的 gz 文件
            DataStreamSource source1 = env.readTextFile("data/hello.txt");
            DataStreamSource source2 = env.readTextFile("D:\\_java_workspace\\sz22\\data\\hello.txt.gz");
            //打印文本
            source1.print();
            source2.print("source2:");
            //执行流环境
            env.execute();
        }
    }

    基于数据集合的Source:

    package cn.itcast.sz22.day02;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    /** 
     * Author itcast 
     * Date 2021/5/5 10:32 
     * Desc TODO 
     */
    public class MySQLSourceDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //1.env 设置并行度为 1
            env.setParallelism(1);
            //2.source,创建连接MySQL数据源 数据源,每2秒钟生成一条数据
            DataStreamSource source = env.addSource(new RichSourceFunction() {
                Connection conn;
                PreparedStatement ps;
                boolean flag = true;
                @Override
                public void open(Configuration parameters) throws Exception {
                    //连接数据源
                    conn = DriverManager.getConnection("jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false"
                            , "root", "123456");
                    //编写读取数据表的sql
                    String sql = "select `id`,`name`,age from t_student";
                    //准备 preparestatement SQL
                    ps = conn.prepareStatement(sql);
                }
                @Override
                public void run(SourceContext ctx) throws Exception {
                    while (flag) {
                        ResultSet rs = ps.executeQuery();
                        while (rs.next()) {
                            int id = rs.getInt("id");
                            String name = rs.getString("name");
                            int age = rs.getInt("age");
                            Student student = new Student(id, name, age);
                            ctx.collect(student);
                        }
                    }
                }
                @Override
                public void cancel() {
                    flag = false;
                }
                @Override
                public void close() throws Exception {
                    ps.close();
                    conn.close();
                }
            });
            //3.打印数据源
            //4.执行
            //创建静态内部类 Student ,字段为 id name age
            //创建静态内部类 MySQLSource 继承RichParallelSourceFunction
            // 实现 open 方法
            // 获取数据库连接 mysql5.7版本  jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false
            // 实现 run 方法
            source.print();
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Student {
            private int id;
            private String name;
            private int age;
        }
    }
  • Transformation

    Transformation是数据处理的核心,Flink提供了多种转换操作,如map、filter、reduce等。以下是一个示例,展示如何对流数据中的单词进行统计并排除敏感词:

    LobeHub
    LobeHub

    LobeChat brings you the best user experience of ChatGPT, OLLaMA, Gemini, Claude

    下载
    package cn.itcast.sz22.day02;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    import java.util.Arrays;
    /** 
     * Author itcast 
     * Date 2021/5/5 9:59 
     * 1.filter过滤 将单词中  heihei 单词过滤掉 
     * 2.reduce聚合 
     */
    public class SocketSourceFilterDemo {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.source socketSource
            DataStreamSource source = env.socketTextStream("192.168.88.100", 9998);
            //3.处理数据-transformation
            SingleOutputStreamOperator> result = source
                    .flatMap((String value, Collector out) -> Arrays
                            .stream(value.split(" ")).forEach(out::collect))
                    .returns(Types.STRING)
                    //过滤掉 包含 heihei 单词的所有信息 boolean filter(T value)
                    .filter(word-> !word.equals("heihei"))
                    .map(value -> Tuple2.of(value, 1))
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(t -> t.f0)
                    //.sum(1);
                    //T reduce(T value1, T value2)
                    // hadoop,1 hadoop,1 => hadoop,1+1
            .reduce((Tuple2 a,Tuple2 b)->Tuple2.of(a.f0,a.f1+b.f1));
            //3.1每一行数据按照空格切分成一个个的单词组成一个集合
            //3.2对集合中的每个单词记为1
            //3.3对数据按照单词(key)进行分组
            //3.4对各个组内的数据按照数量(value)进行聚合就是求sum
            //4.输出结果-sink
            result.print();
            //5.触发执行-execute
            env.execute();
        }
    }

    合并和拆分操作也是Transformation的一部分:

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.datastream.ConnectedStreams;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;
    /** 
     * Author itcast 
     * Date 2021/5/5 11:24 
     * 将两个String类型的流进行union 
     * 将一个String类型和一个Long类型的流进行connect 
     * */
    public class UnionAndConnectDemo {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            env.setParallelism(1);
            //2.Source
            DataStream ds1 = env.fromElements("hadoop", "spark", "flink");
            DataStream ds2 = env.fromElements("hadoop", "spark", "flink");
            DataStream ds3 = env.fromElements(1L, 2L, 3L);
            //3. transformation
            //3.1 union
            DataStream union = ds1.union(ds2);
            union.print("union:");
            //3.2 connect
            ConnectedStreams connect = ds1.connect(ds3);
            SingleOutputStreamOperator source2 = connect.map(new CoMapFunction() {
                @Override
                public String map1(String value) throws Exception {
                    return "string->string:" + value;
                }
                @Override
                public String map2(Long value) throws Exception {
                    return "Long->Long:" + value;
                }
            });
            //打印输出
            source2.print("connect:");
            env.execute();
        }
    }

    拆分数据流的示例:

    package cn.itcast.sz22.day02;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    /** 
     * Author itcast 
     * Date 2021/5/5 11:35 
     * 对流中的数据按照奇数和偶数进行分流,并获取分流后的数据 
     */
    public class SplitStreamDemo {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            env.setParallelism(1);
            //2.Source 比如 1-20之间的数字
            DataStreamSource source = env.fromSequence(1, 20);
            //定义两个输出tag 一个奇数 一个偶数,指定类型为Long
            OutputTag odd = new OutputTag("odd", TypeInformation.of(Long.class));
            OutputTag even = new OutputTag("even", TypeInformation.of(Long.class));
            //对source的数据进行process处理区分奇偶数
            SingleOutputStreamOperator processDS = source.process(new ProcessFunction() {
                @Override
                public void processElement(Long value, Context ctx, Collector out) throws Exception {
                    if (value % 2 == 0) {
                        ctx.output(even, value);
                    } else {
                        ctx.output(odd, value);
                    }
                }
            });
            //3.获取两个侧输出流
            DataStream evenDS = processDS.getSideOutput(even);
            DataStream oddDS = processDS.getSideOutput(odd);
            //4.sink打印输出
            evenDS.printToErr("even");
            oddDS.print("odd");
            //5.execute
            env.execute();
        }
    }
  • Sink

    Sink是数据的出口,Flink支持将数据输出到控制台、文件、数据库等多种存储介质。以下是将数据sink到MySQL的示例:

    package cn.itcast.sz22.day02;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    /** 
     * Author itcast 
     * Date 2021/5/5 16:00 
     * 将 Student 集合数据sink到MySQL数据库中 
     */
    public class SinkMySQLDemo01 {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.Source 定义 Student 对象
            DataStream studentDS = env.fromElements(new Student(null, "tonyma", 18));
            studentDS.addSink(new RichSinkFunction() {
                Connection conn;
                PreparedStatement ps;
                boolean flag = true;
                @Override
                public void open(Configuration parameters) throws Exception {
                    //初始化操作,添加连接MySQL
                    conn = DriverManager.getConnection("jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false"
                            , "root", "123456");
                    String sql="INSERT INTO t_student(`id`,`name`,`age`) values(null,?,?)";
                    ps = conn.prepareStatement(sql);
                }
                @Override
                public void invoke(Student value, Context context) throws Exception {
                    ps.setString(1,value.getName());
                    ps.setInt(2,value.getAge());
                    ps.executeUpdate();
                }
                @Override
                public void close() throws Exception {
                    ps.close();
                    conn.close();
                }
            });
            //3.Transformation 暂时不需要
            //4.Sink 实现自定义 MySQL sink
            //5.execute
            //创建 Student 类,包含3个字段 id name age
            //创建 MySQLSink 类继承 RichSinkFunction
            //实现 open invoke close 方法
            env.execute();
        }
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Student {
            private String id;
            private String name;
            private int age;
        }
    }

Flink连接器Connectors

Flink提供了多种连接器,可以与外部系统如JDBC、Kafka、Redis等进行集成。以下是使用Kafka和Redis的示例:

  • Kafka

    package cn.itcast.sz22.day02;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import java.util.Properties;
    /** 
     * Author itcast 
     * Date 2021/5/5 17:23 
     * 需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消费Kafka中的数据做WordCount 
     * 需要设置如下参数: 
     * 1.订阅的主题 
     * 2.反序列化规则 
     * 3.消费者属性-集群地址 
     * 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理) 
     * 5.消费者属性-offset重置规则,如earliest/latest... 
     * 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!) 
     * 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中 
     */
    public class FlinkKafkaConsumerDemo {
        public static void main(String[] args) throws Exception {
            //1.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //开启checkpoint
            env.enableCheckpointing(5000);
            //2.Source
            Properties props  = new Properties();
            props.setProperty("bootstrap.servers", "node1:9092");
            props.setProperty("group.id", "flink");
            props.setProperty("auto.offset.reset","latest");
            props.setProperty("flink.partition-discovery.interval-millis","5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
            props.setProperty("enable.auto.commit", "true");
            props.setProperty("auto.commit.interval.ms", "2000");
            FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("flink_kafka"
                    , new SimpleStringSchema(), props);
            consumer.setStartFromEarliest();
            DataStreamSource source = env.addSource(consumer).setParallelism(1);
            source.print();
            env.execute();
        }
    }
  • Redis

    //-1.创建RedisSink之前需要创建RedisConfig
    //连接单机版Redis
    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("node1").build();
    result.addSink(new RedisSink<>(config, new RedisMapperEx()));
    env.execute();
    // * 最后将结果保存到Redis 实现 FlinkJedisPoolConfig
    // * 注意:存储到Redis的数据结构:使用hash也就是map
    // * key value
    // * WordCount (单词,数量)
    

    public static class RedisMapperEx implements RedisMapper> { @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET, "hashOpt"); } @Override public String getKeyFromData(Tuple2 data) { return data.f0; } @Override public String getValueFromData(Tuple2 data) { return data.f1 + ""; } }

通过以上内容的学习,您将能够掌握Flink的基本使用方法,并能够利用Flink进行流处理和批处理的开发。

相关专题

更多
mysql修改数据表名
mysql修改数据表名

MySQL修改数据表:1、首先查看数据库中所有的表,代码为:‘SHOW TABLES;’;2、修改表名,代码为:‘ALTER TABLE 旧表名 RENAME [TO] 新表名;’。php中文网还提供MySQL的相关下载、相关课程等内容,供大家免费下载使用。

664

2023.06.20

MySQL创建存储过程
MySQL创建存储过程

存储程序可以分为存储过程和函数,MySQL中创建存储过程和函数使用的语句分别为CREATE PROCEDURE和CREATE FUNCTION。使用CALL语句调用存储过程智能用输出变量返回值。函数可以从语句外调用(通过引用函数名),也能返回标量值。存储过程也可以调用其他存储过程。php中文网还提供MySQL创建存储过程的相关下载、相关课程等内容,供大家免费下载使用。

246

2023.06.21

mongodb和mysql的区别
mongodb和mysql的区别

mongodb和mysql的区别:1、数据模型;2、查询语言;3、扩展性和性能;4、可靠性。本专题为大家提供mongodb和mysql的区别的相关的文章、下载、课程内容,供大家免费下载体验。

281

2023.07.18

mysql密码忘了怎么查看
mysql密码忘了怎么查看

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql密码忘了怎么办呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

515

2023.07.19

mysql创建数据库
mysql创建数据库

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql怎么创建数据库呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

255

2023.07.25

mysql默认事务隔离级别
mysql默认事务隔离级别

MySQL是一种广泛使用的关系型数据库管理系统,它支持事务处理。事务是一组数据库操作,它们作为一个逻辑单元被一起执行。为了保证事务的一致性和隔离性,MySQL提供了不同的事务隔离级别。php中文网给大家带来了相关的教程以及文章欢迎大家前来学习阅读。

386

2023.08.08

sqlserver和mysql区别
sqlserver和mysql区别

SQL Server和MySQL是两种广泛使用的关系型数据库管理系统。它们具有相似的功能和用途,但在某些方面存在一些显著的区别。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

530

2023.08.11

mysql忘记密码
mysql忘记密码

MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。那么忘记mysql密码我们该怎么解决呢?php中文网给大家带来了相关的教程以及其他关于mysql的文章,欢迎大家前来学习阅读。

599

2023.08.14

Java编译相关教程合集
Java编译相关教程合集

本专题整合了Java编译相关教程,阅读专题下面的文章了解更多详细内容。

9

2026.01.21

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
MySQL 教程
MySQL 教程

共48课时 | 1.8万人学习

MySQL 初学入门(mosh老师)
MySQL 初学入门(mosh老师)

共3课时 | 0.3万人学习

简单聊聊mysql8与网络通信
简单聊聊mysql8与网络通信

共1课时 | 805人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号