Flink 使用之连接 Hive metastore

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

前言

Flink SQL很大程度上简化了业务的开发工作量。但是Flink默认的配置,维护表的元数据信息仍然有局限性。Flink默认使用GenericInMemoryCatalog。所有的元数据仅在session范围内存活,一旦作业遇到故障恢复或者是停机等(session被kill掉),所有表信息都会丢失,造成很大的不便。我们需要一个稳定独立的外部组件来存储表的元数据信息。Hadoop生态系统经过多年的发展,Hive metastore事实上已成为元数据存储中心。无论Hive自身,还是Flink, Spark,都采用Hive metastore作为元数据存储。从根本上解决了上述问题。本篇为大家分享如何使用Flink连接Hive metastore并查询Hive表。

环境信息

  • Flink 1.17.2
  • Hive 3.1.0

准备工作

根据需要配合使用的Hive版本,添加对应的依赖到Flink的lib目录中。具体参见官网:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/hive/overview/#user-defined-dependencies

这里以支持Hive 3.1.0为例,需要准备如下jar包:

  • flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
  • hive-exec-3.1.0.jar
  • libfb303-0.9.3.jar
  • antlr-runtime-3.5.2.jar

除此之外还需要添加mapreduce的client-common,client-core和client-jobclient 3个jar,例如:

  • hadoop-mapreduce-client-common-3.1.1.3.0.1.0-187.jar
  • hadoop-mapreduce-client-core-3.1.1.3.0.1.0-187.jar
  • hadoop-mapreduce-client-jobclient-3.1.1.3.0.1.0-187.jar

最后互换如下两个文件的位置:

mv $FLINK_HOME/opt/flink-table-planner_2.12-1.17.2.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.17.2.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.17.2.jar $FLINK_HOME/opt/flink-table-planner-loader-1.17.2.jar

SQL Client的使用

SQL Client的使用请参考Flink 使用之 SQL Client

Yaml 方式配置

Yaml方式的好处是启动Flink SQL client的时候自动加载Hive metastore的配置,不需要每次启动的时候去创建。使用起来类似于Spark SQL & Hive metastore。

编辑$FLINK_HOME/conf/sql-client-defaults.yaml,添加如下内容:

catalogs:
   - name: myhive
     type: hive
     default-database: default
     hive-conf-dir: /usr/hdp/3.0.1.0-187/hive/conf/

配置项的解释如下:

  • name: catalog名称,在Flink SQL client内执行show catalogs;可以查看到。
  • type: catalog类型,这里需要设置为hive。表示使用Hive metastore。
  • default-database: 设置该catalog为默认的catalog时候,默认使用的database。
  • hive-conf-dir: hive-site.xml文件位置,支持使用HDFS路径,本地路径。如果使用本地路径,需要启动的时候能在本地访问到。如果不指定此配置项,Flink默认从classpath中读取Hive配置文件。

然后启动sql client:

export HADOOP_CLASSPATH=`hadoop classpath`

# 启动yarn session
./yarn-session -d -s 2 -jm 2048 -tm 2048

./sql-client.sh embedded -s yarn-session

启动的时候注意这两行日志:

Searching for '/path/to/flink_home/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/path/to/flink_home/conf/sql-client-defaults.yaml

看到这两行日志并且启动没有异常,说明Hive catalog配置成功。

我们查看一下hive catalog是否已经被加载。执行下面SQL列出目前存在的catalog:

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|          myhive |
+-----------------+
2 rows in set

这里myhive就是上面配置的Hive catalog。

接下来使用下面SQL,切换到Hive catalog:

Flink SQL> use catalog myhive;
[INFO] Execute statement succeed.

然后我们找一张Hive表查询,观察是否可以获取到数据:

show tables;

select * from table_xxx;

SQL 方式配置

这种方式不需要额外的配置文件。但是每次使用Flink SQL的时候都需要创建,才可以使用。

启动SQL client的方式和上面的相同。启动成功之后,执行创建catalog的SQL语句。

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'mydatabase',
    'hive-conf-dir' = '/opt/hive-conf'
);

-- 使用myhive catalog
USE CATALOG myhive;

create catalog语句的参数解释和上面Yaml配置文件的参数含义相同,不再赘述。

经过上面的配置,我们可以在myhive这个catalog中操作Hive表,或者是在Flink默认的default_catalog中使用Flink表,这些表的元数据信息会存放在Hive metastore中。

Flink 1.13之后版本的sql client支持使用初始化脚本,即在启动sql client的时候自动执行用户指定的SQL文件。我们可以利用这个特性,将创建和使用Hive catalog的语句写入到SQL文件,并配置为初始化脚本。

例如我们的初始化脚本为conf/init.sql

CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'mydatabase',
    'hive-conf-dir' = '/opt/hive-conf'
);
USE CATALOG myhive;
SET 'execution.runtime-mode' = 'batch';
SET 'table.sql-dialect'='hive';

可使用如下命令启动sql client:

bin/sql-client.sh embedded -s yarn-session -i conf/init.sql

SQL client启动成功之后,无需再执行创建hive catalog和相关配置语句,可直接使用Hive表。

Hive常用参数配置

Flink支持流式读取Hive表,相关配置如下:

  • streaming-source.enable: 是否流式读取Hive,相当于总开关。默认为false。批模式读取Hive,Flink只返回在查询执行的那一刻时候的Hive数据。流模式则不同,除了读取查询启动那一刻的数据外,还会定时监控数据的变化,在发生变化时返回增量数据。
  • streaming-source.partition.include:流式读取哪些分区。默认是all即所有分区,还有一个值latest表示读取最近的一个分区。何为最近由streaming-source.partition-order配置项指定。latest配置项的作用是可以使流式读取只关注保存最新数据的分区,比如使用create_date作为分区字段的时候。
  • streaming-source.monitor-interval:监控Hive数据变更的时间间隔。例如可配置1 min,即每1分钟检查一次数据是否变更。
  • streaming-source.partition-order:支持三个配置create-time, partition-timepartition-name。默认值为partition-namecreate-time比较分区数据文件的修改时间。partition-time比较分区字段值中提取出来的时间值。partition-name将分区字段值按照字符顺序排序。
  • streaming-source.consume-start-offset:流式消费数据的开始偏移量。对于create-timepartition-time,需要配置为时间戳(yyyy-[m]m-[d]d [hh:mm:ss])。对于partition-name,需要配置为分区名字符串,例如pt_year=2020/pt_mon=10/pt_day=01

读取Hive表并行度配置。Flink根据Hive表的文件数和文件的block数自动推断最优的读取并行度。但是这个并行度不一定使最适合当前集群配置运行的。用户可通过如下配置手动干预:

  • table.exec.hive.infer-source-parallelism:是否自动推断并行度。默认为true。
  • table.exec.hive.infer-source-parallelism.max:自动推断并行度的最大值。默认为1000。

写入Hive表时Flink支持文件压缩,避免生成过多的小文件影响读取性能。相关配置如下:

  • auto-compaction:是否开启自动压缩。默认为false不开启。
  • compaction.small-files.avg-size:小文件的界定值默认为16MB。如果某些文件大小小于该值,Flink会合并压缩他们。
  • compaction.file-size:Flink合并后的文件大小的目标值,期待合并后的文件大小在该值附近。
  • compaction.parallelism:压缩操作的并行度。

可以使用SQL hints在执行SQL的时候临时增加/修改一些配置,不影响全局。例如:

SELECT * 
FROM hive_table 
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;

参考文献

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/hive/overview/

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/hive/hive_read_write/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容