
Getting Started with Flink

This introduction to Flink covers the following topics, expanding layer by layer:

  1. What is Apache Flink?
  2. Quick Start
  3. Anatomy of a Flink Program
  4. DataSet API
  5. DataStream API
  6. Table & SQL
  7. Time & Window
  8. Connectors
  9. Deployment
  10. Debugging & Monitoring

What is Apache Flink?

Architecture

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

Process Unbounded and Bounded Data

| Type | Explanation |
| --- | --- |
| Unbounded streams | Have a start but no defined end |
| Bounded streams | Have a defined start and end |

Processing of bounded streams is also known as batch processing.

Deploy Applications Anywhere

Flink integrates with all common cluster resource managers such as Hadoop YARN, Apache Mesos, and Kubernetes, but can also be set up to run as a stand-alone cluster.

Applications

Flink Layer

Flink provides three layered APIs. Each API offers a different trade-off between conciseness and expressiveness and targets different use cases.

flink-layer-api

Flink vs Storm vs Spark Streaming

| Framework | Difference |
| --- | --- |
| Flink | Streaming-first: batch processing is a special case of stream processing |
| Storm | Native streaming, Tuple-based |
| Spark Streaming | Batch-first (structured streaming): stream processing is a special case of batch processing (mini-batches) |

Use Cases (to be completed)

Below, we explore the most common types of applications that are powered by Flink and give pointers to real-world examples.

What are typical event-driven applications?

What are typical data analytics applications?

What are typical data pipeline applications?

Flink Development Trends

Alibaba acquired the company behind Flink and has been developing its own fork, Blink.

How to Learn Flink

How do you learn Flink (or any other new framework) efficiently?

  1. The official website

  2. The source code

    Attach the sources to your IDE via Maven

    GitHub examples

Quick Start (developing your first Flink application)

The code for this section can be run from flink-quickstart under the code folder.

Preparing the Development Environment

  • JDK
  • Maven
  • IDE

Developing a Batch Application with Flink

pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.sisyphus</groupId>
  <artifactId>flink</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>${project.artifactId}</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.10.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.8</scala.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
    </dependency>

    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.47</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-filesystem_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.4.1</version>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have classpath issue like NoDefClassError,... -->
          <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

code

import org.apache.flink.api.scala.ExecutionEnvironment

/**
 * A Flink batch application written in Scala.
 * Input file contents:
 * Good Good Study
 * Day Day Up
 */
object BatchWCScalaApp {

  def main(args: Array[String]): Unit = {

    // 1. environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 2. source: a "wordcount" file on the classpath (e.g. src/main/resources/wordcount)
    val input = this.getClass.getClassLoader.getResource("wordcount").getPath
    val data = env.readTextFile(input)

    // bring in the implicit conversions / TypeInformation of the Scala API
    import org.apache.flink.api.scala._

    // 3. transformation
    val res = data.flatMap(_.toLowerCase.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // 4. sink: in the DataSet API, print() triggers execution, so no env.execute() is needed
    res.print()
  }
}

Note: what do these implicit conversions provide? The import org.apache.flink.api.scala._ mainly brings in the implicit TypeInformation generation (createTypeInformation) that Scala API operators such as map and flatMap require.

Developing a Streaming (Real-Time) Application with Flink

code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * A Flink streaming application written in Scala.
 *
 * Start netcat as the input stream: nc -lk 9999
 * Then type comma-separated words; the counts are printed every 5 seconds.
 */
object StreamingWCScalaApp {

  def main(args: Array[String]): Unit = {

    // 1. environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    StreamExecutionEnvironment.createLocalEnvironment()

    // bring in the implicit conversions / TypeInformation of the Scala API
    import org.apache.flink.api.scala._

    // 2. source
    val data = env.socketTextStream("localhost", 9999)

    // 3. transformation
    val res = data.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    // 4. sink
    res.print()
      .setParallelism(1)

    // 5. execute: unlike print() on a DataSet, a streaming job only runs when execute() is called
    env.execute("StreamingWCScalaApp")
  }
}

Anatomy of a Flink Program (programming model and core concepts)

Overview of the Core Concepts

The general flow of big-data processing:

| Framework | Flow |
| --- | --- |
| MapReduce | input —> map(reduce) —> output |
| Storm | input —> Spout/Bolt —> output |
| Spark | input —> transformation/action —> output |
| Flink | input —> transformation/sink —> output |

DataSet & DataStream

  1. immutable
  2. Batch processing: DataSet
  3. Stream processing: DataStream

The Flink Programming Model

Flink programs look like regular programs that transform DataStreams. Each program consists of the same basic parts (sketched after the list below):

  1. Obtain an execution environment,
  2. Load/create the initial data,
  3. Specify transformations on this data,
  4. Specify where to put the results of your computations,
  5. Trigger the program execution
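
A minimal sketch of these five parts, reusing the streaming word count from the quick start (host, port and window size are arbitrary choices):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object AnatomySketch {
  def main(args: Array[String]): Unit = {
    // 1. Obtain an execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Load/create the initial data
    val text = env.socketTextStream("localhost", 9999)

    // 3. Specify transformations on this data
    val counts = text.flatMap(_.toLowerCase.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(10))
      .sum(1)

    // 4. Specify where to put the results of the computation
    counts.print()

    // 5. Trigger the program execution
    env.execute("AnatomySketch")
  }
}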

Lazy Evaluation

All Flink programs are executed lazily: When the program’s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to a dataflow graph. The operations are actually executed when the execution is explicitly triggered by an execute() call on the execution environment. Whether the program is executed locally or on a cluster depends on the type of execution environment.

The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.

This is similar to Spark's DAG: the execution plan is built first, and the application only runs when execute() is triggered. Note that in the DataSet API a sink such as print() triggers execution eagerly, which is why the batch example above has no env.execute(), while the streaming example needs an explicit one.

Specifying Keys

Some transformations (join, coGroup, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate) allow the data to be grouped on a key before they are applied. A key can be defined in three ways (see the sketch after this list):

  1. Define keys for Tuples
  2. Define keys using Field Expressions
  3. Define keys using Key Selector Functions
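
A minimal DataSet sketch of the three ways to define a key; the WC case class and the sample data are made up for illustration:

import org.apache.flink.api.scala._

case class WC(word: String, count: Int)

object KeySketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val words = env.fromElements(WC("good", 1), WC("day", 1), WC("good", 1))

    // 1. key defined by tuple position (tuple types only)
    words.map(wc => (wc.word, wc.count)).groupBy(0).sum(1).print()

    // 2. key defined by a field expression (case-class / POJO field name)
    words.groupBy("word").reduce((a, b) => WC(a.word, a.count + b.count)).print()

    // 3. key defined by a key selector function
    words.groupBy(_.word).reduce((a, b) => WC(a.word, a.count + b.count)).print()
  }
}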

Specifying Transformation Functions (User-Defined Functions)

  1. Implementing an interface
  2. Anonymous classes
  3. Java 8 Lambdas
  4. Rich functions
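
A minimal sketch of the four styles above on a DataSet of strings (the class and object names are made up); in the Scala API, "Java 8 lambdas" correspond to plain Scala function literals:

import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object UdfSketch {

  // 1. implementing an interface
  class UpperMapper extends MapFunction[String, String] {
    override def map(value: String): String = value.toUpperCase
  }

  // 4. rich functions add open()/close() and access to the RuntimeContext
  class RichUpperMapper extends RichMapFunction[String, String] {
    override def open(parameters: Configuration): Unit = {
      // e.g. open connections, register accumulators, read broadcast variables
    }
    override def map(value: String): String = value.toUpperCase
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements("hadoop", "spark", "flink")

    data.map(new UpperMapper).print()              // 1. interface implementation

    data.map(new MapFunction[String, String] {     // 2. anonymous class
      override def map(value: String): String = value.toUpperCase
    }).print()

    data.map(_.toUpperCase).print()                // 3. function literal (lambda)

    data.map(new RichUpperMapper).print()          // 4. rich function
  }
}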

Supported Data Types

  1. Java Tuples and Scala Case Classes(*)
  2. Java POJOs(*)
  3. Primitive Types(*)
  4. Regular Classes
  5. Values
  6. Hadoop Writables(*)
  7. Special Types

DataSet API

DataSource

File-based:

Mainly used in actual development.

  • readTextFile(path) / TextInputFormat
val filePath = ""
env.readTextFile(filePath).print()

// Recursive Traversal
val parameters = new Configuration
parameters.setBoolean("recursive.file.enumeration", true)
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)

Compressed files with the following extensions can be read directly:

| Compression method | File extensions | Parallelizable |
| --- | --- | --- |
| DEFLATE | .deflate | no |
| GZip | .gz, .gzip | no |
| Bzip2 | .bz2 | no |
| XZ | .xz | no |

  • readTextFileWithValue(path) / TextValueInputFormat
  • readCsvFile(path) / CsvInputFormat
  • readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat
  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat

Collection-based:

Mainly used for testing, to generate some test data.

  • fromCollection(Iterable)
val data = 1 to 10
env.fromCollection(data).print()
  • fromCollection(Iterator)

readCsvFile examples (these belong with the File-based readCsvFile source above):

val filePath = "*.csv"
env.readCsvFile[(String, Int, String)](filePath, ignoreFirstLine = true).print()
env.readCsvFile[(String, Int)](filePath, ignoreFirstLine = true, includedFields = Array(0, 1)).print()

case class

case class MyCaseClass(name:String,age:Int)
env.readCsvFile[MyCaseClass](filePath,ignoreFirstLine=true,includedFields = Array(0,1)).print()

POJO

public class Person {
    private String name;
    private int age;
    private String work;

    // getters, setters, toString ...
}

env.readCsvFile[Person](filePath, ignoreFirstLine = true, pojoFields = Array("name", "age", "work")).print()
  • fromElements(elements: _*)
  • fromParallelCollection(SplittableIterator)
  • generateSequence(from, to)

Example

val env  = ExecutionEnvironment.getExecutionEnvironment

// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from an HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000)

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.createInput(HadoopInputs.readSequenceFile(classOf[IntWritable], classOf[Text],
 "hdfs://nnHost:nnPort/path/to/file"))

Transformation

  • map
  • flatMap
  • mapPartition
  • first(n)
  • filter
  • distinct
  • join
  • outerJoin
  • cross
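
A minimal sketch of a few of the transformations listed above, on made-up tuple data sets:

import org.apache.flink.api.scala._

object TransformationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val users  = env.fromElements((1, "hadoop"), (2, "spark"), (2, "spark"), (3, "flink"))
    val scores = env.fromElements((1, 90), (2, 80), (4, 70))

    users.distinct().print()                          // drops the duplicate (2, "spark")
    users.first(2).print()                            // the first two elements
    users.filter(_._1 > 1).print()                    // keep ids > 1
    users.mapPartition(it => it.map(_._2)).print()    // one function call per partition

    // inner join on the first field of both data sets
    users.distinct().join(scores).where(0).equalTo(0).print()

    // left outer join: ids missing on the right side arrive as null
    users.distinct()
      .leftOuterJoin(scores).where(0).equalTo(0) { (u, s) =>
        (u._2, if (s == null) -1 else s._2)
      }
      .print()
  }
}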

Sink

  • writeAsText() / TextOutputFormat
  • writeAsCsv(...) / CsvOutputFormat
  • print() / printToErr()
  • write() / FileOutputFormat
  • output()/ OutputFormat
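
A minimal sketch of the file sinks listed above (the /tmp paths are placeholders); unlike print(), the write sinks are lazy and need env.execute():

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object SinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements(("hadoop", 1), ("spark", 2), ("flink", 3))

    // with parallelism 1 the path is a single file, otherwise a directory of part files
    data.writeAsText("file:///tmp/sink-text", WriteMode.OVERWRITE).setParallelism(1)

    // writeAsCsv only works on tuple / case-class data sets
    data.writeAsCsv("file:///tmp/sink-csv", "\n", ",", WriteMode.OVERWRITE)

    env.execute("SinkSketch")
  }
}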

Counters (Accumulators)

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode

/**
  * Three steps to use a counter (accumulator) in Flink:
  * step 1: define the counter
  * step 2: register the counter
  * step 3: retrieve the counter result
  */
object CounterApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    val data = env.fromElements("hadoop","spark","flink","pyspark","storm")

    val info = data.map(new RichMapFunction[String,String]() {

      // step 1: define the counter
      val counter = new LongCounter()

      override def open(parameters: Configuration): Unit = {
        // step 2: register the counter
        getRuntimeContext.addAccumulator("ele-counts-scala", counter)
      }

      override def map(in: String): String = {
        counter.add(1)
        in
      }
    })

    val filePath = "file:///sink-scala-counter-out/"
    info.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(3)
    val jobResult = env.execute("CounterApp")

    // step 3: retrieve the counter result
    val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")

    println("num: " + num)
  }
}

Distributed Cache

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * step 1: register a local/HDFS file
  * step 2: read the content of the distributed cache in the open() method
  */
object DistributedCacheApp {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val filePath = "file:///hello.txt"

    // step 1: register a local/HDFS file
    env.registerCachedFile(filePath, "pk-scala-dc")

    import org.apache.flink.api.scala._
    val data = env.fromElements("hadoop","spark","flink","pyspark","storm")


    data.map(new RichMapFunction[String,String] {

      // step 2: read the content of the distributed cache in the open() method
      override def open(parameters: Configuration): Unit = {
        val dcFile = getRuntimeContext.getDistributedCache().getFile("pk-scala-dc")

        val lines = FileUtils.readLines(dcFile)  // returns a java.util.List[String]


        /**
          * Java and Scala collections are not directly compatible, so convert with JavaConverters first.
          */
        import scala.collection.JavaConverters._
        for (ele <- lines.asScala) { // iterate as a Scala collection
          println(ele)
        }
      }

      override def map(value: String): String = {
        value
      }
    }).print()
  }
}

Broadcast Variables

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

import scala.collection.mutable.ArrayBuffer

/**
 * Flink broadcast variables
 * step 1: create the DataSet to broadcast
 * step 2: broadcast it with withBroadcastSet
 * step 3: access it in open() via getBroadcastVariable
 */
object BroadCast {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._
    // 1. The DataSet to be broadcast
    val toBroadCast = env.fromElements(1, 2, 3)

    val data = env.fromElements("a")

    data.map(new RichMapFunction[String, ArrayBuffer[Int]]() {
      var broadCastSet: Traversable[Int] = null

      override def open(parameters: Configuration): Unit = {
        import scala.collection.JavaConverters._
        // 3. Access the broadcast DataSet as a Collection
        broadCastSet = getRuntimeContext.getBroadcastVariable[Int]("broadCastSetName").asScala
      }

      override def map(in: String): ArrayBuffer[Int] = {
        var res = ArrayBuffer[Int]()
        for (broad <- broadCastSet) {
          res += broad
        }
        res
      }
    }).withBroadcastSet(toBroadCast, "broadCastSetName") // 2. Broadcast the DataSet
      .print()
  }
}

DataStream API

DataSource

File-based:

Same as for the DataSet API.

Socket-based:

  • socketTextStream
env.socketTextStream("localhost",9999)

Collection-based:

Same as for the DataSet API.

Custom:

  • addSource

StreamExecutionEnvironment.addSource(sourceFunction)

  • implementing the SourceFunction for non-parallel sources
  • implementing the ParallelSourceFunction interface, or
  • extending the RichParallelSourceFunction for parallel sources
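
A minimal sketch of a non-parallel custom source (the class is made up: it emits an increasing counter once per second until the job is cancelled):

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

class CounterSource extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var count = 0L
    while (running) {
      ctx.collect(count)        // emit one element
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

object CustomSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new CounterSource).print()
    env.execute("CustomSourceSketch")
  }
}

For a parallel source, implement ParallelSourceFunction or extend RichParallelSourceFunction instead; the skeleton stays the same.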

Transformation

  • union
  • split
  • select

Sink

Basically the same as the DataSet sinks; you can also extend RichSinkFunction to implement a custom sink, as sketched below.
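
A minimal sketch of a custom sink (the class is made up; a real sink would open a JDBC/Redis/HTTP client in open() and release it in close()):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

class StdoutSink extends RichSinkFunction[String] {
  override def open(parameters: Configuration): Unit = {
    // open connections / clients here
  }

  override def invoke(value: String): Unit = {
    println(s"custom-sink> $value")   // called once per element
  }

  override def close(): Unit = {
    // release resources here
  }
}

object CustomSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements("hadoop", "spark", "flink").addSink(new StdoutSink)
    env.execute("CustomSinkSketch")
  }
}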

Table API & SQL (to be completed)

Flink's Relational APIs

The Table & SQL APIs are Flink's relational APIs.

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row

object TableSQLAPI {

  def main(args: Array[String]): Unit = {

    // NOTE: also requires the Table API modules (e.g. flink-table-api-scala-bridge_2.11
    // and flink-table-planner_2.11), which are not in the quick-start pom above
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)   // Flink 1.9+ replacement for TableEnvironment.getTableEnvironment(env)

    val filePath = "file:///Users/rocky/IdeaProjects/imooc-workspace/data/06/sales.csv"

    import org.apache.flink.api.scala._

    // read the CSV file into a DataSet
    val csv = env.readCsvFile[SalesLog](filePath,ignoreFirstLine=true)
    //csv.print()

    // DataSet ==> Table
    val salesTable = tableEnv.fromDataSet(csv)

    // register the Table under a name that SQL can reference
    tableEnv.registerTable("sales", salesTable)

    // sql
    val resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")

    tableEnv.toDataSet[Row](resultTable).print()

  }

  case class SalesLog(transactionId:String,
                      customerId:String,
                      itemId:String,
                      amountPaid:Double)
}

Time and Window Operations in Flink

Time Notions in Flink

flink-time

  • Event Time
  • Processing Time

Window Types in Flink

Tumbling windows

Sliding windows

Session windows

Global windows
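
A minimal sketch of the four window types on a keyed socket stream (sizes, gaps and the count trigger are arbitrary; processing-time assigners are used so the example runs without timestamps):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, ProcessingTimeSessionWindows, SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

object WindowTypesSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val counts = env.socketTextStream("localhost", 9999)
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)

    // tumbling window: fixed size, no overlap
    counts.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1).print()

    // sliding window: fixed size, slides by a smaller step, windows overlap
    counts.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1).print()

    // session window: closes after a gap with no elements
    counts.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).sum(1).print()

    // global window: never closes on its own, needs an explicit trigger
    counts.window(GlobalWindows.create()).trigger(CountTrigger.of(100)).sum(1).print()

    env.execute("WindowTypesSketch")
  }
}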

Watermarks in Flink

https://blog.csdn.net/lmalds/article/details/52704170
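
A minimal event-time sketch with a bounded-out-of-orderness watermark (the input format and the 5-second bound are made up for illustration):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WatermarkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // input lines like "word,1595490420000" (an event timestamp in milliseconds)
    val events = env.socketTextStream("localhost", 9999)
      .map { line =>
        val parts = line.split(",")
        (parts(0), parts(1).trim.toLong)
      }

    // watermark = max event time seen so far - 5s, i.e. tolerate 5s of out-of-order data
    val withTs = events.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
        override def extractTimestamp(element: (String, Long)): Long = element._2
      })

    withTs.map(e => (e._1, 1))
      .keyBy(0)
      .timeWindow(Time.minutes(1))   // event-time window; fires when the watermark passes its end
      .sum(1)
      .print()

    env.execute("WatermarkSketch")
  }
}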

Connectors

Bundled Connectors

Connectors provide code for interfacing with various third-party systems. Currently these systems are supported:

The three above are the most commonly used.

Flink Deployment and Job Submission

Downloading the Installation Package

You can download the Flink binary distribution and install it directly.

You can also download the source code and build it against your Hadoop version, e.g.:

mvn clean install -DskipTests -Dfast -Dhadoop.version=2.6.5-custom

Standalone

Single-node mode

After installation, Flink runs in single-node mode out of the box.

Cluster mode

After a distributed setup, it runs in cluster mode.

YARN Mode

Common Properties

Flink Scala Shell

Flink Monitoring and Tuning

HistoryServer

  • Hadoop MapReduce
  • Spark
  • Flink

REST API

Metrics

Common Optimizations

  1. Resources
  2. Parallelism
    1. The default is 1; adjust it appropriately, as it can be set at several levels (see the sketch after this list)
  3. Data skew
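
A minimal sketch of where parallelism can be set; the values are arbitrary, and a more specific setting overrides a more general one (operator > environment > client -p flag > parallelism.default in flink-conf.yaml):

import org.apache.flink.streaming.api.scala._

object ParallelismSketch {
  def main(args: Array[String]): Unit = {
    // system level:      parallelism.default in flink-conf.yaml
    // client level:      bin/flink run -p 4 ...
    // environment level:
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)

    // operator level (overrides the environment setting):
    env.socketTextStream("localhost", 9999)
      .flatMap(_.split(",")).setParallelism(2)
      .map((_, 1)).setParallelism(2)
      .keyBy(0)
      .sum(1)
      .print().setParallelism(1)

    env.execute("ParallelismSketch")
  }
}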

Four ways to optimize Flink applications:

https://dzone.com/articles/four-ways-to-optimize-your-flink-applications

Flink Hands-On Project

Project Background

Feature Description

  1. Count the traffic generated by each domain within one minute

    Flink consumes the data from Kafka and processes it

  2. Count the traffic generated by each user within one minute

    Domains and users have a mapping relationship

    Flink consumes the Kafka data plus the domain-to-user mapping read by Flink, and processes them together

Data: mocked

Project Architecture

Flink hands-on project architecture (diagram)

Implementation

Mocking data: something you must master

  1. The real data is sensitive
  2. Multiple teams collaborate, depending on services or interfaces provided by other teams
  3. Send data to the Kafka broker by mocking it

Data cleansing with Flink

  1. Read the Kafka data
  2. Read the MySQL data
  3. connect

Business-logic processing and analysis: watermarks and WindowFunction

==> Elasticsearch (mind the data types) <= Kibana for graphical display of the aggregated results

Kibana: monitoring of each stage, visualized

What has already been implemented + the description in Alibaba's CDN business documentation ==> extensions
