数据工程101 – Apache Airflow中的Python运算符入门
Apache Airflow是数据工程师必备的工具。它使创建和监视所有工作流变得更加容易。当您有多个工作流程时,您更有可能为多个工作流程使用相同的数据库和相同的文件路径。使用变量是在不同工作流程之间定义此类共享信息的最有效方法之一。
我们将在本文中介绍变量的概念,并在Apache Airflow中介绍一个Python运算符的示例。
Apache Airflow
本文是《数据工程101 – Apache Airflow入门》的续篇,其中我们介绍了气流数据库的功能和组件,安装步骤,并创建了基本的DAG。因此,如果您是Apache Airflow的完整入门者,建议您先阅读该文章。
目录
什么是Apache Airflow?
启动气流
Apache Airflow中的Python运算符
Apache Airflow中的变量是什么?
什么是Apache Airflow?
Apache Airflow是一个工作流引擎,可以轻松地计划和运行复杂的数据管道。这将确保数据管道中的每个任务将以正确的顺序执行,并且每个任务都将获取所需的资源。
它将为您提供令人惊叹的用户界面,以监视和修复可能出现的任何问题。
Apache Airflow
启动气流
我们已经在本系列的上一篇文章中讨论了安装步骤。
要启动气流服务器,请打开终端并运行以下命令。默认端口为8080,如果您将该端口用于其他用途,则可以对其进行更改。
气流网络服务器-p 8080
现在,在另一个终端中使用以下命令启动气流调度程序。它将监视您的所有工作流程并在您分配它们时触发它们。
气流调度器
现在,请确保您有一个文件夹名称的DAG中的气流目录中,您将定义DAG和打开网页浏览器,开放式:HTTP://本地主机:8080 /管理/,你会看到这样的事情:
气流仪表板
Apache Airflow中的Python运算符
运算符描述工作流的单个任务,并且运算符为我们提供了许多不同的运算符,例如BashOperator,PythonOperator,EmailOperator,MySqlOperator等。在上一篇文章中,我们学习了如何使用BashOperator来获取实时live分数,在此,我们将看到如何使用PythonOperator。
让我们看下面的例子:
导入库
让我们从导入所需的库开始。这次我们将使用PythonOperator。
定义DAG参数
对于每个DAG,我们需要传递一个参数字典。这是您可以传递的一些参数的描述:
owner:工作流所有者的名称,应为字母数字并且可以带有下划线,但不能包含任何空格。
depends_on_past:如果每次运行工作流时,数据都取决于过去的运行,则将其标记为True,否则将其标记为False。
start_date:工作流程的开始日期
email:您的电子邮件ID,以便任何任务由于任何原因失败时都可以接收电子邮件。
retry_delay:如果任何任务失败,则应等待多少时间才能重试。
定义Python函数
现在,我们将定义python函数,该函数将使用参数打印字符串,此函数稍后将由PythonOperator使用。
定义DAG
现在,我们将创建一个DAG对象,并传递dag_id(它是DAG的名称),并确保您之前没有使用该名称创建任何DAG。传递我们先前定义的参数,并添加描述和schedule_interval,它们将在指定的时间间隔后运行DAG
定义任务
我们的工作流程只有一项任务:
print:在任务中,我们将使用python函数在终端上打印“ Apache Airflow是数据工程师的必备工具”。
我们将task_id传递给PythonOperator对象。您将在DAG的“图形视图”的节点上看到此名称。将python函数名称传递给要运行的参数“ python_callable”,并将函数所使用的参数传递给参数“ op_kwargs”作为字典,最后传递给您要将此任务链接到的DAG对象。
运行DAG
现在,当您刷新Airflow仪表板时,您将在列表中看到新的DAG。
单击DAG并打开图形视图,您将看到类似这样的内容。工作流中的每个步骤都将放在一个单独的框中。在此工作流程中,我们只有一个步骤是打印。运行工作流程,等待其边框变为深绿色,这表示已成功完成。
Python操作员气流
单击节点“ print”以获取有关此步骤的更多详细信息,然后单击Logs,您将看到类似这样的输出。
Apache Airflow中的变量是什么?
我们知道,Airflow可用于创建和管理复杂的工作流程。我们可以同时运行多个工作流程。您的大多数工作流程都有可能使用相同的数据库或相同的文件路径。现在,进行任何更改,例如更改使用保存文件的目录的路径或更改数据库的配置。在这种情况下,您不想单独更新每个DAGS。
Airflow为此提供了一个解决方案,您可以创建变量,以便在运行时在多个DAGS中存储和检索数据。因此,如果发生任何重大变化,您只需编辑变量即可,您的工作流程就很好了。
如何创建变量?
打开Airflow仪表板,从顶部菜单中单击Admin,然后单击Variables。
变数
现在,单击创建以创建一个新变量,然后将打开一个窗口。添加键和值并提交。在这里,我正在创建一个变量,其键名是data_path,值是任何随机文本文件的路径。
python运算符气流数据路径
现在,我们将创建一个DAG,在其中我们将找到此文件中存在的文本数据的字数。当您要使用变量时,需要将其导入。让我们看看如何做到这一点:
然后,我们将定义函数,该函数将使用变量的路径,对其进行读取并计算字数。
其余步骤与我们之前的步骤相同,您需要定义DAG和任务,并且工作流已准备就绪。
您现在可以在日志中查看结果,现在可以在其他任何DAG中使用此变量,也可以随时对其进行编辑,并且所有DAGS都可以更新。
Python操作员气流-DAG更新
尾注
在本文中,我们了解了如何在Apache Airflow中使用Python运算符,诸如分支和变量之类的概念以及如何创建它们。在下一篇文章中,我们将创建一个
机器学习项目并使用Apache Airflow自动化其工作流程。
题库