Getting started with Flink covers the following topics, expanding layer by layer.
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
Process Unbounded and Bounded Data
Type | Explanation |
---|---|
Unbounded streams | have a start but no defined end |
Bounded streams | have a defined start and end |
Processing of bounded streams is also known as batch processing.
Deploy Applications Anywhere
Flink integrates with all common cluster resource managers such as Hadoop YARN, Apache Mesos, and Kubernetes, but can also be set up to run as a stand-alone cluster.
Flink Layered APIs
Flink provides three layered APIs. Each API offers a different trade-off between conciseness and expressiveness and targets different use cases.
Framework | Difference |
---|---|
Flink | streaming-first; batch processing is a special case of stream processing |
Storm | streaming, tuple-based |
Spark Streaming | structured streaming; batch-first; stream processing is a special case of batch processing (mini-batch) |
Below, we explore the most common types of applications that are powered by Flink and give pointers to real-world examples.
What are typical event-driven applications?
What are typical data analytics applications?
What are typical data pipeline applications?
Alibaba acquired the company behind Flink and develops its own fork, Blink.
How do you learn Flink (or any new framework) efficiently?
- The official website
- The source code (attach the sources in Maven)
- The GitHub examples — this code can be run, e.g. flink-quickstart under the code folder
Development environment setup
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sisyphus</groupId>
<artifactId>flink</artifactId>
<version>1.0-SNAPSHOT</version>
<name>${project.artifactId}</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.10.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.8</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.6</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issue like NoDefClassError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>
code
import org.apache.flink.api.scala.ExecutionEnvironment
/**
 * A Flink batch application written in Scala.
 * Contents of the input file:
* Good Good Study
* Day Day Up
*/
object BatchWCScalaApp {
def main(args: Array[String]): Unit = {
// 1. environment
val env = ExecutionEnvironment.getExecutionEnvironment
// 2. source
val path = this.getClass.getClassLoader.getResource("wordcount").getPath
val input = path
val data = env.readTextFile(input)
// import the implicit conversions
import org.apache.flink.api.scala._
// 3. transformation
val res = data.flatMap(_.toLowerCase.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.groupBy(0)
.sum(1)
// 4. sink
res.print()
}
}
Note: the import org.apache.flink.api.scala._ brings in the implicit conversions and TypeInformation factories that Flink's Scala API needs to infer and serialize the element types of each operator.
code
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * A Flink streaming (real-time) application written in Scala.
 *
 * Start netcat as the input stream: nc -lk 9999
*/
object StreamingWCScalaApp {
def main(args: Array[String]): Unit = {
// 1. environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// StreamExecutionEnvironment.createLocalEnvironment()
// import the implicit conversions
import org.apache.flink.api.scala._
// 2. source
val data = env.socketTextStream("localhost", 9999)
// 3. transformation
val res = data.flatMap(_.split(","))
.map((_, 1))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
// 4. sink
res.print()
.setParallelism(1)
// 5. execute
env.execute("StreamingWCScalaApp")
}
}
The general flow of big-data processing:
Framework | Flow |
---|---|
MapReduce | input —> map(reduce) —> output |
Storm | input —> Spout/Bolt —> output |
Spark | input —> transformation/action —> output |
Flink | input —> transformation/sink —> output |
Flink programs look like regular programs that transform DataStreams. Each program consists of the same basic parts: obtain an execution environment, create a source, apply transformations, define a sink, and trigger execution. All Flink programs are executed lazily: when the program's main method runs, the data loading and transformations do not happen directly. Rather, each operation is created and added to a dataflow graph. The operations are actually executed only when execution is explicitly triggered by an execute() call on the execution environment. Whether the program runs locally or on a cluster depends on the type of execution environment. The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
This is similar to Spark's DAG: the execution plan is built first, and the application only runs when execute() is called.
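A minimal sketch of this laziness in the DataStream API (the object name and data are made up for illustration): building the pipeline only adds nodes to the dataflow graph, and nothing runs until execute() is called.

import org.apache.flink.streaming.api.scala._

object LazyExecutionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // These calls only add operators to the dataflow graph; no data is processed yet.
    val counts = env
      .fromElements("a", "b", "a")
      .map((_, 1))

    counts.print() // still lazy: print() merely attaches a sink to the graph

    // Only this call submits the graph and actually runs the job.
    env.execute("LazyExecutionSketch")
  }
}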
Some transformations (join, coGroup, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate) allow the data to be grouped on a key before they are applied.
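For instance, a small DataSet sketch (the data is hypothetical) showing groupBy defining the key for the reduce that follows:

import org.apache.flink.api.scala._

object KeyedTransformationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val scores = env.fromElements(("math", 90), ("math", 70), ("english", 80))

    // groupBy(0) defines the key; the reduce is then applied per key
    scores.groupBy(0)
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print() // prints (math,160) and (english,80)
  }
}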
File-based:
Mainly used in development.
readTextFile(path) / TextInputFormat
val filePath = ""
env.readTextFile(filePath).print()

// Recursive Traversal
val parameters = new Configuration
parameters.setBoolean("recursive.file.enumeration", true)
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)

Compressed files with the following extensions can be read directly:
Compression method | File extensions | Parallelizable |
---|---|---|
DEFLATE | .deflate | no |
GZip | .gz, .gzip | no |
Bzip2 | .bz2 | no |
XZ | .xz | no |
readTextFileWithValue(path) / TextValueInputFormat
readCsvFile(path) / CsvInputFormat
readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat
readSequenceFile(Key, Value, path) / SequenceFileInputFormat
Collection-based:
Mainly used for testing, to create some test data.
fromCollection(Iterable)
val data = 1 to 10
env.fromCollection(data).print()
fromCollection(Iterator)
val filePath = "*.csv" env.readCsvFile[(String,Int,String)](filePath,ignoreFirstLine=true).print() env.readCsvFile[(String,Int,String)](filePath,ignoreFirstLine=true,includedFields = Array(0,1)).print()case class
case class MyCaseClass(name:String,age:Int) env.readCsvFile[MyCaseClass](filePath,ignoreFirstLine=true,includedFields = Array(0,1)).print()POJO
public class Person{ private String name; private int age; private String work; get...; set...; toString; }env.readCsvFile[Person](filePath,ignoreFirstLine=true,includedFields = Array("name","age","work")).print()
fromElements(elements: _*)
fromParallelCollection(SplittableIterator)
generateSequence(from, to)
Example
val env = ExecutionEnvironment.getExecutionEnvironment
// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")
// read text file from an HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")
// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
"hdfs:///the/CSV/file",
includedFields = Array(0, 3)) // take the first and the fourth field
// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
"hdfs:///the/CSV/file",
includedFields = Array(0, 3)) // take the first and the fourth field
// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
"hdfs:///the/CSV/file",
pojoFields = Array("name", "age", "zipcode"))
// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")
// generate a number sequence
val numbers = env.generateSequence(1, 10000000)
// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.createInput(HadoopInputs.readSequenceFile(classOf[IntWritable], classOf[Text],
"hdfs://nnHost:nnPort/path/to/file"))
Data sinks:
writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr()
write() / FileOutputFormat
output() / OutputFormat
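A minimal sketch of the text and CSV sinks; the output paths here are placeholders, not paths from the original notes.

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object SinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements(("flink", 1), ("spark", 2))

    // placeholder output paths; adjust to your environment
    data.writeAsText("file:///tmp/sink-text", WriteMode.OVERWRITE)
    data.writeAsCsv("file:///tmp/sink-csv", "\n", ",", WriteMode.OVERWRITE)

    // writeAsText/writeAsCsv are lazy sinks, so trigger execution explicitly
    env.execute("SinkSketch")
  }
}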
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
/**
 * Three steps to use a counter (accumulator) in Flink:
 * step 1: define the counter
 * step 2: register the counter
 * step 3: fetch the counter result
*/
object CounterApp {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data = env.fromElements("hadoop","spark","flink","pyspark","storm")
val info = data.map(new RichMapFunction[String,String]() {
// step 1: define the counter
val counter = new LongCounter()
override def open(parameters: Configuration): Unit = {
// step 2: register the counter
getRuntimeContext.addAccumulator("ele-counts-scala", counter)
}
override def map(in: String): String = {
counter.add(1)
in
}
})
val filePath = "file:///sink-scala-counter-out/"
info.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(3)
val jobResult = env.execute("CounterApp")
// step 3: fetch the counter result
val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")
println("num: " + num)
}
}
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
/**
 * step 1: register a local/HDFS file
 * step 2: read the cached file contents in the open() method
*/
object DistributedCacheApp {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val filePath = "file:///hello.txt"
// step 1: register a local/HDFS file
env.registerCachedFile(filePath, "pk-scala-dc")
import org.apache.flink.api.scala._
val data = env.fromElements("hadoop","spark","flink","pyspark","storm")
data.map(new RichMapFunction[String,String] {
// step 2: read the distributed-cache file in the open() method
override def open(parameters: Configuration): Unit = {
val dcFile = getRuntimeContext.getDistributedCache().getFile("pk-scala-dc")
val lines = FileUtils.readLines(dcFile) // java
/**
 * FileUtils.readLines returns a Java list; without a conversion the Java and Scala collections are incompatible.
*/
import scala.collection.JavaConverters._
for(ele <- lines.asScala){ // scala
println(ele)
}
}
override def map(value: String): String = {
value
}
}).print()
}
}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable.ArrayBuffer
/**
 * Flink broadcast variables:
 * step 1: create the DataSet to broadcast
 * step 2: register it with withBroadcastSet
 * step 3: access it via getBroadcastVariable in open()
*/
object BroadCast {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
// 1. The DataSet to be broadcast
val toBroadCast = env.fromElements(1, 2, 3)
val data = env.fromElements("a")
data.map(new RichMapFunction[String, ArrayBuffer[Int]]() {
var broadCastSet: Traversable[Int] = null
override def open(parameters: Configuration): Unit = {
import scala.collection.JavaConverters._
// 3. Access the broadcast DataSet as a Collection
broadCastSet = getRuntimeContext.getBroadcastVariable[Int]("broadCastSetName").asScala
}
override def map(in: String): ArrayBuffer[Int] = {
var res = ArrayBuffer[Int]()
for (broad <- broadCastSet) {
res += broad
}
res
}
}).withBroadcastSet(toBroadCast, "broadCastSetName") // 2. Broadcast the DataSet
.print()
}
}
File-based:
Same as for DataSet.
Socket-based:
env.socketTextStream("localhost",9999)
Collection-based:
Same as for DataSet.
Custom:
StreamExecutionEnvironment.addSource(sourceFunction)
You can write your own source by implementing the SourceFunction for non-parallel sources, by implementing the ParallelSourceFunction interface, or by extending the RichParallelSourceFunction for parallel sources.
DataStream sinks are basically the same as the DataSet sinks; you can also extend RichSinkFunction to implement a custom sink (a sketch follows).
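A minimal sketch of a custom source and a custom sink: a non-parallel SourceFunction plus a stdout-only RichSinkFunction. The class names and behavior are made up for illustration; a real sink would open and close an external connection in open()/close().

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Non-parallel custom source: emits an increasing counter once per second.
class CounterSource extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var count = 0L
    while (running) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}

// Custom sink: just prints with a prefix; open/close are where a real sink would manage connections.
class StdoutSink extends RichSinkFunction[Long] {
  override def open(parameters: Configuration): Unit = { /* e.g. open a connection */ }
  override def invoke(value: Long): Unit = println(s"custom-sink> $value")
  override def close(): Unit = { /* e.g. close the connection */ }
}

object CustomSourceSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new CounterSource).addSink(new StdoutSink)
    env.execute("CustomSourceSinkApp")
  }
}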
Table & SQL API实现了Flink的关系型API
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row
object TableSQLAPI {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val filePath = "file:///Users/rocky/IdeaProjects/imooc-workspace/data/06/sales.csv"
import org.apache.flink.api.scala._
// we now have a DataSet
val csv = env.readCsvFile[SalesLog](filePath,ignoreFirstLine=true)
//csv.print()
// DataSet ==> Table
val salesTable = tableEnv.fromDataSet(csv)
// Table ==> registered table
tableEnv.registerTable("sales", salesTable)
// sql
val resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")
tableEnv.toDataSet[Row](resultTable).print()
}
case class SalesLog(transactionId:String,
customerId:String,
itemId:String,
amountPaid:Double)
}
tumbling windows (fixed-size, non-overlapping)
sliding windows (fixed-size, overlapping)
session windows (gap-based)
global windows
https://blog.csdn.net/lmalds/article/details/52704170
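A small sketch contrasting tumbling and sliding time windows on the earlier socket word-count stream; the window sizes are arbitrary choices for illustration.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val counts = env.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)

    // tumbling window: non-overlapping 10-second buckets
    counts.timeWindow(Time.seconds(10)).sum(1).print()

    // sliding window: a 10-second window evaluated every 5 seconds
    counts.timeWindow(Time.seconds(10), Time.seconds(5)).sum(1).print()

    env.execute("WindowSketch")
  }
}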
Bundled Connectors
Connectors provide code for interfacing with various third-party systems. The bundled connectors include Apache Kafka, Apache Cassandra, Amazon Kinesis Streams, Elasticsearch, Hadoop FileSystem, RabbitMQ, Apache NiFi, Twitter Streaming API, and Google PubSub.
The Kafka, Elasticsearch, and filesystem (HDFS) connectors are the most commonly used.
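As a sketch of the most common case, here is consuming a Kafka topic with the universal Kafka connector already declared in the pom; the broker address, group id, and topic name below are assumptions, not values from the original notes.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // assumed broker address
    props.setProperty("group.id", "flink-quickstart")        // assumed consumer group

    // "access-log" is a hypothetical topic name
    val consumer = new FlinkKafkaConsumer[String]("access-log", new SimpleStringSchema(), props)
    env.addSource(consumer).print()

    env.execute("KafkaSourceSketch")
  }
}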
Download and install
Flink ships a binary distribution that can be installed directly.
You can also download the source and build it against a specific Hadoop version, e.g.:
mvn clean install -DskipTests -Dfast -Dhadoop.version=2.6.5-custom
Single-node mode
After unpacking the distribution you have a single-node setup.
Cluster mode
Set Flink up across multiple nodes to get a distributed cluster.
hadoop mapreduce
spark
flink
Four ways to optimize your Flink applications:
https://dzone.com/articles/four-ways-to-optimize-your-flink-applications
Count the traffic generated per domain per minute
Flink consumes the data from Kafka and processes it
Count the traffic generated per user per minute
Domains and users have a mapping relationship
Flink consumes the Kafka data and joins it with the domain-to-user configuration data
Data: mocked
Mocking data: a skill you must master
Flink cleans the data
Business-logic processing and analysis: watermarks and WindowFunction (see the sketch after this outline)
==> Elasticsearch (mind the data types) <== Kibana displays the aggregated results graphically
Kibana: graphical monitoring of every stage of the pipeline
What has already been implemented + the description in Alibaba's CDN business docs ==> extensions
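A rough sketch of the per-domain traffic aggregation using event-time watermarks and a WindowFunction. The LogEvent record, its field names, and the mock data are all assumptions; in the real project the stream would come from Kafka and the result would be written to Elasticsearch.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical cleaned log record: (event time in ms, domain, traffic in bytes)
case class LogEvent(time: Long, domain: String, traffic: Long)

object DomainTrafficSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Mocked input; in the real project this stream comes from Kafka.
    val logs = env.fromElements(
      LogEvent(1000L, "a.com", 100),
      LogEvent(2000L, "b.com", 200),
      LogEvent(3000L, "a.com", 300))
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[LogEvent](Time.seconds(10)) {
          override def extractTimestamp(e: LogEvent): Long = e.time
        })

    logs.keyBy(_.domain)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[LogEvent, (String, Long, Long), String, TimeWindow] {
        override def apply(domain: String, window: TimeWindow,
                           input: Iterable[LogEvent],
                           out: Collector[(String, Long, Long)]): Unit = {
          // emit (domain, window end, total traffic in this minute)
          out.collect((domain, window.getEnd, input.map(_.traffic).sum))
        }
      })
      .print()

    env.execute("DomainTrafficSketch")
  }
}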