Flink

References

Alibaba Blink

Advantages of Flink as a stream processing engine

  • Low Latency
  • Exactly Once
  • Unified stream and batch processing
  • Scales to very large data volumes and complex computations

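Unlike Spark, whose Spark Streaming processes data in micro-batches, Flink is a true stream processing engine. Like Storm, it achieves low-latency stream processing by pipelining data between operators.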
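The toy sketch below shows what "pipelined, record-at-a-time" means. It is plain Scala with no Flink dependency, and `PipelineDemo` and the operator names `parse`/`enrich` are hypothetical. The same two operators are chained twice: once lazily over an `Iterator`, and once as fully materialized stages. A log records the order in which the operators touch each record. It only illustrates the ordering difference; real micro-batch engines are more sophisticated than whole-input stages.

```scala
import scala.collection.mutable.ListBuffer

// Toy model (not Flink code): two chained operators, "parse" then "enrich".
object PipelineDemo {
  // Pipelined: Iterator.map is lazy, so each record passes through both
  // operators before the next record is read (record-at-a-time).
  def pipelined(input: List[Int]): List[String] = {
    val log = ListBuffer.empty[String]
    val out = input.iterator
      .map { x => log += s"parse($x)"; x }
      .map { x => log += s"enrich($x)"; x }
    out.foreach(_ => ()) // drain the pipeline
    log.toList
  }

  // Staged: each operator materializes its whole output before the next
  // operator starts, so the first result appears only after all input is parsed.
  def staged(input: List[Int]): List[String] = {
    val log = ListBuffer.empty[String]
    val parsed = input.map { x => log += s"parse($x)"; x }
    val enriched = parsed.map { x => log += s"enrich($x)"; x }
    log.toList
  }
}
```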

Flink uses a variant of the classic Chandy-Lamport distributed snapshot algorithm. This achieves exactly-once state consistency while keeping latency and failover overhead low.
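Flink takes its snapshots by injecting checkpoint barriers into the data streams. An operator with several inputs aligns the barriers before it snapshots its state. The toy simulation below shows why alignment matters. It is plain Scala with a single checkpoint and a running sum as the operator state; all names are hypothetical, and this is not Flink's implementation. Once a barrier arrives on one input, that input is paused, so records that follow the barrier cannot leak into the snapshot.

```scala
// Stream elements: ordinary data records and checkpoint barriers (hypothetical toy types).
sealed trait Element extends Product with Serializable
final case class DataRecord(value: Int) extends Element
final case class Barrier(checkpointId: Long) extends Element

object BarrierAlignment {
  /** Toy multi-input operator whose state is a running sum.
    * Supports a single checkpoint; returns (snapshot at the barrier, final state). */
  def run(channels: Vector[List[Element]]): (Option[Int], Int) = {
    var queues = channels
    var state = 0
    var snapshot: Option[Int] = None
    val blocked = Array.fill(channels.size)(false) // true = barrier seen on this input
    var progress = true
    while (progress) {
      progress = false
      // Read one element round-robin from every input that is not paused.
      for (c <- queues.indices if !blocked(c) && queues(c).nonEmpty) {
        queues(c).head match {
          case DataRecord(v) => state += v
          case Barrier(_)    => blocked(c) = true // pause this input until aligned
        }
        queues = queues.updated(c, queues(c).tail)
        progress = true
      }
      // Barriers have arrived on all inputs: take the snapshot, then resume every input.
      if (snapshot.isEmpty && blocked.forall(b => b)) {
        snapshot = Some(state)
        for (c <- blocked.indices) blocked(c) = false
        progress = true
      }
    }
    (snapshot, state)
  }
}
```

In the test, the snapshot is 1 + 2 + 3 + 10 = 16. The record 20 sits behind the barrier on the second input and is processed only after the snapshot. Without pausing that input, it would be read while the first input was still delivering pre-barrier records, and the snapshot would wrongly include it.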

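If one engine is to unify stream and batch processing, it has to be built on a stream processing engine, because a batch can be treated as a bounded stream. Flink also provides two APIs, SQL and the Table API, which pave the way for unifying batch and stream at the query layer. Flink is therefore the engine best suited to unifying batch and stream processing.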
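The "a batch is a bounded stream" argument can be shown with a toy operator written once against `Iterator`. This is plain Scala, and `BoundedVsUnbounded.runningSum` is a hypothetical name, not a Flink API. The same code handles a finite input, where its last emitted value is the batch result. It also handles an infinite input, from which a consumer simply takes results as they arrive.

```scala
// Toy sketch (not Flink): one operator, written once, for bounded and unbounded input.
object BoundedVsUnbounded {
  // Running sum: emits the aggregate after every incoming record.
  // scanLeft on an Iterator is lazy, so this also works on infinite input.
  def runningSum(in: Iterator[Int]): Iterator[Int] =
    in.scanLeft(0)(_ + _).drop(1) // drop(1) removes the initial seed 0
}
```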

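Finally, Flink was designed from the start with close attention to performance-critical mechanisms such as task state management and flow control. These give it a performance edge when running large-scale, complex jobs.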
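Flow control keeps a fast producer from overwhelming a slow consumer. Flink's network stack has used credit-based flow control between tasks since Flink 1.5. The discrete-time toy below captures the idea; it is plain Scala with hypothetical names, not Flink's actual implementation. The receiver grants one credit per free buffer slot, and the sender may only send while it holds a credit. Backpressure therefore caps in-flight data at the buffer capacity.

```scala
// Toy discrete-time simulation of credit-based flow control (not Flink's network stack).
object CreditFlowControl {
  final case class Result(produced: Int, consumed: Int, maxBuffered: Int)

  def simulate(ticks: Int, capacity: Int, consumeEvery: Int): Result = {
    var credits = capacity // free buffer slots announced by the receiver
    var buffered = 0
    var produced = 0
    var consumed = 0
    var maxBuffered = 0
    for (t <- 1 to ticks) {
      // The producer wants to emit one record per tick, but needs a credit to send.
      if (credits > 0) { credits -= 1; buffered += 1; produced += 1 }
      // The slow consumer drains one record every `consumeEvery` ticks
      // and hands the freed slot back to the producer as a new credit.
      if (t % consumeEvery == 0 && buffered > 0) { buffered -= 1; consumed += 1; credits += 1 }
      maxBuffered = math.max(maxBuffered, buffered)
    }
    Result(produced, consumed, maxBuffered)
  }
}
```

With a capacity of 4 and a consumer half as fast as the producer, the producer settles to the consumer's rate. It never has more than 4 records buffered.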

Dataflow Programming Model

Levels of API abstraction

Flink offers four levels of API abstraction, from lowest to highest:

Low-Level building block - Stateful Stream Processing

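The lowest-level abstraction simply offers stateful streaming. It is embedded into the DataStream API through the Process Function. The Process Function lets users process events from one or more streams and use consistent, fault-tolerant state. Users can also register event-time and processing-time callbacks (timers), which makes sophisticated computations possible.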
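Below is a toy model of what the Process Function layer offers: per-key state plus event-time timers that fire once the watermark passes them. It is plain Scala that mimics the shape of Flink's `processElement`/`onTimer` callbacks; it is not the Flink API. The class `SessionTimeoutFunction` and its behavior (report a key after `timeoutMs` without events) are hypothetical.

```scala
import scala.collection.mutable

// Toy model (not the Flink API): per-key state plus one event-time timer per key.
final class SessionTimeoutFunction(timeoutMs: Long) {
  private val lastSeen = mutable.Map.empty[String, Long] // per-key state (like a ValueState)
  private val timers = mutable.Map.empty[String, Long]   // per-key event-time timer

  // Like processElement: update the key's state and (re)register its timer.
  // Overwriting the map entry replaces the previous timer for this key.
  def processElement(key: String, timestamp: Long): Unit = {
    lastSeen(key) = timestamp
    timers(key) = timestamp + timeoutMs
  }

  // Like onTimer: when the watermark advances, fire every timer whose time is
  // <= watermark, emit (key, last seen timestamp), and clear that key's state.
  def advanceWatermark(watermark: Long): List[(String, Long)] = {
    val dueKeys = timers.toList.collect { case (k, t) if t <= watermark => k }.sorted
    val fired = dueKeys.map(k => (k, lastSeen(k)))
    dueKeys.foreach { k => timers.remove(k); lastSeen.remove(k) }
    fired
  }
}
```

This toy keeps exactly one timer per key, but Flink keeps every registered timer. A real implementation therefore typically checks the stored timestamp in `onTimer`, as the timeout example in the Flink documentation does, or deletes the outdated timer.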

Core APIs - DataStream/DataSet API

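In practice, most applications do not need the low-level abstraction. They program against the Core APIs instead: the DataStream API (bounded and unbounded streams) and the DataSet API (bounded data sets). These APIs provide the common building blocks for data processing, such as various forms of transformations, joins, aggregations, windows, and state.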

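Because the low-level Process Function is integrated with the DataStream API, a program can drop down to the lower-level abstraction for just the operations that need it. The DataSet API offers additional primitives on bounded data sets, such as loops and iterations.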
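The Core API building blocks compose into pipelines such as map, keyBy, a tumbling window, and an aggregation. The sketch below mirrors that pipeline with plain Scala collections on a bounded input; the object and field names are hypothetical. Flink would run the same logic distributed and incrementally, not via `groupBy` on an in-memory collection. Window starts are computed as `ts - ts % size`, which assumes non-negative timestamps.

```scala
// Toy version of: map -> keyBy -> tumbling event-time window -> count (not the Flink API).
object WindowedWordCount {
  final case class Event(word: String, timestampMs: Long)

  def countPerWindow(events: Seq[Event], windowSizeMs: Long): Map[(String, Long), Int] =
    events
      .map(e => (e.word.toLowerCase, e.timestampMs))            // map: normalize the word
      .groupBy { case (w, ts) => (w, ts - ts % windowSizeMs) } // keyBy word + assign window start
      .map { case (key, hits) => key -> hits.size }            // aggregate: count per (word, window)
}
```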

Declarative DSL - Table API

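The Table API is a declarative DSL centered around tables, which may be dynamically changing tables when they represent streams. The Table API follows the (extended) relational model. Tables have a schema attached, as in a relational database, and the API offers comparable operations such as select, project, join, group-by, and aggregate. A Table API program declares which logical operations should be performed, rather than spelling out in code how each operation is implemented. The Table API can be extended with various kinds of user-defined functions. It is less expressive than the Core APIs, but more concise to use (less code to write). In addition, a Table API program passes through an optimizer that applies optimization rules before it is executed.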
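To illustrate what "the Table API goes through an optimizer" means, here is a tiny logical plan with one rewrite rule: filter push-down below a projection. This is a hypothetical toy in plain Scala, not Flink's planner, which builds on Apache Calcite. It only shows that a declarative plan can be rewritten into a cheaper equivalent without changing its result.

```scala
// Toy logical plan and one optimization rule (hypothetical; not Flink's planner).
object TinyPlanner {
  type Row = Map[String, Int]

  sealed trait Plan
  final case class Scan(rows: List[Row]) extends Plan
  final case class Project(columns: List[String], child: Plan) extends Plan
  final case class Filter(column: String, minValue: Int, child: Plan) extends Plan // keeps column >= minValue

  // Naive interpreter for a plan.
  def execute(plan: Plan): List[Row] = plan match {
    case Scan(rows)              => rows
    case Project(cols, child)    => execute(child).map(r => r.filter { case (k, _) => cols.contains(k) })
    case Filter(col, min, child) => execute(child).filter(r => r(col) >= min)
  }

  // Rule: push a filter below a projection so fewer rows reach the projection.
  // Valid because the filtered column is one of the projected columns.
  def optimize(plan: Plan): Plan = plan match {
    case Filter(col, min, Project(cols, child)) if cols.contains(col) =>
      Project(cols, optimize(Filter(col, min, child)))
    case Filter(col, min, child) => Filter(col, min, optimize(child))
    case Project(cols, child)    => Project(cols, optimize(child))
    case s: Scan                 => s
  }
}
```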

Tables can be converted seamlessly to and from DataStreams/DataSets, so a program can mix the Table API with the DataStream and DataSet APIs.

High-level Language - SQL

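The highest-level abstraction Flink offers is SQL. It is similar to the Table API in both semantics and expressiveness, but represents programs as SQL queries. The SQL abstraction interacts closely with the Table API: SQL queries can be executed over tables defined with the Table API.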

Distributed Runtime


Basic API Concepts


Flink Watermark

Flink programs are regular programs that implement transformations on distributed collections (e.g., filtering, mapping, updating state, joining, grouping, defining windows, aggregating). Flink programs can run in a variety of contexts, standalone or embedded in other programs. Execution can happen in a local JVM or on a cluster of many machines (e.g., on YARN).
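Below is a toy model of a "distributed collection", in plain Scala on one JVM with hypothetical names. Records are spread over `parallelism` partitions, and a record-wise transformation such as filter runs on each partition independently. A grouping step first redistributes records by the hash of their key, so all records of one key land in the same partition. Flink's real keyBy uses its own key-group hashing; `hashCode` modulo the parallelism is a simplification.

```scala
// Toy model of a partitioned collection (not Flink code).
object DistributedCollection {
  // Spread records round-robin over `parallelism` partitions, like a source with N subtasks.
  def roundRobin[A](data: Seq[A], parallelism: Int): Vector[Seq[A]] =
    Vector.tabulate(parallelism) { p =>
      data.zipWithIndex.collect { case (a, i) if i % parallelism == p => a }
    }

  // Redistribute by key hash so that every key lives in exactly one partition
  // (a simplified stand-in for keyBy/groupBy).
  def shuffleByKey[K, V](parts: Vector[Seq[(K, V)]], parallelism: Int): Vector[Seq[(K, V)]] = {
    val all = parts.flatten
    Vector.tabulate(parallelism) { p =>
      all.filter { case (k, _) => java.lang.Math.floorMod(k.hashCode, parallelism) == p }
    }
  }

  // filter (per partition) -> shuffle by key -> sum per key (per partition)
  def sumPositivePerKey(data: Seq[(String, Int)], parallelism: Int): Map[String, Int] = {
    val filtered = roundRobin(data, parallelism).map(_.filter(_._2 > 0))
    shuffleByKey(filtered, parallelism)
      .flatMap(part => part.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum })
      .toMap
  }
}
```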

Whether you write a batch program or a streaming program depends on whether the data sources are bounded or unbounded. The DataSet API is used to write batch programs, and the DataStream API is used to write streaming programs.

DataSet & DataStream

Deployment & Clusters

Step 1: Configure environment variables

Flink on YARN needs the YARN_CONF_DIR or HADOOP_CONF_DIR environment variable to locate the YARN and HDFS configuration.

$ vim ~/.bash_profile
# Without this you may get various NoClassDefFoundErrors
export HADOOP_CLASSPATH=`hadoop classpath`
# Without this you get the following messages:
# Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
# Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
export HADOOP_CONF_DIR="/etc/hadoop/conf"
export FLINK_HOME="/home/hdfs/flink-1.7.1"
export PATH=$PATH:$FLINK_HOME/bin
$ source ~/.bash_profile

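# Edit flink-conf.yaml
$ vim $FLINK_HOME/conf/flink-conf.yaml
# The default is the in-memory state backend, which keeps checkpoints in JobManager memory and easily runs out of memory
state.backend: filesystem
# Make sure Flink has sufficient permissions on these directories, e.g. 777
state.checkpoints.dir: hdfs:///flink-checkpoints
state.savepoints.dir: hdfs:///flink-checkpoints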

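# When a YARN session starts, Flink uploads all related components (dependency jars, configuration files, etc.) to this directory
# Make sure Flink has sufficient access permissions on it
$ hadoop fs -mkdir -p /user/${current_user:-root}/.flink
$ hadoop fs -chmod -R 777 /user/${current_user:-root}
# After a YARN session has started successfully, the directory looks like this; the three *-taskmanager-conf.yaml files show that this session has 3 TaskManagers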
-rw-r--r--   3 root hadoop        384 2019-02-02 16:32 /user/root/.flink/application_1548409829387_0057/f11d0b7c-0ea3-4cec-bab8-cb850a27a82e-taskmanager-conf.yaml
-rw-r--r--   3 root hadoop        384 2019-01-29 16:26 /user/root/.flink/application_1548409829387_0057/f834efc9-4c44-4fe8-a6c2-7a59b21e835d-taskmanager-conf.yaml
-rw-r--r--   3 root hadoop        384 2019-01-29 17:50 /user/root/.flink/application_1548409829387_0057/fccff855-216c-4cf0-bc54-f7d1136ba72d-taskmanager-conf.yaml
-rw-r--r--   3 root hadoop     88.9 M 2018-12-15 12:06 /user/root/.flink/application_1548409829387_0057/flink-dist_2.11-1.7.1.jar
drwxr-xr-x   - root hadoop          0 2019-01-29 13:51 /user/root/.flink/application_1548409829387_0057/lib
-rw-r--r--   3 root hadoop      1.9 K 2018-12-11 20:39 /user/root/.flink/application_1548409829387_0057/log4j.properties
-rw-r--r--   3 root hadoop      2.3 K 2018-12-11 20:39 /user/root/.flink/application_1548409829387_0057/logback.xml

Flink Command-Line Interface

Submit job to Flink

A YARN session starts all the required Flink services (JobManager and TaskManagers) so that programs can be submitted to the cluster. One session can run multiple programs at the same time.

  • Launch YARN Session: start a Flink cluster on YARN
# Show the yarn-session.sh usage
$ yarn-session.sh -h
Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property

# Start a YARN session
# Do not request more than the resources YARN actually has, otherwise startup fails!
# 10 TaskManagers, 8192 MB memory per TaskManager, 32 processing slots (vcores) per TaskManager
$ ./bin/yarn-session.sh \
--name "Flink YARN Session" \
--container 10 \
--taskManagerMemory 8192 \
--slots 32 \
--detached

# Attach to an existing YARN session
# Here: attach to application_1548409829387_0024
$ ./bin/yarn-session.sh -id application_1548409829387_0024

# Stop the YARN session by stopping the unix process (using CTRL+C) or by entering ‘stop’ into the client.

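# If you do not want the Flink YARN client to keep running, you can also start a detached YARN session.
# In this case the Flink YARN client only submits Flink to the cluster and then exits, similar to the cluster mode of Spark on YARN.
$ ./bin/yarn-session.sh -d
# The YARN session can then no longer be stopped through Flink; stop it with the following command instead
$ yarn application -kill application_1548409829387_0024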

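# To view the YARN session log, open the YARN cluster management UI and check the startup log of application_1548409829387_0024
# Note: logs of Flink jobs submitted to this session are also written here. Some timestamps and class names are omitted below for readability.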
 --------------------------------------------------------------------------------
  Starting YarnSessionClusterEntrypoint (Version: 1.7.1, Rev:89eafb4, Date:14.12.2018 @ 15:48:34 GMT)
  OS current user: yarn
 Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  Current Hadoop/Kerberos user: root
  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.181-b13
  Maximum heap size: 406 MiBytes
  JAVA_HOME: /usr/java/jdk1.8.0_181-amd64
  Hadoop version: 2.7.5
  JVM Options:
     -Xmx424m
     -Dlog.file=/hadoop/yarn/log/application_1548409829387_0024/container_e07_1548409829387_0024_01_000001/jobmanager.log
     -Dlogback.configurationFile=file:logback.xml
     -Dlog4j.configuration=file:log4j.properties
  Program Arguments: (none)
  Classpath: lib/flink-python_2.11-1.7.1.jar:lib/flink-shaded-hadoop2-uber-1.7.1.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.15.jar:log4j.properties:logback.xml:flink.jar:flink-conf.yaml::/usr/hdp/2.6.4.0-91/hadoop/conf:/usr/hdp/2.6.4.0-91/hadoop/azure-data-lake-store-sdk-2.1.4.jar:...:/usr/hdp/current/hadoop-yarn-client/lib/jettison-1.1.jar:/usr/hdp/current/ext/hadoop/*
 --------------------------------------------------------------------------------
 Registered UNIX signal handlers for [TERM, HUP, INT]
 YARN daemon is running as: root Yarn client user obtainer: root
 Loading configuration property: rest.port, 8081
 Loading configuration property: internal.cluster.execution-mode, NORMAL
 Loading configuration property: parallelism.default, 1
 Loading configuration property: high-availability.cluster-id, application_1548409829387_0024
 Loading configuration property: jobmanager.rpc.address, localhost
 Loading configuration property: taskmanager.numberOfTaskSlots, 3
 Loading configuration property: jobmanager.rpc.port, 6123
 Loading configuration property: taskmanager.heap.size, 4096m
 Loading configuration property: jobmanager.heap.size, 1024m
 Setting directories for temporary files to: /hadoop/yarn/local/usercache/root/appcache/application_1548409829387_0024,/data/hadoop/yarn/local/usercache/root/appcache/application_1548409829387_0024
 Starting YarnSessionClusterEntrypoint.
 Install default filesystem.
 Hadoop user set to root (auth:SIMPLE)
 Initializing cluster services.
 Trying to start actor system at hadoop4:0
 Slf4jLogger started
 Starting remoting
 Remoting started; listening on addresses :[akka.tcp://flink@hadoop4:36701]
 Actor system started at akka.tcp://flink@hadoop4:36701
 Created BLOB server storage directory /data/hadoop/yarn/local/usercache/root/appcache/application_1548409829387_0024/blobStore-3478c7e8-dec7-4762-b86e-8ba6f8c735be
 Started BLOB server at 0.0.0.0:37214 - max concurrent requests: 50 - max backlog: 1000
 No metrics reporter configured, no metrics will be exposed/reported.
 Trying to start actor system at hadoop4:0
 Slf4jLogger started
 Starting remoting
 Actor system started at akka.tcp://flink-metrics@hadoop4:43522
 Remoting started; listening on addresses :[akka.tcp://flink-metrics@hadoop4:43522]
 Initializing FileArchivedExecutionGraphStore: Storage directory /hadoop/yarn/local/usercache/root/appcache/application_1548409829387_0024/executionGraphStore-6fee972e-9636-4313-b7d4-060c1f2c8900, expiration time 3600000, maximum cache size 52428800 bytes.
 Created BLOB cache storage directory /data/hadoop/yarn/local/usercache/root/appcache/application_1548409829387_0024/blobStore-6b334ec7-a3be-4889-a88d-589df96a1504
 Upload directory /tmp/flink-web-d19ff756-c114-4c5e-a352-c5a36dc588f5/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available.
 Created directory /tmp/flink-web-d19ff756-c114-4c5e-a352-c5a36dc588f5/flink-web-upload for file uploads.
 Starting rest endpoint.
 Determined location of main cluster component log file: /hadoop/yarn/log/application_1548409829387_0024/container_e07_1548409829387_0024_01_000001/jobmanager.log
 Determined location of main cluster component stdout file: /hadoop/yarn/log/application_1548409829387_0024/container_e07_1548409829387_0024_01_000001/jobmanager.out
 Rest endpoint listening at hadoop4:40450
 http://hadoop4:40450 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
 Web frontend listening at http://hadoop4:40450.
 Starting RPC endpoint for org.apache.flink.yarn.YarnResourceManager at akka://flink/user/resourcemanager .
 Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
 Connecting to ResourceManager at hadoop2/192.168.51.22:8030
 Recovered 0 containers from previous attempts ([]).
 yarn.client.max-cached-nodemanagers-proxies : 0
 ResourceManager akka.tcp://flink@hadoop4:36701/user/resourcemanager was granted leadership with fencing token 00000000000000000000000000000000
 Starting the SlotManager.
 Dispatcher akka.tcp://flink@hadoop4:36701/user/dispatcher was granted leadership with fencing token 00000000-0000-0000-0000-000000000000
 Recovering all persisted jobs.
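Before launching a session, it helps to check the request against the cluster, as the warning about not exceeding YARN's actual resources suggests. The plain-Scala sketch below totals the request of the example command (`--container 10 --taskManagerMemory 8192 --slots 32`). It assumes the JobManager container takes 1024 MB and 1 vcore. It also assumes each TaskManager container asks for one vcore per slot, which matches Flink's default for `yarn.containers.vcores`. YARN's rounding to `yarn.scheduler.minimum-allocation-mb` is ignored, and the cluster sizes in the test are hypothetical.

```scala
// Back-of-the-envelope YARN request for a Flink session (hypothetical helper, not a Flink API).
object YarnSessionSizing {
  final case class Request(memoryMb: Long, vcores: Int)

  // Total = all TaskManager containers + the JobManager container.
  def sessionRequest(taskManagers: Int, tmMemoryMb: Long, slotsPerTm: Int,
                     jmMemoryMb: Long = 1024, jmVcores: Int = 1): Request =
    Request(taskManagers * tmMemoryMb + jmMemoryMb, taskManagers * slotsPerTm + jmVcores)

  // Does the request fit into the cluster's free capacity?
  def fits(req: Request, clusterMemoryMb: Long, clusterVcores: Int): Boolean =
    req.memoryMb <= clusterMemoryMb && req.vcores <= clusterVcores
}
```

For the example command this comes to 82944 MB and 321 vcores. Vcores, not memory, are often the limit that such a request runs into first.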
  • Run Flink Job: submit a Flink job
# Show the flink run usage
$ flink run -h

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>               Class with the program entry point
                                          ("main" method or "getPlan()" method.
                                          Only needed if the JAR file does not
                                          specify the class in its manifest.
...

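# Submit the Flink job to the YARN session
# In YARN session mode, TaskManager memory cannot be changed per job at submission time, because these settings only take effect
# when the YARN session is first started; the logs of multiple jobs also end up mixed together.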
$ flink run \
    --class com.rich.apps_streaming.BaseDataStreamingJob \
    --yarnname "Base data streaming to hbase" \
    --yarnapplicationId "application_1548409829387_0024" \
    --parallelism 10 \
    --yarnslots 3 \
    --yarntaskManagerMemory 4096 \
    --detached \
    --yarnstreaming \
    violent_search.jar
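How many slots does the job above need? With Flink's default slot sharing, a job needs as many slots as the highest parallelism among its operators, so `--parallelism 10` needs 10 slots. With 3 slots per TaskManager (`--yarnslots 3`, and `taskmanager.numberOfTaskSlots, 3` in the session log), that is 4 TaskManagers' worth of slots. The plain-Scala helper below does this arithmetic. Its names are hypothetical, and it assumes every operator stays in the default slot-sharing group.

```scala
// Slot arithmetic for a Flink job (hypothetical helper, not a Flink API).
object SlotMath {
  // With default slot sharing, the job needs max(operator parallelism) slots.
  def requiredSlots(operatorParallelisms: Seq[Int]): Int = operatorParallelisms.max

  // Number of TaskManagers needed to offer that many slots (integer ceiling division).
  def requiredTaskManagers(slots: Int, slotsPerTm: Int): Int =
    (slots + slotsPerTm - 1) / slotsPerTm
}
```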

Run a single flink job on YARN

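# If no YARN session application ID is given, a dedicated Flink cluster on YARN is started just for this job (recommended)
# Jobs started this way are more flexible and easier to manage, inspect logs for, and debug
# The downside is that every submission creates an application_xxxxxxxxxxxxx_xxxx directory under hdfs:///user/${START_USER}/.flink/ containing Flink's dependency jars (220 MB+)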
$ flink run \
    --class ${main_class} \
    --yarnname "${yarn_name}" \
    --jobmanager yarn-cluster \
    --yarnslots ${yarn_slots} \
    --yarndetached \
    --parallelism ${parallelism} \
    -yD taskmanager.heap.size=${taskManager_heap_memory} \
    ${jar_path} \
    "--topic=${topic}" \
    "--groupId=${groupId}" \
    "--htable=${htable}" \
    "--columnFamily=${columnFamily}" \
    "--windowSize=${windowSize}" \
    "--walStrategy=${walStrategy}" \
    > ${log_path} 2>&1 &

The startup output of yarn-session.sh contains a line such as JobManager Web Interface: http://hadoop2:38710.

Step 4: Check the logs

Logs of Flink jobs submitted to a YARN session are written to the YARN session's application. You can view them with yarn logs -applicationId application_1548409829387_0024, or in the Flink Web Dashboard at http://hadoop2:38710/#/jobmanager/log.

User logs printed on the client side (that is, not from inside the distributed tasks) are written to $FLINK_HOME/log/flink-${START_USER}-client-${START_HOST}.log.

Connectors

Apache Kafka Connector

Kafka Connector

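// streaming (DataStream API)
import org.apache.flink.streaming.api.scala._
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// batch (DataSet API): put this in a separate program, since it also defines `env`
// and its wildcard import should not be mixed with the streaming one in the same scope
import org.apache.flink.api.scala._
// set up the batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment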
