Today we will take a close look at Flink's stream processing concepts and its unified stream/batch API, and learn how to use Flink for data processing. Today's learning goals:
- Stream processing concepts (understand): the basics of stream processing, including the timeliness of data and the differences between stream and batch processing.
- Program structure, part 1: Source (master): how to read data from different sources, including files and databases.
- Program structure, part 2: Transformation (master): how to transform data with operations such as filter, map, and aggregate.
- Program structure, part 3: Sink (master): how to write processed data out to different storage systems.
- Flink Connectors (understand): how Flink integrates with external systems such as Kafka and Redis.
Stream Processing Concepts
Stream processing emphasizes the timeliness of data processing and suits real-time data such as website visits or crawler traffic. Streaming data is unbounded; window operations draw boundaries in the stream so that it can be computed over. Batch data, by contrast, is bounded. Starting with version 1.12, Flink unifies stream and batch processing: the same API can handle both kinds of data.
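Since 1.12 the choice between stream and batch is just an execution-mode setting. A minimal sketch (class name and element values are illustrative):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RuntimeModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // STREAMING, BATCH, or AUTOMATIC; AUTOMATIC runs bounded sources in batch mode
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.fromElements("hadoop", "spark", "flink").print();
        env.execute();
    }
}
```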

Program Structure
Flink's programming model consists of three main parts: Source, Transformation, and Sink.
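A minimal sketch of the three parts wired together (class name and values are illustrative):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProgramStructureDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source: where the data enters
        DataStream<String> source = env.fromElements("hadoop", "spark", "flink");
        // Transformation: how the data is processed
        DataStream<String> upper = source.map(s -> s.toUpperCase()).returns(Types.STRING);
        // Sink: where the result leaves
        upper.print();
        env.execute();
    }
}
```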

- Source
Source is the entry point for data. Flink supports reading from many kinds of sources, including files, collections, and databases. First, a Source based on files:
```java
package cn.itcast.sz22.day02;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/5/5 9:50
 * env.readTextFile(local/HDFS file or directory); // compressed files work as well
 */
public class FileSourceDemo {
    public static void main(String[] args) throws Exception {
        // create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read a file, e.g. hdfs://node1:8020/user/root/xxx.txt
        DataStreamSource<String> source1 = env.readTextFile("data/hello.txt");
        // a gzip-compressed .gz file can be read directly
        DataStreamSource<String> source2 = env.readTextFile("D:\\_java_workspace\\sz22\\data\\hello.txt.gz");
        // print the text
        source1.print();
        source2.print("source2:");
        // run the streaming job
        env.execute();
    }
}
```
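A Source based on a data collection, as mentioned above; a minimal sketch (element values are illustrative):

```java
package cn.itcast.sz22.day02;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class CollectionSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // from individual elements
        env.fromElements("hadoop", "spark", "flink").print("fromElements:");
        // from a Java collection
        env.fromCollection(Arrays.asList("hadoop", "spark", "flink")).print("fromCollection:");
        // from a numeric sequence (Flink 1.12+)
        env.fromSequence(1, 10).print("fromSequence:");
        env.execute();
    }
}
```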
A custom Source that reads from a MySQL database:

```java
package cn.itcast.sz22.day02;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Author itcast
 * Date 2021/5/5 10:32
 * Desc: custom source that emits the t_student table from MySQL every 2 seconds
 */
public class MySQLSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. env: set the parallelism to 1
        env.setParallelism(1);
        // 2. source: connect to MySQL (5.7) and poll the table
        DataStreamSource<Student> source = env.addSource(new RichSourceFunction<Student>() {
            Connection conn;
            PreparedStatement ps;
            boolean flag = true;

            @Override
            public void open(Configuration parameters) throws Exception {
                // open the connection once per source instance
                conn = DriverManager.getConnection(
                        "jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false", "root", "123456");
                // the SQL that reads the table
                String sql = "select `id`,`name`,`age` from t_student";
                ps = conn.prepareStatement(sql);
            }

            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                while (flag) {
                    ResultSet rs = ps.executeQuery();
                    while (rs.next()) {
                        int id = rs.getInt("id");
                        String name = rs.getString("name");
                        int age = rs.getInt("age");
                        ctx.collect(new Student(id, name, age));
                    }
                    // re-query every 2 seconds, as stated in the requirement
                    Thread.sleep(2000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }

            @Override
            public void close() throws Exception {
                ps.close();
                conn.close();
            }
        });
        // 3. print the source
        source.print();
        // 4. execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Student {
        private int id;
        private String name;
        private int age;
    }
}
```

- Transformation
Transformation is the heart of data processing. Flink provides many transformation operations, such as map, filter, and reduce. The following example counts the words in a stream while filtering out a sensitive word:
```java
package cn.itcast.sz22.day02;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Author itcast
 * Date 2021/5/5 9:59
 * 1. filter: drop the word "heihei"
 * 2. reduce: aggregate the counts
 */
public class SocketSourceFilterDemo {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. source: socket source
        DataStreamSource<String> source = env.socketTextStream("192.168.88.100", 9998);
        // 3. transformation
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source
                // 3.1 split every line on spaces into individual words
                .flatMap((String value, Collector<String> out) ->
                        Arrays.stream(value.split(" ")).forEach(out::collect))
                .returns(Types.STRING)
                // 3.2 filter out all occurrences of the word "heihei"
                .filter(word -> !word.equals("heihei"))
                // 3.3 record each word as (word, 1)
                .map(value -> Tuple2.of(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // 3.4 group by the word (the key)
                .keyBy(t -> t.f0)
                // 3.5 sum the counts per group: (hadoop,1) (hadoop,1) => (hadoop,2);
                //     equivalent to .sum(1)
                .reduce((Tuple2<String, Integer> a, Tuple2<String, Integer> b) ->
                        Tuple2.of(a.f0, a.f1 + b.f1));
        // 4. sink: print the result
        result.print();
        // 5. execute
        env.execute();
    }
}
```

Merging and splitting streams are also Transformations. First, union and connect:
```java
package cn.itcast.sz22.day02;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * Author itcast
 * Date 2021/5/5 11:24
 * union two String streams;
 * connect a String stream with a Long stream
 */
public class UnionAndConnectDemo {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        // 2. source
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
        DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);
        // 3. transformation
        // 3.1 union: both streams must have the same element type
        DataStream<String> union = ds1.union(ds2);
        union.print("union:");
        // 3.2 connect: the two streams may have different element types
        ConnectedStreams<String, Long> connect = ds1.connect(ds3);
        SingleOutputStreamOperator<String> source2 =
                connect.map(new CoMapFunction<String, Long, String>() {
                    @Override
                    public String map1(String value) throws Exception {
                        return "string->string:" + value;
                    }

                    @Override
                    public String map2(Long value) throws Exception {
                        return "Long->Long:" + value;
                    }
                });
        // print the connected output
        source2.print("connect:");
        env.execute();
    }
}
```

An example of splitting a data stream with side outputs:
```java
package cn.itcast.sz22.day02;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Author itcast
 * Date 2021/5/5 11:35
 * split a stream into odd and even numbers and read the two parts back
 */
public class SplitStreamDemo {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        // 2. source: the numbers 1 to 20
        DataStreamSource<Long> source = env.fromSequence(1, 20);
        // define two output tags, one for odd and one for even numbers, typed as Long
        OutputTag<Long> odd = new OutputTag<Long>("odd", TypeInformation.of(Long.class));
        OutputTag<Long> even = new OutputTag<Long>("even", TypeInformation.of(Long.class));
        // process each element and route it to the matching side output
        SingleOutputStreamOperator<Long> processDS = source.process(new ProcessFunction<Long, Long>() {
            @Override
            public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
                if (value % 2 == 0) {
                    ctx.output(even, value);
                } else {
                    ctx.output(odd, value);
                }
            }
        });
        // 3. fetch the two side-output streams
        DataStream<Long> evenDS = processDS.getSideOutput(even);
        DataStream<Long> oddDS = processDS.getSideOutput(odd);
        // 4. sink: print (even numbers to stderr, odd numbers to stdout)
        evenDS.printToErr("even");
        oddDS.print("odd");
        // 5. execute
        env.execute();
    }
}
```

- Sink
Sink is the data exit. Flink can write results to the console, files, databases, and other storage systems. The following example sinks data into MySQL:
```java
package cn.itcast.sz22.day02;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Author itcast
 * Date 2021/5/5 16:00
 * sink Student records into a MySQL database
 */
public class SinkMySQLDemo01 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. source: a single Student element
        DataStream<Student> studentDS = env.fromElements(new Student(null, "tonyma", 18));
        // 3. transformation: not needed here
        // 4. sink: custom MySQL sink implementing open/invoke/close
        studentDS.addSink(new RichSinkFunction<Student>() {
            Connection conn;
            PreparedStatement ps;

            @Override
            public void open(Configuration parameters) throws Exception {
                // initialization: open the MySQL connection and prepare the insert
                conn = DriverManager.getConnection(
                        "jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false", "root", "123456");
                String sql = "INSERT INTO t_student(`id`,`name`,`age`) values(null,?,?)";
                ps = conn.prepareStatement(sql);
            }

            @Override
            public void invoke(Student value, Context context) throws Exception {
                // one insert per incoming element
                ps.setString(1, value.getName());
                ps.setInt(2, value.getAge());
                ps.executeUpdate();
            }

            @Override
            public void close() throws Exception {
                ps.close();
                conn.close();
            }
        });
        // 5. execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Student {
        private String id;
        private String name;
        private int age;
    }
}
```
Flink Connectors
Flink ships with connectors for integrating with external systems such as JDBC, Kafka, and Redis. A short JDBC sketch comes first, followed by Kafka and Redis examples:
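The JDBC connector can replace the hand-written MySQL sink shown earlier. A minimal sketch, assuming the flink-connector-jdbc dependency is on the classpath and reusing the Student POJO from the sink example above:

```java
package cn.itcast.sz22.day02;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<SinkMySQLDemo01.Student> studentDS =
                env.fromElements(new SinkMySQLDemo01.Student(null, "tonyma", 18));
        // the connector manages the connection and the PreparedStatement for us
        studentDS.addSink(JdbcSink.sink(
                "INSERT INTO t_student(`id`,`name`,`age`) values(null,?,?)",
                (ps, student) -> {
                    ps.setString(1, student.getName());
                    ps.setInt(2, student.getAge());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()));
        env.execute();
    }
}
```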
- Kafka
```java
package cn.itcast.sz22.day02;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Author itcast
 * Date 2021/5/5 17:23
 * Requirement: consume Kafka data with the FlinkKafkaConsumer from flink-connector-kafka_2.12
 * and do a WordCount. The consumer needs:
 * 1. the topic to subscribe to
 * 2. the deserialization schema
 * 3. consumer property: cluster address
 * 4. consumer property: consumer group id (a default exists, but an explicit one is easier to manage)
 * 5. consumer property: offset reset policy, e.g. earliest/latest
 * 6. dynamic partition discovery (Flink detects when Kafka partitions are added)
 * 7. without checkpointing, offsets can be auto-committed; once checkpointing is configured,
 *    offsets are committed together with each checkpoint
 */
public class FlinkKafkaConsumerDemo {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpointing every 5 s
        env.enableCheckpointing(5000);
        // 2. source
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset", "latest");
        // a background thread re-checks Kafka's partitions every 5 s
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest();
        DataStreamSource<String> source = env.addSource(consumer).setParallelism(1);
        source.print();
        env.execute();
    }
}
```
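For the reverse direction, results can be written back to Kafka with FlinkKafkaProducer. A minimal sketch (class name, topic name, and values are illustrative):

```java
package cn.itcast.sz22.day02;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class FlinkKafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> result = env.fromElements("hadoop:1", "spark:1");
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        // serialize each String and write it to the target topic
        result.addSink(new FlinkKafkaProducer<>("flink_kafka2", new SimpleStringSchema(), props));
        env.execute();
    }
}
```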
- Redis
The Redis sink (provided by the flink-connector-redis module from Apache Bahir) in outline; `result` is a (word, count) stream such as the WordCount output above:

```java
// -1. a RedisSink needs a RedisConfig first;
//     this config connects to a standalone Redis
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
        .setHost("node1").build();
result.addSink(new RedisSink<>(config, new RedisMapperEx()));
env.execute();

// the result is stored in Redis as a hash (i.e. a map):
//   key:   the word
//   value: the count
public static class RedisMapperEx implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "hashOpt");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return data.f1 + "";
    }
}
```
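Once the job has run, the counts can be inspected from the Redis CLI with `HGETALL hashOpt`; the hash name hashOpt comes from the RedisCommandDescription above.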
With the material above, you should be able to use Flink's basic APIs and develop both stream and batch processing jobs with it.