允许气流DAG通过Livy:session和/或批处理运行Spark作业。

airflow-livy-operators的Python项目详细描述


气流Livy操作员

Build StatusCode coverage

让气流DAG通过Livy运行Spark作业:

  • 会议
  • 批处理。此模式支持通过Spark/YARN REST API进行附加验证。在

请参阅this blog post以获取更多信息和从气流运行Spark作业的方法的详细比较。在

感兴趣的目录和文件

  • airflow_home/plugins:flow Livy操作员代码。在
  • airflow_home/dags:气流的DAGs示例。在
  • batches:Spark作业代码,将在Livy批处理中使用。在
  • sessions:Livy会话的Spark代码。可以添加模板 以将参数传递到其中。在
  • helper.sh:助手shell脚本。可用于运行样本DAG, 准备开发环境等。 运行它来找出还有哪些命令可用。在

我怎么

…运行示例?

先决条件:

现在

  1. 可选-如果要模拟 机器。打开助手.sh。在init_airflow()函数中,您将看到气流 Livy,Spark和YARN的连接。根据需要重新定义。在
  2. 定义将此repo中的示例批处理文件传递到集群的方式:
    1. 如果使用docker compose群集:重新定义BATCH_DIR变量 视情况而定。在
    2. 如果您使用自己的集群:修改copy_batches()函数,使其 将文件传递到集群可访问的位置(可以是aws s3 cp等)
  3. 运行./helper.sh up以启动整个基础结构。 气流用户界面将在 localhost:8888。在
  4. Ctrl+C停止气流。然后./helper.sh down来处理 剩余的气流过程(如果一切顺利,就不需要了。 如果由于某些非信息性错误而无法再次启动气流,请运行此程序)。在

。。。在我的项目中使用它?

pip install airflow-livy-operators

导入方法如下:

^{pr2}$

请参阅airflow_home/dags下的示例dag以了解如何使用运算符。在

。。。建立开发环境?

好吧,你想做点贡献,并且需要能够在你的机器上运行这些东西, 以及IDE(调试、语法高亮显示)带来的通常的美好。在

  • 运行./helper.sh dev安装所有的开发依赖项。在
  • ./helper.sh updev使用本地操作员代码运行气流(与 从PyPi中提取它们)。对开发有用。在
  • (特定于Pycharm)将Pycharm指向新创建的虚拟环境:转到 "Preferences" -> "Project: airflow-livy-operators" -> "Project interpreter", select "Existing environment"并从venv文件夹中选择python3可执行文件 (venv/bin/python3
  • ./helper.sh cov-使用覆盖率报告运行测试 (将保存到htmlcov/)。在
  • ./helper.sh lint-突出显示代码样式错误。在
  • ./helper.sh format以重新格式化所有代码。 (此项目依赖Black+ isort
  • ./helper.sh pypi-为PyPi生成包。在

。。。调试?

  • (特定于Pycharm)使用airflow test逐步调试 以及在本地运行PySpark批处理作业(以及调试) 通过.idea/runConfigurations下的运行配置支持。 你不应该做任何事情来使用它们-只要打开文件夹 作为一个项目。在
  • 如何在本地Spark上运行批处理的示例:
python ./batches/join_2_files.py \"file:////Users/vpanov/data/vpanov/bigdata-docker-compose/data/grades.csv"\"file:///Users/vpanov/data/vpanov/bigdata-docker-compose/data/ssn-address.tsv"\
-file1_sep=, -file1_header=true\
-file1_schema="\`Last name\` STRING, \`First name\` STRING, SSN STRING, Test1 INT, Test2 INT, Test3 INT, Test4 INT, Final INT, Grade STRING"\
-file1_join_column=SSN -file2_header=false\
-file2_schema="\`Last name\` STRING, \`First name\` STRING, SSN STRING, Address1 STRING, Address2 STRING"\
-file2_join_column=SSN -output_header=true\
-output_columns="file1.\`Last name\` AS LastName, file1.\`First name\` AS FirstName, file1.SSN, file2.Address1, file2.Address2"# Optionally append to save result to file#-output_path="file:///Users/vpanov/livy_batch_example" 

托多

  • 在助手.sh-替换为现代工具(例如pipenv+Docker image)
  • 禁用一些flake8标志以获得更干净的代码

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Java中的开源字典组件   即使在成功执行删除查询之后,java更新的列表也不会显示在jsp页面中   java Apache:无法启动上下文路径/网站上的失败应用程序   java验证CSV中的特定列   对于具有专用内存的java应用程序,最小堆大小低于最大堆大小有意义吗?   java将数组中的值转换为多维数组   java在给定程序中,垃圾收集器在对象被取消引用之前正在运行。。。使用jre 7(32位)   java在运行时动态刷新文件夹   eclipse如何解决“java.net.BindException:地址已在使用:JVM_Bind”错误?   Java数组与数组   每次任务完成任务时,Java多线程都会安排任务   java部分编译时使用maven编织第三方jar   java Dokku单一回购中的多个应用程序   用apachevelocity生成javac/C++语言文件   java如何使用spring应用程序上下文中的属性文件实例化列表   java访问智能卡文件结构   具有GlobalMethodSecurity的java自定义UserDetailService循环引用   java如何集成Spring和JSF