Apache Airflow Features & Use cases

BigData & Cloud Practice
Dec 9, 2020


Article by Ankita Anil Shah, Big Data & Cloud Developer

Purpose:
While using Apache Airflow in one of our projects, we faced a few challenges. Even though Apache Airflow is an open-source tool, we found that the official documentation was not sufficient to solve them, and the community support is not exhaustive either. The intent of this blog is therefore to share our experience of using Apache Airflow.

Introduction to Apache Airflow:
Apache Airflow was originally developed at Airbnb and is now an Apache Software Foundation project. It is primarily used for orchestrating workflows, typically ones that extract, transform, load, and store data. A workflow is expressed as a DAG (Directed Acyclic Graph), which is simply a Python script containing multiple tasks, and each task is an instance of an operator. Below are the most common operators used while writing workflows; there are others as well, which you can always look up in the official documentation. A minimal example follows the list.

BashOperator — executes a bash command
PythonOperator — calls an arbitrary Python function
EmailOperator — sends an email
SimpleHttpOperator — sends an HTTP request
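
For instance, a minimal DAG with a single BashOperator task could look like the sketch below. This is only an illustration: the dag_id hello_bash, the schedule, and the bash command are placeholders, not part of our project, and the import paths follow the Airflow 1.x style used throughout this article.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# A minimal sketch: a DAG with one bash task that prints the current date.
dag = DAG(
    'hello_bash',
    schedule_interval='@daily',
    start_date=datetime(2020, 4, 28),
    catchup=False)

print_date = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)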

Install Airflow (Ubuntu Environment):
The official documentation gives only four steps to install Apache Airflow, but unfortunately they did not work in our case. We found another way to install it, namely inside a Python virtual environment. The steps are as follows:

1. Install, create, and activate a Python virtual environment. This allows us to install and update Python packages without affecting the core machine's Python libraries.

sudo pip install virtualenv
virtualenv venv
source venv/bin/activate

2. Install Apache Airflow Server

(venv)>pip install "apache-airflow"

3. Initialize the Apache Airflow metadata database. By default, airflow initdb uses SQLite; the Airflow documentation recommends MySQL or Postgres for production (a Postgres example follows these steps).

(venv)>airflow initdb

4. Launch the scheduler and the webserver

(venv)>airflow scheduler -D
(venv)>airflow webserver -D

Now we can open the Airflow web UI at http://localhost:8080, which lists the example DAGs.
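
Coming back to step 3: if you want Airflow to use Postgres instead of the default SQLite metadata database, one option is to override the connection string through an environment variable before running airflow initdb. This is only a sketch, assuming a local Postgres database, user, and password all named "airflow", and that the Postgres driver is available (for example via pip install "apache-airflow[postgres]").

(venv)>export AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow:airflow@localhost:5432/airflow"
(venv)>airflow initdb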

Use case:
We are using Apache Airflow to check whether a new file is present in HDFS. If a new file is present, we process that file and store the data into a Hive table.

Let's write the whole code at once:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

hive_table_name = 'test'  # global variable


def new_file_check(**kwargs):
    # code that will check whether a new file is present in HDFS or not.
    pass


def ingestdata(**kwargs):
    # code to pull the file name from the first task and save the data into the Hive table.
    # The Hive table name will be passed to this function as a variable.
    pass


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['abc@gmail.com'],
    'email_on_failure': True
}

dag = DAG(
    'Demo',
    description='Demo Dag',
    schedule_interval='0 1 * * *',
    start_date=datetime(2020, 4, 28),
    default_args=default_args,
    catchup=False)

newFileCheckTask = PythonOperator(
    task_id='file_check',
    python_callable=new_file_check,
    provide_context=True,
    dag=dag)

ingestDataTask = PythonOperator(
    task_id='ingest_data',
    python_callable=ingestdata,
    provide_context=True,
    dag=dag)

newFileCheckTask >> ingestDataTask

Description of parameters:
owner: Owner of the pipeline
depends_on_past: Whether a run depends on the previous scheduled run of the same task; if the previous run failed, the current run will not start
email: Email addresses to notify
email_on_failure: If True, sends an email to the addresses in email whenever a task fails
schedule_interval: Cron expression used to schedule the pipeline
start_date: Start date of the pipeline
catchup: Whether to run all the scheduled runs between the start date and now, if the start date is in the past
task_id: Name of the task
python_callable: Python function that we want to execute
provide_context: If True, Airflow passes the task context variables (such as ti) to the function as keyword arguments

Issue Faced:

In this use case, we want to pass the file name from the first task to the next (ingest_data) task. How do we pass variables from one task to another? How do the tasks communicate with each other?

Let's fix it:

Passing variables from one task to another is also called inter-task communication, and for this we need to use XCOM.

XCOM Push:

The xcom_push() method stores the variable as a key-value pair in one of Apache Airflow's metadata tables and makes it available to other tasks.

Syntax:

kwargs['ti'].xcom_push(key=<name of key>, value=<value of key>)

XCOM Pull:

While retrieving the variable, we have to give the dag_id, which is simply the name of the DAG; the task_ids, which is the name of the task where the xcom_push() method was called; and finally the key name.

Syntax:

kwargs['ti'].xcom_pull(dag_id=<name of dag>, task_ids=<task id where we did the xcom push>, key=<name of key>)

After fixing the issue, the code looks like this:

def new_file_check(**kwargs):
    # code that will check whether a new file is present in HDFS or not.
    kwargs['ti'].xcom_push(key='file_name', value=<file_name>)


def ingestdata(**kwargs):
    # code to pull the file name from the first task and save the data into the Hive table.
    # The Hive table name will be passed to this function as a variable.
    file_name = kwargs['ti'].xcom_pull(dag_id='Demo', task_ids='file_check', key='file_name')
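
The body of new_file_check is only sketched above. One possible, purely hypothetical way to implement it is to shell out to the HDFS CLI, pick a file from the listing, and push its path to XCom. The directory /data/incoming is a made-up example, and the hdfs command assumes a configured Hadoop client on the worker.

import subprocess

def new_file_check(**kwargs):
    # A sketch under assumptions, not the original implementation:
    # list an HDFS directory with the hdfs CLI and push the lexicographically
    # last file path to XCom (works when file names embed a timestamp).
    listing = subprocess.check_output(
        ['hdfs', 'dfs', '-ls', '/data/incoming']).decode('utf-8')
    # File lines of 'hdfs dfs -ls' start with '-' and end with the full path.
    files = [line.split()[-1] for line in listing.splitlines() if line.startswith('-')]
    if files:
        kwargs['ti'].xcom_push(key='file_name', value=sorted(files)[-1])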

Conclusion:

XCOM is a straightforward concept that helps to pass variables between tasks as well as between DAGs.


Written by BigData & Cloud Practice

Abzooba is an AI and data company. The BD&C Practice is one of the fastest-growing groups in Abzooba, helping several Fortune 500 clients in their cognitive journey.