Each time we deploy new software, we check the log files twice a day for the following one or two weeks to see whether there are any issues or exceptions. A colleague asked me whether there is a way to monitor the errors and send an alert automatically if a certain error occurs more than 3 times. I am following an Airflow course now, and this is a perfect use case for building a data pipeline with Airflow to monitor the exceptions.

What's Airflow?

Airflow is an open-source workflow management platform. It started at Airbnb in October 2014 and was later open-sourced, becoming an Apache Incubator project in March 2016. Airflow is designed under the principle of "configuration as code". [1]

In Airflow, a DAG — or a Directed Acyclic Graph — is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.[2]

Airflow uses Python to define its workflow/DAG files, which is quite convenient and powerful for developers.

Analysis

Our log files are saved on the server; there are several of them. We can fetch them with the sftp command. After downloading all the log files into one local folder, we can use the grep command to extract all lines containing exceptions or errors. The following is an example of an error log:

/usr/local/airflow/data/20200723/loginApp.log:140851:[[]] 23 Jul 2020/13:23:19,196 ERROR SessionId : u0UkvLFDNMsMIcbuOzo86Lq8OcU= [loginApp] dao.AbstractSoapDao - getNotificationStatus - service Exception: java.net.SocketTimeoutException: Read timed out

Next, we need to parse the error message line by line and extract the fields. As in the example above, we want to know the file name, line number, date, time, session id, app name, module name, and error message. We will extract all this information into a database table, so that later we can use SQL queries to aggregate the information. If any type of error occurs more than 3 times, an email will be sent to the specified mailbox.

The whole process is quite straightforward, as follows:

[Image: Workflow to monitor error logs]

Airflow Operators

Airflow provides a lot of useful operators. An operator is a single task that provides a simple way to implement certain functionality. For example, BashOperator can execute a Bash script, command, or set of commands, and SFTPOperator can transfer files over an SSH session. Since an operator corresponds to a single task, Airflow can run independent tasks in parallel. Airflow also provides a very simple way to define dependencies and concurrency between tasks; we will talk about that later.

Implementation

Normally, Airflow runs in a Docker container. Apache publishes Airflow images on Docker Hub. A popular Airflow image is released by Puckel, which is well configured and ready to use. We can retrieve the Dockerfile and all configuration files from Puckel's GitHub repository. [3]

After installing the Docker client and pulling Puckel's repository, run the following command to start the Airflow server:

docker-compose -f ./docker-compose-LocalExecutor.yml up -d
[Image: First time to run Airflow]

The first time you run the script, it downloads Puckel's Airflow image and the Postgres image from Docker Hub, then starts the two containers.

Airflow has a nice UI, which can be accessed at http://localhost:8080.

[Image: Airflow UI portal]

From the Airflow UI portal, we can trigger a DAG and see the status of currently running tasks.

Let's start creating a DAG file. It's pretty easy to create a new DAG. First, we define some default arguments, then instantiate a DAG class with the DAG name monitor_errors, which will be shown in the Airflow UI.

The first step in the workflow is to download all the log files from the server. Airflow supports concurrent task execution. We create one download task per log file; all these tasks can run in parallel, and we add them all into one list. SFTPOperator needs an SSH connection id, which we will configure in the Airflow portal before running the workflow.

After that, we can refresh the Airflow UI to load our DAG file. Now we can see our new DAG, monitor_errors, appearing in the list:

[Image: New DAG showing in Airflow]

Click the DAG name to open the graph view, where we can see all the download tasks:

[Image: All download tasks in the graph view]

Before we trigger a DAG run, we need to configure the SSH connection so that SFTPOperator can use it. Click the Admin menu, then select Connections to create a new SSH connection.

[Image: Create an SSH connection]

To access an SSH server without entering a password, we use public-key authentication. Assume the public key has already been put on the server and the private key is located at /usr/local/airflow/.ssh/id_rsa. Leave the Password field empty, and put the following JSON data into the Extra field.

{
  "key_file": "/usr/local/airflow/.ssh/id_rsa",
  "timeout": "10",
  "compress": "false",
  "no_host_key_check": "false",
  "allow_host_key_change": "false"
}

OK, let's enable the DAG and trigger it. Some tasks turn green, which means they are running; the other tasks remain grey because they are still queued.

[Image: Tasks are running]
[Image: All tasks finished]

When all tasks have finished, they are shown in dark green. Let's check the files downloaded into the data/ folder. A subfolder named with the current date is created there.

[Image: All logs are downloaded into the folder]

Looks good.

Next, we will extract all lines containing "exception" in the log files and write these lines into a file (errors.txt) in the same folder. The grep command can search for certain text in all the files in one folder, and it can also include the file name and line number in the search results.

Airflow uses the bash command's return value as the task's result. grep returns a non-zero exit code if no match is found, and Airflow treats a non-zero return value as a task failure. In our case, however, finding no exception is good news. Instead, we check the errors.txt file generated by grep: if the file exists, whether it is empty or not, we treat the task as successful.
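A sketch of the command the grep task might run; the demo folder and file name are placeholders, and `|| true` is what keeps a "no match" exit code from failing the task:

```shell
# Throwaway demo folder; in the DAG this would be the dated data folder
# (e.g. /usr/local/airflow/data/20200723).
dir=$(mktemp -d)
echo "all good here" > "$dir/app.log"

# -r: recurse into the folder, -n: print line numbers, -i: case-insensitive.
# grep exits with code 1 when nothing matches, which Airflow would treat as
# a failed task, so '|| true' forces a zero exit code either way.
grep -rni -e "exception" "$dir" > "$dir/errors.txt" || true

# The task is considered successful as long as errors.txt exists.
test -f "$dir/errors.txt" && echo "errors.txt created"
```

Here no line matches, so errors.txt is created but empty, and the command still exits successfully.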

[Image: grep exception]

Refresh the DAG and trigger it again; the graph view will be updated as above. Let's check the output file errors.txt in the folder.

[Image: Last 5 exceptions in errors.txt]

Next, we will parse the log line by line and extract the fields we are interested in. We use a PythonOperator to do this job with a regular expression.
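The parsing callable could be a plain Python function like the following sketch; the regular expression and field names are assumptions reverse-engineered from the sample log line shown earlier:

```python
import re

# Pattern derived from the sample grep output line; field names are assumptions.
LOG_PATTERN = re.compile(
    r"^(?P<file_name>[^:]+):(?P<line>\d+):\[\[\]\]\s+"
    r"(?P<date>\d{1,2} \w{3} \d{4})/(?P<time>[\d:,]+)\s+ERROR\s+"
    r"SessionId : (?P<session_id>\S+)\s+\[(?P<app>[^\]]+)\]\s+"
    r"(?P<module>\S+) - (?P<error>.*)$"
)

def parse_log_line(line):
    """Return a dict of extracted fields, or None if the line does not match."""
    m = LOG_PATTERN.match(line)
    return m.groupdict() if m else None

sample = (
    "/usr/local/airflow/data/20200723/loginApp.log:140851:[[]] "
    "23 Jul 2020/13:23:19,196 ERROR SessionId : u0UkvLFDNMsMIcbuOzo86Lq8OcU= "
    "[loginApp] dao.AbstractSoapDao - getNotificationStatus - service Exception: "
    "java.net.SocketTimeoutException: Read timed out"
)
record = parse_log_line(sample)
# record["app"] is "loginApp", record["line"] is "140851", and so on.
```

In the DAG, this function would be called for every line of errors.txt and the resulting dicts inserted into the database.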

The extracted fields will be saved into a database for later queries. Airflow supports various database backends and stores its metadata information in the database; in this example, we will use a Postgres DB as the backend.

We define a PostgresOperator to create a new table in the database; it will drop the table if it already exists. In a real scenario we might append data to the database instead, but we should be cautious: if some tasks need to be rerun for any reason, duplicated data may be added to the database.

To use the Postgres database, we need to configure the connection in the Airflow portal. We can modify the existing postgres_default connection, so we don't need to specify a connection id when using PostgresOperator or PostgresHook.

[Image: Modify postgres_default connection]
[Image: Config postgres_default connection]

Great, let's trigger the DAG again.

[Image: Parse the error logs]

The tasks ran successfully, and all the log data is parsed and stored in the database. Airflow provides a handy way to query the database: choose "Ad Hoc Query" under the "Data Profiling" menu, then type a SQL query statement.

[Image: Ad Hoc Query]
[Image: Error logs in Postgres database]

Next, we can query the table and count the errors of each type. We use another PythonOperator to query the database and generate two report files: one contains all the error records in the database, the other is a statistics table showing all types of errors with their occurrences in descending order.
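In the DAG, the callable would pull the rows via PostgresHook (e.g. with a GROUP BY ... ORDER BY count DESC query). As a library-free sketch of the same report logic, with hypothetical field names and sample data:

```python
import csv
from collections import Counter
from io import StringIO

def gen_reports(records, out_logs, out_stats):
    """Write all records to one CSV, and per-error counts (in descending
    order of occurrence) to another. Returns the sorted stats."""
    writer = csv.writer(out_logs)
    writer.writerow(["app", "module", "error"])
    for rec in records:
        writer.writerow([rec["app"], rec["module"], rec["error"]])

    # Count occurrences of each error type, most frequent first.
    stats = Counter(rec["error"] for rec in records).most_common()
    writer = csv.writer(out_stats)
    writer.writerow(["error", "occurrence"])
    writer.writerows(stats)
    return stats

# Hypothetical sample data standing in for rows from the error_log table.
records = [
    {"app": "loginApp", "module": "dao.AbstractSoapDao", "error": "Read timed out"},
    {"app": "loginApp", "module": "dao.AbstractSoapDao", "error": "Read timed out"},
    {"app": "orderApp", "module": "dao.OrderDao", "error": "Connection refused"},
]
logs_buf, stats_buf = StringIO(), StringIO()
top = gen_reports(records, logs_buf, stats_buf)
# top[0] is the most frequent error type with its count.
```

In the real task, the two StringIO buffers would be files named error_logs.csv and error_stats.csv in the dated data folder.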

Ok, trigger the DAG again.

[Image: Generate reports]

Two report files are generated in the folder.

[Image: Two reports]

error_logs.csv contains all the exception records in the database.

[Image: Report with all exceptions]

error_stats.csv lists the different types of errors with their occurrences.

[Image: Report with different types of exception]

In the last step, we use a branch operator to check the top occurrence count in the error list. If it exceeds the threshold, say 3 times, it triggers sending an email; otherwise, the workflow ends silently. We can define the threshold value as an Airflow Variable and read it from the code, so we can change the threshold later without modifying the code.
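The decision itself is a tiny function; the task ids below are assumptions matching the dependency chain shown later, and in the DAG the threshold would come from Variable.get rather than a parameter:

```python
def choose_next_task(top_occurrence, threshold):
    """Return the task_id the branch operator should follow: send the alert
    email when the most frequent error exceeds the threshold, otherwise
    fall through to the empty task."""
    return "send_email" if top_occurrence > threshold else "dummy_op"

# Examples: 5 errors with a threshold of 3 triggers the email;
# exactly 3 does not exceed the threshold, so the workflow ends silently.
```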

[Image: Create a variable in Airflow]

BranchPythonOperator returns the name of the next task: either send an email or do nothing. We use the EmailOperator to send the email; it provides a convenient API to specify the to, subject, and body fields, and makes it easy to add attachments. We define the empty task with a DummyOperator.

To use the email operator, we need to add some configuration parameters to the YAML file. Here we define the configuration for a Gmail account. You may put your password here, or better, use an App Password for your email client, which provides more security.

- AIRFLOW__SMTP__SMTP_HOST=smtp.gmail.com
- AIRFLOW__SMTP__SMTP_PORT=587
- AIRFLOW__SMTP__SMTP_USER=<your-email-id>@gmail.com
- AIRFLOW__SMTP__SMTP_PASSWORD=<your-app-password>
- AIRFLOW__SMTP__SMTP_MAIL_FROM=<your-email-id>@gmail.com

So far, we have created all the tasks in the workflow; now we need to define the dependencies among them. Airflow provides a very intuitive way to describe dependencies.

dl_tasks >> grep_exception >> create_table >> parse_log >> gen_reports >> check_threshold >> [send_email, dummy_op]

Now that we have finished all the coding, let's trigger the workflow again to see the whole process.

[Image: Send an email when the number of any type of error exceeds the threshold]

In our case, there are two types of errors, and both exceed the threshold, so sending the email is triggered at the end, with the two reports attached.

[Image: Email alert]

We change the threshold variable to 60 and run the workflow again.

[Image: Change threshold to 60]
[Image: The workflow ends without sending the email]

As you can see, it doesn't trigger sending the email, since the number of errors is less than 60. The workflow ends silently.

Let's go back to the DAG view.

[Image: The DAG view]

It lists all the active and inactive DAGs and the status of each run. In our example, you can see that our monitor_errors DAG has 4 successful runs, and in the last run, 15 tasks succeeded and 1 task (the final dummy_op task) was skipped, which is the expected result.

Now our DAG is scheduled to run every day; we can change the schedule as we wish, e.g. every 6 hours or at a specific time every day.

Airflow is a powerful ETL tool that is widely used in many tier-1 companies, such as Airbnb, Google, Ubisoft, and Walmart. It is also supported on the major cloud platforms, e.g. AWS, GCP, and Azure. It plays an increasingly important role in data engineering and data processing.

Code

https://github.com/kyokin78/airflow

References

[1] https://en.wikipedia.org/wiki/Apache_Airflow

[2] https://airflow.apache.org/docs/stable/concepts.html

[3] https://github.com/puckel/docker-airflow