Automation is a strategic way to reduce human effort and boost productivity across different lines of operation. Yet many businesses never automate their processes, and these human-driven processes consume a great deal of time. As an IT professional, you know how tedious it gets to collect data, process it, and produce reports over and over again.
Enter Apache Airflow, a powerful, open-source workflow management tool that makes it possible to automate complex workflows. This guide walks you through getting started with Airflow and automating your workflows, in particular how to send emails with the Airflow EmailOperator. Ready to simplify your work? Let’s get started!
What is Apache Airflow?
Started at Airbnb in 2014, Apache Airflow has grown into a trusted open-source workflow management platform. Written in Python, this popular workflow engine simplifies complex data pipelines and automates management tasks, ensuring each task is processed and executed in the correct order. To orchestrate workflows, Airflow uses Directed Acyclic Graphs (DAGs) that run on a schedule or when an external event triggers them.
The platform helps visualize success status, data pipeline dependencies, code, logs, and progress, and it helps with troubleshooting whenever needed. Apache Airflow is a flexible, scalable workflow platform designed to orchestrate complex business logic.
Key Features of Apache Airflow
Some of the key features of Apache Airflow are as follows:
- It is easy to use if you have a fundamental understanding of Python.
- Apache Airflow is a free, scalable open-source workflow management platform.
- It can easily integrate with other platforms like Amazon AWS, Microsoft Azure, Google Cloud, etc.
- It uses Python code to simplify complex pipelines and workflows.
- The rich user interface helps in monitoring and managing complex workflows. Further, it helps keep track of the ongoing tasks and status.
What is EmailOperator?
Just as DAGs in Airflow define the workflow, operators are the building blocks that determine the actual work. Each operator states the action to be performed at a given step. There are different operators for common tasks, including:
- PythonOperator
- MySqlOperator
- EmailOperator
- BashOperator
The Airflow EmailOperator delivers email notifications to the stated recipient; it is the most direct way to have Airflow send emails. These can be task-related emails or alerts that notify users. The main drawback of the EmailOperator is that it offers little customization. Here is the code:
t4 = EmailOperator(
    task_id='t4',
    to='test@mail.com',
    subject='Alert Mail',
    html_content="""Mail Test""",
    dag=dag
)
- EmailOperator is used to send an email within an Airflow DAG.
- task_id='t4' assigns a unique identifier to this task.
- to='test@mail.com' specifies the recipient of the email.
- The email will have the subject “Alert Mail” and the body “Mail Test”.
- dag=dag links the email task to the specified DAG (Directed Acyclic Graph).
How to Send Emails using Airflow EmailOperator
Email automation helps business stakeholders send logs on time, raise alerts on error files, and share results for data analysis. It also improves engagement and creates a better experience for the recipient: with automated email, the recipient is promptly notified whether the data pipeline has failed or is still running. Overall, the process saves time and reduces the manual burden on experts.
To test the email operator job, you must add a DAG file that runs a Python function; once that function executes successfully, the Airflow EmailOperator sends the email to the recipient. To follow along, you need Apache Airflow installed, for example on an Ubuntu virtual machine. Follow the steps below to send an email from Airflow using the Airflow EmailOperator.
Step 1: Log in to the Gmail Account
Before you begin with Airflow, change your Google Account settings to allow less secure apps. This step is necessary so that Google allows your code to access the account. Once the code is live, you can revert the setting for security reasons.
To change the setting, go to the Google Account => Settings => Less secure app access => Turn it on.
Python’s smtplib module defines an SMTP client session object that can send mail to any machine on the Internet running an SMTP or ESMTP listener daemon.
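For reference, a bare smtplib session looks roughly like the sketch below; the host, port, addresses, and app password are placeholder assumptions, not values from this tutorial.
import smtplib
from email.mime.text import MIMEText

# Placeholder values -- replace with your own SMTP host, credentials, and addresses.
msg = MIMEText("Mail Test")
msg["Subject"] = "Alert Mail"
msg["From"] = "sender@example.com"
msg["To"] = "recipient@example.com"

with smtplib.SMTP("smtp.gmail.com", 587) as server:
    server.starttls()                                   # upgrade the connection to TLS
    server.login("sender@example.com", "app-password")  # placeholder credentials
    server.send_message(msg)
Airflow’s email utilities wrap this same mechanism, which is why the SMTP settings configured in Step 3 below are all that the EmailOperator needs.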
Step 2: Enable IMAP for the SMTP
- Go to the settings using the gear symbol in your Gmail Account.
- Now, click on the ‘Forwarding and POP/IMAP’ tab under settings.
- Lastly, enable the IMAP radio button in the “IMAP access” sub-section.
Step 3: Update SMTP details in Airflow
In this step, to use the Airflow EmailOperator, you need to update the SMTP details in the airflow.cfg config file inside your Airflow home directory (by default, ~/airflow/airflow.cfg).
- Now, using any editor, open the airflow.cfg file.
- Add the following configuration under [smtp]:
# If you want airflow to send emails on retries or failures, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# SMTP server here
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user = your_gmail_address
# Example: smtp_password = airflow
# smtp_password = your_gmail_password
smtp_port = 587
smtp_mail_from = the_address_the_emails_should_be_sent_from
- Use the following command to create a DAG file in /airflow/dags folder:
sudo gedit emailoperator_demo.py
- Once the DAG file is created, it is time to write the DAG.
Step 4: Import modules for the Workflow
You now need to import Python dependencies for the workflow. You can refer to the following code:
import airflow
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
# Note: newer Airflow (2.x) releases expose these operators at
# airflow.operators.python and airflow.operators.email instead.
Step 5: Define the Default Arguments
Next up, you can define the default and DAG-specific arguments:
default_args = {
    'owner': 'airflow',
    # 'start_date': airflow.utils.dates.days_ago(2),
    # 'end_date': datetime(),
    # 'depends_on_past': False,
    # 'email': ['airflow@example.com'],
    # 'email_on_failure': False,
    # 'email_on_retry': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    # 'retries': 1, 'retry_delay': timedelta(minutes=5),
}
Step 6: Instantiate a DAG
In this step, name the DAG, set its defaults, and configure the schedule.
dag_email = DAG(
    dag_id='emailoperator_demo',
    default_args=default_args,
    schedule_interval='@once',
    dagrun_timeout=timedelta(minutes=60),
    description='use case of email operator in airflow',
    start_date=airflow.utils.dates.days_ago(1))
- A DAG (Directed Acyclic Graph) named emailoperator_demo is being created in Airflow.
- default_args=default_args sets the default parameters for the DAG tasks.
- schedule_interval='@once' ensures the DAG runs only once upon creation.
- dagrun_timeout=timedelta(minutes=60) limits the DAG run time to 60 minutes.
- start_date=airflow.utils.dates.days_ago(1) sets the DAG’s start date to one day before the current date.
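Note that airflow.utils.dates.days_ago is deprecated in recent Airflow releases. As a hedged sketch only, assuming Airflow 2.4+ (where schedule replaces schedule_interval) and the pendulum library that ships with Airflow, the same DAG could be declared like this:
import pendulum
from datetime import timedelta
from airflow import DAG

dag_email = DAG(
    dag_id='emailoperator_demo',
    default_args=default_args,                            # defined in Step 5
    schedule='@once',                                     # replaces schedule_interval in Airflow 2.4+
    dagrun_timeout=timedelta(minutes=60),
    description='use case of email operator in airflow',
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),   # fixed, timezone-aware start date
)
The rest of the tutorial works the same way with either style of DAG declaration.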
Step 7: Setting up Tasks
This step involves setting up the workflow tasks. Below, the two tasks are instantiated:
def start_task():
    print("task started")

start_task = PythonOperator(
    task_id='executetask',
    python_callable=start_task,
    dag=dag_email)

send_email = EmailOperator(
    task_id='send_email',
    to='vamshikrishnapamucv@gmail.com',
    subject='ingestion complete',
    html_content="Date: {{ ds }}",
    dag=dag_email)
- A function start_task() is defined to print “task started”.
- start_task is a PythonOperator that runs the start_task function within the Airflow DAG dag_email.
- send_email is an EmailOperator that sends an email upon task completion.
- The email’s subject is “ingestion complete”, and its content includes the current date ({{ ds }}).
- Both tasks are part of the dag_email workflow.
Step 8: Set Dependencies
Set dependencies for the tasks that need to be executed. A DAG file only organizes the tasks; defining dependencies between them is what creates a complete data pipeline:
send_email.set_upstream(start_task)

if __name__ == "__main__":
    dag_email.cli()
- send_email.set_upstream(start_task) establishes a dependency, ensuring the email task runs only after start_task is complete.
- The if __name__ == "__main__": condition checks whether the script is being executed directly.
- dag_email.cli() starts the command-line interface for the dag_email DAG, allowing interaction with and management of the DAG.
- This code sets up the task execution order and allows running the DAG from the command line.
As per the code, the send_email task will execute only after start_task (task_id executetask) completes.
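Equivalently, Airflow’s bitshift operators express the same dependency and are the more common idiom in newer code:
# start_task must finish successfully before send_email runs
start_task >> send_email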
Step 9: Task Verification
- Unpause the emailoperator_demo DAG.
- Select “emailoperator_demo” and open the DAG. Now, select Graph View. Here you will see the two tasks: the executetask Python task and the send_email task.
- Click on the executetask node in the Graph View to see how the task ran. A new window will display on the screen.
- Now, to check the log files, select the Log tab. When you click on it, the task’s log attempts will show up on your screen.
- The task output is displayed in the log. For the send_email task, follow the same steps.
- The send_email task’s log will show the output once the email has been sent.
How to Use EmailOperator for Handling Failures?
You can also define custom callbacks with on_failure_callback to send emails when a task fails. Here’s an example using the send_smtp_notification function:
from airflow.providers.smtp.notifications.smtp import send_smtp_notification

on_failure_callback=[
    send_smtp_notification(
        from_email='alert@example.com',
        to='user@example.com',
        subject='Task Failed',
        html_content='<p>The task has failed.</p>'
    )
]
- The code imports the send_smtp_notification function from Airflow’s SMTP provider to send email alerts.
- on_failure_callback is defined as a list that will trigger actions when a task fails.
- send_smtp_notification is called to set up the email notification details.
- from_email='alert@example.com' specifies the sender’s email address.
- The email is sent to user@example.com with the subject “Task Failed” and a message indicating that the task has failed.
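If a fully custom message is not required, the simpler built-in route is to let Airflow send its standard failure email by enabling the email arguments in default_args; this relies on the same [smtp] settings from Step 3. A minimal sketch, where the recipient address is a placeholder:
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],   # placeholder recipient list
    'email_on_failure': True,           # send Airflow's default email when a task fails
    'email_on_retry': False,            # stay quiet on retries
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}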
Best Practices for Email Notifications in Airflow
- Set Up Reliable Email Configurations: Before using the EmailOperator, ensure that you’ve configured your SMTP (Simple Mail Transfer Protocol) settings correctly in the Airflow airflow.cfg file.
- Set Up Callbacks for Dynamic Notifications: Use conditional logic (on_failure_callback, on_success_callback) to trigger emails only when specific conditions are met (see the sketch after this list).
- Avoid Duplicate Notifications: This can be achieved by configuring alerts at the appropriate level (DAG or task) and using conditional logic within your callback functions.
- Limit Email Frequency with SLAs: Use the sla_miss_callback to trigger email notifications only when tasks exceed a certain execution time.
- Implement Error Handling: Wrap the EmailOperator in try-except blocks or use Airflow’s retry mechanisms to resend emails in case of temporary failures.
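To illustrate the callback and duplicate-notification points above, here is a hedged sketch of a failure callback that emails only for selected tasks. The task names and recipient address are hypothetical; it uses Airflow’s airflow.utils.email.send_email helper, which reads the same [smtp] configuration as the EmailOperator.
from airflow.utils.email import send_email

CRITICAL_TASKS = {'executetask', 'send_email'}   # hypothetical: only these tasks raise alerts

def notify_on_failure(context):
    """Email the on-call address only when a critical task fails, to avoid duplicate noise."""
    task_id = context['task_instance'].task_id
    if task_id not in CRITICAL_TASKS:
        return
    send_email(
        to=['oncall@example.com'],               # placeholder recipient
        subject=f"Airflow task {task_id} failed",
        html_content=f"<p>Task {task_id} failed in DAG {context['dag'].dag_id}.</p>",
    )

# Every task inherits the callback if you add it to default_args:
# default_args = {..., 'on_failure_callback': notify_on_failure}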
Conclusion
In conclusion, Apache Airflow’s EmailOperator is a powerful feature for automating task-related email notifications, ensuring timely updates for stakeholders, and improving workflow efficiency. By incorporating Airflow’s email automation, businesses can enhance engagement and maintain seamless communication across teams.
As your data needs expand with business growth, managing and integrating data from various sources becomes critical. Leveraging cloud-based ETL tools like Hevo Data can simplify this process, automating data integration and transformation and enabling your business to focus on analytics and decision-making without the heavy engineering overhead.
Frequently Asked Questions
1. Do people still use Airflow?
Yes, Airflow is still widely used for orchestrating complex workflows and ETL pipelines, especially in data engineering and data science.
2. How to use Branchpythonoperator in Airflow?
Use BranchPythonOperator to choose between multiple downstream tasks based on logic in a Python function; it returns the task ID(s) to execute next.
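A minimal sketch, assuming the Airflow 2.x import path and hypothetical downstream task IDs:
from airflow.operators.python import BranchPythonOperator   # Airflow 2.x import path

def choose_branch(**context):
    # Placeholder logic: route month-start runs to a different task.
    if context['ds'].endswith('-01'):      # ds is the run's logical date as 'YYYY-MM-DD'
        return 'send_monthly_report'       # hypothetical task_id
    return 'send_daily_report'             # hypothetical task_id

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=choose_branch,
    dag=dag_email,
)
# The candidate downstream tasks must then be wired up, e.g.:
# branch >> [send_monthly_report, send_daily_report]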
3. What is the difference between Pythonoperators and Bashoperators?
PythonOperator runs Python code, while BashOperator runs shell commands. Both are used for task automation in Airflow but execute different types of code.
Hitesh is a skilled freelance writer in the data industry, known for his engaging content on data analytics, machine learning, AI, big data, and business intelligence. With a robust Linux and Cloud Computing background, he combines analytical thinking and problem-solving prowess to deliver cutting-edge insights. Hitesh leverages his Docker, Kubernetes, AWS, and Azure expertise to architect scalable data solutions that drive business growth.