Python|PyFlink 开发环境利器:Zeppelin Notebook( 二 )


echo \"name: pyflink_tm_envchannels: - conda-forge - defaultsdependencies: - Python=3.7 - pip - pip: - apache-flink==1.13.1 - pandas \"pyflink_tm_env.yml mamba env remove -n pyflink_tm_envmamba env create -f pyflink_tm_env.yml 运行下面的代码打包 PyFlink 的 conda 环境并且上传到 HDFS (注意这里使用的是 zip 格式)
%shrm -rf pyflink_tm_env.zipconda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.ziphadoop fs -rmr /tmp/pyflink_tm_env.ziphadoop fs -put pyflink_tm_env.zip /tmp# The Python conda tar should be public accessible so need to change permission here.hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip Step 3. 在 PyFlink 中使用 Conda 环境
接下来就可以在 Zeppelin 中使用上面创建的 Conda 环境了 , 首先需要在 Zeppelin 里配置 Flink , 主要配置的选项有:
flink.execution.mode 为 yarn-application 本文所讲的方法只适用于 yarn-application 模式; 指定 yarn.ship-archives , zeppelin.pyflink.Python 以及 zeppelin.interpreter.conda.env.name 来配置 JobManager 侧的 PyFlink Conda 环境; 指定 Python.archives 以及 Python.executable 来指定 TaskManager 侧的 PyFlink Conda 环境; 指定其他可选的 Flink 配置 , 比如这里的 flink.jm.memory 和 flink.tm.memory 。%flink.confflink.execution.mode yarn-applicationyarn.ship-archives /mnt/disk1/jzhang/zeppelin/pyflink_env.tar.gzzeppelin.pyflink.Python pyflink_env.tar.gz/bin/Pythonzeppelin.interpreter.conda.env.name pyflink_env.tar.gzPython.archives hdfs:///tmp/pyflink_tm_env.zipPython.executable pyflink_tm_env.zip/bin/Python3.7flink.jm.memory 2048flink.tm.memory 2048 接下来就可以如一开始所说的那样在 Zeppelin 里使用 PyFlink 以及指定的 Conda 环境了 。 有 2 种场景:
下面的例子里 , 可以在 PyFlink 客户端 (JobManager 侧) 使用上面创建的 JobManager 侧的 Conda 环境 , 比如下边使用了 Matplotlib 。下面的例子是在 PyFlink UDF 里使用上面创建的 TaskManager 侧 Conda 环境里的库 , 比如下面在 UDF 里使用 Pandas 。 三、总结与未来 本文内容就是在 Zeppelin notebook 里利用 Conda 来创建 Python env 自动部署到 Yarn 集群中 , 无需手动在集群上去安装任何 Pyflink 的包 , 并且可以在一个 Yarn 集群里同时使用多个版本的 PyFlink 。
【Python|PyFlink 开发环境利器:Zeppelin Notebook】每个 PyFlink 的环境都是隔离的 , 而且可以随时定制更改 Conda 环境 。 可以下载下面这个 note 并导入到 Zeppelin , 就可以复现今天讲的内容:http://23.254.161.240/#/notebook/2G8N1WTTS
此外还有很多可以改进的地方:
目前我们需要创建 2 个 conda env, 原因是 Zeppelin 支持 tar.gz 格式 , 而 Flink 只支持 zip 格式 。 等后期两边统一之后 , 只要创建一个 conda env 就可以; apache-flink 现在包含了 Flink 的 jar 包 , 这就导致打出来的 conda env 特别大 , yarn container 在初始化的时候耗时会比较长 , 这个需要 Flink 社区提供一个轻量级的 Python 包 (不包含 Flink jar 包) , 就可以大大减小 conda env 的大小 。本文为阿里云原创内容 , 未经允许不得转载 。