Flink实操:Flink SQL实现SFTP文件读写操作
=============================
一、背景
公司需要将Doris数据库中的部分表数据同步至SFTP服务器,以供其他合作企业安全读取和使用。目前,平台数据同步功能统一使用Flink引擎进行实时同步、离线同步的工作。因此,希望能够充分利用现有的Flink引擎,并将其复用于这一需求。下图是我们的解决方案的结构示意图:
二、技术调研
- 由于我们选择使用
Flink
引擎来实现需求,我们需要进行调研以确定Flink
是否支持SFTP
和Doris
的连接器。经过查阅当前的Flink
版本(v1.20-SNAPSHOT)
的文档,我们发现Flink Table API
仅提供FileSystem SQL Connector用于操作文件。 - 然而,在文档中并未提及该连接器是否支持
SFTP
协议,同时也没有提供指定SFTP
的主机名、端口和秘钥文件的参数。因此,我们需要进一步验证是否可以使用该连接器来访问SFTP
,并配置相关参数。以下是官方示例:
CREATE TABLE MyUserTable (
column_name1 INT,
column_name2 STRING,
...
part_name1 INT,
part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
'connector' = 'filesystem', -- required: specify the connector
'path' = 'file:///path/to/whatever', -- required: path to a directory
'format' = '...', -- required: file system connector requires to specify a format,
-- Please refer to Table Formats
-- section for more details
'partition.default-name' = '...', -- optional: default partition name in case the dynamic partition
-- column value is null/empty string
'source.path.regex-pattern' = '...', -- optional: regex pattern to filter files to read under the
-- directory of `path` option. This regex pattern should be
-- matched with the absolute file path. If this option is set,
-- the connector will recursive all files under the directory
-- of `path` option
-- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly
-- reduce the number of file for filesystem sink but may lead data skew, the default value is false.
'sink.shuffle-by-partition.enable' = '...',
...
)
- 因此,我们需要进一步确认该连接器是否适用于
SFTP
。首先想到的是Flink
自身的CheckPoint
功能,它支持HDFS、S3
等文件存储系统。底层实现是通过org.apache.flink.core.fs.FileSystem类来进行操作。而这个类同样是FileSystem SQL Connector
的底层实现类。既然如此,Flink#FileSystem
应该支持访问HDFS、S3等其他文件系统,那其内部必然会使用hadoop#FileSystem
的api,而hadoop#FileSystem
自身如果支持SFTP
,则此路可以走通,为了确认这一点,开始查看源码并查看类注释中的相关信息,发现一段有用信息:
/**
* Flink implements and supports some file system types directly (for example the default machine-local file system). Other file * system types are accessed by an implementation that bridges to the suite of file systems supported by Hadoop (such as for
* example HDFS).
*/
// 翻译:Flink直接实现并支持一些文件系统类型(例如默认的机器本地文件系统)。其他文件系统类型由桥接到Hadoop支持的文件系统套件(例如HDFS)的实现访问。
- 看到此处信心倍增,继续翻阅后发现在
Flink#FileSystem
有一个getUnguardedFileSystem函数,如下图:
- 该函数会检测文件
url
路径,如果路径是file://
则会走Flink#FileSystem
的内部实现,如果是hdfs://
,sftp://
这类前缀,则会调用loadHadoopFsFactory函数,如下图:
- 至此找到
Flink#FileSystem
与Hadoop#FileSystem
的桥接处,在该函数中会先加载hadoop#FileSystem
并构建Flink#FileSystem
的子类HadoopFileSystem,而在HadoopFileSystem
类中使用hadoop#FileSystem
提供的能力。 - 那么
hadoop#FileSystem
是否提供了读写SFTP的能力呢?经过调研发现只有Hadoop-2.8.x
版本以上才支持SFTP,JIRA工单
- 至此总结以下:虽然
Flink#FileSystem
原生并未支持sftp
读写,但Flink#FileSystem
中如果遇见不支持的文件前缀如:hdfs://
或者sftp://
,则会桥接到Hadoop#FileSystem
类中,而Hadoop#FileSystem
底层是支持丰富的文件类型,其中的SFTPFileSystem
实现类即可读写SFTP
,逻辑图如下:
三、程序验证
- 有了理论支撑后开始程序验证,先在
sftp
文件系统上放置一个user.csv
文件供FlinkSQL读取,文件内容如下:
1,jack
2,tom
3,lily
- 再在
IDE
中导入相关Flink、Hadoop
相关依赖:
<dependencies>
<!-- flink start -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink end -->
<!-- hadoop start -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- hadoop end -->
</dependencies>
- 编写程序:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class SftpExample {
public static void main(String[] args) {
// 设置执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 定义文件系统连接器
tableEnv.executeSql(
"CREATE TABLE SftpCsvTable (" +
" age INT," +
" name STRING" +
") WITH (" +
" 'connector'='filesystem'," +
" 'path'='sftp://101.230.65.134:21821/user.csv'," +
" 'format'='csv'" +
")"
);
// 执行 SQL 查询操作
tableEnv.executeSql("SELECT * FROM SftpCsvTable").print();
}
}
- 启动后报错如下: 没有格式为
sftp
的FileSystem
类。
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'sftp'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:543)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator.enumerateSplits(NonSplittingRecursiveEnumerator.java:82)
at org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:141)
... 34 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop File System abstraction does not support scheme 'sftp'. Either no file system implementation exists for that scheme, or the relevant classes are missing from the classpath.
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:100)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:526)
... 38 more
Caused by: java.io.IOException: No FileSystem for scheme: sftp
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:98)
... 39 more
- 随后
debug
定位异常代码发现 :在hadoop
程序中默认不开启sftp
的实现类,若要使用则需要在core-site.xml
配置文件中配置fs.sftp.impl
,如下图:
- 此外,光配置
fs.sftp.impl
还不够,生产中访问sftp一般都是指定用户名密码或者秘钥,而这些参数的配置项在SFTPFileSystem
中有变量可以参考,如下:
7. 最终我们将开发环境中hadoop#core-site.xml
配置文件放置在IDE#Resource
目录下,并根据SFTP
相关信息配置好对应K,V,内容如下:
<configuration>
<!-- 配置sftp实现类 -->
<property>
<name>fs.sftp.impl</name>
<value>org.apache.hadoop.fs.sftp.SFTPFileSystem</value>
</property>
<!-- 配置sftp用户名 -->
<property>
<name>fs.sftp.user.101.230.65.134</name>
<value>username</value>
</property>
<!-- 配置sftp秘钥路径 -->
<property>
<name>fs.sftp.keyfile</name>
<value>D:\IdeaProjects\flink-sftp\src\main\resources\uat_sftp_qadmin.rsa</value>
</property>
<!-- 配置sftp密码 -->
<property>
<name>fs.sftp.password.101.230.65.134</name>
<value>password</value>
</property>
</configuration>
- 执行成功,如下图:
- 至此我们可以使用FlinkSQL#FileSystem已经写好的各种文件格式类型以及分区功能,还可以享受Hadoop#SFTPFileSystem读写sftp的能力,可以说完美的解决此需求。
四、总结
通过这次详细排查和研究,对Flink文件系统的实现有了更加深入的理解。起初,我对FlinkSQL是否支持sftp产生了疑问,然而,通过逐步追踪源代码,逐渐揭示了底层逻辑的实现机制:尽管原生的Flink#FileSystem
并没有直接支持SFTP的读写操作,但它通过一个巧妙的桥接机制,将不支持的文件前缀(例如hdfs://
或者sftp://
)重新定向到Hadoop#FileSystem
类来处理。而Hadoop#FileSystem
底层提供了对多种文件类型的广泛支持,只要存在SFTP的实现类,就可以顺利进行操作。这个过程中,深刻体会到了理解底层原理和追踪代码的重要性。
这次经历让我明白,在后续的开发过程中,我们应该保持持续的好奇心,提出更多问题,积极思考,并深入探索底层实现原理,时刻保持探索精神,不断拓展我们的知识和技能,以便在开发过程中能够更加游刃有余地应对各种挑战。
五、相关资料
- Hadoop#JIRA工单
- FileSystem SQL Connector
- org.apache.flink.core.fs.FileSystem
- getUnguardedFileSystem
- loadHadoopFsFactory
- HadoopFileSystem
原文链接: https://juejin.cn/post/7350917259718967323
文章收集整理于网络,请勿商用,仅供个人学习使用,如有侵权,请联系作者删除,如若转载,请注明出处:http://www.cxyroad.com/17294.html