Flink⼊门之Flink程序开发步骤(java语⾔)
⽂章⽬录
注:本篇章的flink学习均是基于java开发语⾔
我们如果要使⽤flink进⾏计算开发,⼀个完整的开发步骤是怎样的呢?
前情回顾:什么叫有界数据流,什么叫⽆界数据流(何为流处理,何为批处理)?
- Batch Analytics,右边是 Streaming Analytics。批量计算: 统⼀收集数据->存储到DB->对数据进⾏批量处理,对数据实时性邀请不⾼,⽐如⽣成离线报表、⽉汇总,⽀付宝年度账单(⼀年结束批处理计算)
- Streaming Analytics 流式计算,顾名思义,就是对数据流进⾏处理,如使⽤流式分析引擎如 Storm,Flink 实时处理分析数据,应⽤较多的场景如 实时报表、车辆实时报警计算等等。
(0)开发程序所需依赖
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<mavenpiler.source>1.8</mavenpiler.source>
<mavenpiler.target>1.8</mavenpiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.12.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
动车与高铁的区别<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>三年级上册数学计划
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- 打包插件(会包含所有依赖) -->
房产证号查询<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.source.ManifestResourceTransformer">
<!-- 设置jar包的⼊⼝类(可选) -->
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
(1)获取执⾏环境
flink程序开发,⾸要的便是需要获取其执⾏环境!
ex:
ExecutionEnvironment env = ExecutionEnvironment();
或者:
StreamExecutionEnvironment env = ExecutionEnvironment();
如果使⽤StreamExecutionEnvironment 默认便是流式处理环境
但是flink1.12 开始,流批⼀体,我们可以⾃⼰指定当前计算程序的环境模式
指定为⾃动模式:AUTOMATIC
此设置后,flink将会⾃动识别数据源类型
有界数据流,则会采⽤批⽅式进⾏数据处理
⽆界束流,则会采⽤流⽅式进⾏数据处理
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
强制指定为批数据处理模式:BATCH
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
强制指定为流数据处理模式:STREAMING
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
注意点:
在flink中,有界与⽆界数据流都可以强指定为流式运⾏环境,但是,如果明知⼀个数据来源为流式数据,就必须设置环境为AUTOMATIC
或STREAMING,不可以指定为BATCH否则程序会报错!
(2)加载/创建数据源
flink,是⼀个计算框架,在计算的前提,肯定是要有数据来源啊!
flink可以从多种场景读取加载数据,例如 各类DB 如Mysql、SQL SERVER、MongoDB、各类MQ 如Kafka、RabbitMQ、以及很多常⽤数据存储场景 如redis、⽂件(本地⽂件/HDFS)、scoket…
我们在加载数据源的时候,便知道,该数据是有界还是⽆界了!
ex:
flink读取rabbitMQ消息,是有界还是⽆界呢?当然是⽆界!因为flink程序启动时,能通过连接知道什么时候MQ中有数据,什么时候没有数据吗?不知道,因为本⾝MQ中是否有消息或者消息有多少就是⼀个不能肯定确定的因素,因此其不得不保持⼀个类似于长连接的形式,⼀直等待MQ中有数据到来,然后处理。
flink读取指定某个⽂件中的数据,那么此数据源是有界还是⽆界呢?当然是有界!因为⽂件中数据,flink读取会做记录,当⽂件内容读完了,数据源就相当于没有新的数据来到了嘛!
ex:
从集合中读取数据:
DataStream<String> elementsSource = env.fromElements("java,scala,php,c++","java,scala,php","java,scala","java");
那么,这是⽆界数据还是有界数据呢?很明显,有界数据!因为数据就这么多,当前数据源在读取时不会再凭空产⽣数据了。
从scoket中读取数据:
DataStreamSource<String> elementsSource= env.socketTextStream("10.50.40.131", 9999);
这是⽆界数据还是有界数据呢?很明显,⽆界数据!因为scoket⼀旦连接,flink不会知道其数据源什么时候会数据结束,其不得不保持⼀个类似于长连接的状态,⼀直等待Scoket中有数据到来,然后处理。
(3)数据转换处理
数据转换处理,就是flink使⽤算⼦,对从数据源中获取的数据进⾏数据加⼯处理(例如 数据转换,计算等等)
例如:开窗⼝、低阶处理函数ProcessFuction、各种算⼦:map(映射,与java8流中Map效果类似),flatmap(元素摊平,与java8流中Map效果类似)等等。
demo⽰例:
DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
"java,scala,php","java,scala","java");
// 数据处理
DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>(){
@Override
public void flatMap(String element, Collector<String> out)throws Exception {
String[] wordArr = element.split(",");
for(String word : wordArr){
}
}
});
flatMap.map(new MapFunction<String, String>(){
@Override
伤感唯美歌曲
public String map(String value)throws Exception {
UpperCase();
}
});
(4)处理后数据放置/输出
将计算后的数据,进⾏放置(输出/存储),可以很地⽅,从什么地⽅读取数据,⾃然也可以将计算结果输出到该地点。
例如:输出到⽂件,输出到控制台,输出到MQ,输出到DB,输出到scoket…
ex:输出到控制台
source.print();
(5)执⾏计算程序
flink程序需要启动才能执⾏任务,正如,spring-boot启动程序需要nohup java -jar xxxx.jar & 或者编译器中点击图标按钮启动启动⽰例:
// 1.准备环境
水管漏水怎么办StreamExecutionEnvironment env = ExecutionEnvironment();
// 设置模式(流、批、⾃动)
// 2.加载数据源
// 3.数据转换
// 4.数据输出
恭喜订婚短句八个字// 5.执⾏程序
//或者 ute("指定当前计算程序名");
(6)完整⽰例
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论