- Source code - GitHub
- Author - Gavin Noronha
This project provides an Ubuntu 16.04 Vagrant virtual machine (VM) with Airflow, a data workflow management system from Airbnb.
Ansible scripts automatically install the software when the VM is started.
To start the VM, type:
vagrant up
Set up private keys
Connect to the VM
vagrant ssh airflow-master
Set up private keys
Set the Airflow home directory
export AIRFLOW_HOME=~/airflow
Set up the SQLite database
airflow initdb
Change to the Airflow directory
cd ~/airflow
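Reduce the load on the Airflow system by setting the following values in airflow.cfg (parallelism and dag_concurrency are in the [core] section, celeryd_concurrency is in [celery]):
parallelism = 4
dag_concurrency = 2
celeryd_concurrency = 4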
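These edits can also be scripted. The following is a minimal sketch using Python's standard configparser, assuming the Airflow 1.x section layout described above; update_airflow_cfg and LOAD_SETTINGS are illustrative names, not part of this project. configparser drops comments when it rewrites a file, so keep a copy of the original airflow.cfg.

```python
import configparser

# Assumed Airflow 1.x layout for the load-reducing settings.
LOAD_SETTINGS = {
    "core": {"parallelism": "4", "dag_concurrency": "2"},
    "celery": {"celeryd_concurrency": "4"},
}


def update_airflow_cfg(path, settings):
    """Set options in an existing airflow.cfg and write it back in place.

    interpolation=None keeps values such as '%%(asctime)s' exactly as written.
    Comments in the original file are not preserved by configparser.
    """
    cfg = configparser.ConfigParser(interpolation=None)
    if not cfg.read(path):
        raise FileNotFoundError(path)
    for section, options in settings.items():
        if not cfg.has_section(section):
            cfg.add_section(section)
        for key, value in options.items():
            cfg.set(section, key, value)
    with open(path, "w") as fh:
        cfg.write(fh)
```

For example: update_airflow_cfg(os.path.expanduser("~/airflow/airflow.cfg"), LOAD_SETTINGS). The same helper can set dags_folder and load_examples, which also live in [core].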
Start the web server
airflow webserver -p 8080
Open a web browser to the UI at
List the DAGs
airflow list_dags
List the tasks of the example_bash_operator DAG
airflow list_tasks example_bash_operator
List the tasks of the example_bash_operator DAG in a tree view
airflow list_tasks example_bash_operator -t
Run the runme_0 task of the example_bash_operator DAG for today's date
airflow run example_bash_operator runme_0 `date +%Y-%m-%d`
Backfill the example_bash_operator DAG starting two days ago
export START_DATE=$(date -d "-2 days" "+%Y-%m-%d")
airflow backfill -s $START_DATE example_bash_operator
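When these commands are scripted, the date arithmetic of the backfill step can be mirrored in Python. This is a sketch that assumes the Airflow 1.x CLI shown above; backfill_command is a hypothetical helper whose result could be passed to subprocess.run.

```python
import datetime


def backfill_command(dag_id, days_back=2, today=None):
    """Build the argv for `airflow backfill -s START dag_id`.

    Mirrors `date -d "-2 days" "+%Y-%m-%d"`: START is `days_back` days
    before `today`, formatted as YYYY-MM-DD.
    """
    today = today or datetime.date.today()
    start = today - datetime.timedelta(days=days_back)
    return ["airflow", "backfill", "-s", start.strftime("%Y-%m-%d"), dag_id]


# A fixed date keeps the example output predictable.
print(backfill_command("example_bash_operator", today=datetime.date(2017, 7, 3)))
# -> ['airflow', 'backfill', '-s', '2017-07-01', 'example_bash_operator']
```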
Clear the history of DAG runs
airflow clear example_bash_operator
Go to the Airflow config directory
cd ~/airflow
Set the Airflow DAGs directory in airflow.cfg by changing the line:
dags_folder = /vagrant/airflow/dags
Disable the example DAGs
load_examples = False
Restart the web server
airflow webserver -p 8080
Check that the dynamic_dags DAG is listed
airflow list_dags
Trigger the DAG
airflow trigger_dag dynamic_dags
Run the scheduler to actually execute the triggered DAG
airflow scheduler
Change to the airflow directory
cd /vagrant/airflow
Set airflow environment
source set_airflow_env.sh
Run airflow without any logging messages
Edit the file ~/airflow/airflow.cfg
Set the following:
dags_folder = /vagrant/airflow/dags
load_examples = False
Start the web server and scheduler by running:
tmuxp load /vagrant/scripts/tmux-webserver-scheduler.yaml
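The contents of tmux-webserver-scheduler.yaml are not shown here. As a hypothetical sketch, a tmuxp session with one pane for the web server and one for the scheduler could be generated as follows; tmuxp also loads JSON session files, so the printed output could be saved as, say, airflow.json and started with tmuxp load airflow.json. The window name, layout, and sourcing of set_airflow_env.sh in every pane are assumptions.

```python
import json


def tmuxp_session(session_name, commands, env_script=None):
    """Return a tmuxp session config dict with one pane per command."""
    window = {
        "window_name": "airflow",
        "layout": "even-vertical",
        "panes": list(commands),
    }
    if env_script:
        # tmuxp runs these commands in each pane before the pane's own command.
        window["shell_command_before"] = ["source " + env_script]
    return {"session_name": session_name, "windows": [window]}


config = tmuxp_session(
    "airflow",
    ["airflow webserver -p 8080", "airflow scheduler"],
    env_script="/vagrant/airflow/set_airflow_env.sh",
)
print(json.dumps(config, indent=2))
```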
Follow the instructions here
Follow the instructions here
Start RabbitMQ in a Docker container
export RMQ_IMG=rabbitmq:3.6.10-management
docker run -d --rm --hostname airflow-rmq \
  --name airflow-rmq -p 15672:15672 -p 5672:5672 $RMQ_IMG
Display the list of running Docker containers
docker ps
Go to the RabbitMQ dashboard at
Log in using guest/guest
Store the ID of the RabbitMQ Docker container
export RMQ=$(docker ps -aq --filter name=airflow-rmq)
List queues
docker exec -ti $RMQ rabbitmqctl list_queues
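When checking queues from a script, the list_queues output can be parsed. A minimal sketch, assuming rabbitmqctl's default tab-separated name/messages rows; parse_list_queues and list_queues are hypothetical helper names. The -ti flags are dropped because the output is captured rather than shown on a terminal.

```python
import subprocess


def parse_list_queues(output):
    """Parse `rabbitmqctl list_queues` output into {queue_name: message_count}.

    Rows are assumed to be tab-separated "name<TAB>messages"; banner or
    header lines such as "Listing queues ..." do not match and are skipped.
    """
    queues = {}
    for line in output.splitlines():
        fields = line.split("\t")
        if len(fields) == 2 and fields[1].strip().isdigit():
            queues[fields[0]] = int(fields[1])
    return queues


def list_queues(container_id):
    """Run list_queues in the RabbitMQ container and parse the result."""
    # No -ti: the output is captured instead of attached to a terminal.
    result = subprocess.run(
        ["docker", "exec", container_id, "rabbitmqctl", "list_queues"],
        capture_output=True, text=True, check=True,
    )
    return parse_list_queues(result.stdout)


print(parse_list_queues("Listing queues ...\nhello\t1\n"))  # -> {'hello': 1}
```

After exporting RMQ as above, list_queues(os.environ["RMQ"]) returns the same data as a dictionary (capture_output requires Python 3.7 or later).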
Stop RabbitMQ
docker stop $RMQ
The RabbitMQ web site demonstrates how to connect using Python and the Pika library.
List queues
docker exec -ti $RMQ rabbitmqctl list_queues
Send a message to a RabbitMQ queue called hello
python rmq-send.py
Receive a message from the RabbitMQ queue called hello
python rmq-receive.py
List the queues; the hello queue is now displayed
docker exec -ti $RMQ rabbitmqctl list_queues
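Stop the RabbitMQ application (the Erlang node keeps running)
docker exec -ti $RMQ rabbitmqctl stop_app
Start the RabbitMQ application
docker exec -ti $RMQ rabbitmqctl start_app
List the queues; the hello queue is no longer displayed, since a queue that is not declared durable does not survive the restart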
docker exec -ti $RMQ rabbitmqctl list_queues
Start the Celery worker
export PYTHONPATH=/vagrant/scripts
celery -A tasks worker --loglevel=info
Call the task
export PYTHONPATH=/vagrant/scripts
python -c "from tasks import add; add.delay(2, 3)"
Start Postgres in a Docker container named airflow-pg
export PG_IMG=postgres:9.6.3
export PGPASSWORD=airflow_pg_pass
docker run -d --rm --name airflow-pg -p 5432:5432 \
  -e POSTGRES_PASSWORD=$PGPASSWORD $PG_IMG
export PG=$(docker ps -aq --filter name=airflow-pg)
List the Docker container
docker ps --filter id=$PG
Stop Postgres
docker stop $PG
Start the Postgres database before running these steps
Connect to the database using psql and create the database test
docker exec -ti $PG psql -U postgres -c "create database test"
Create the table test in the database test using only Psycopg2
export PGHOST=localhost
python /vagrant/scripts/pg-psycopg2.py
Connect to database test using Psycopg2 and SQLAlchemy
python pg-sqlalchemy-read.py
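The scripts pg-psycopg2.py and pg-sqlalchemy-read.py are not reproduced here. Psycopg2 implements the Python DB-API 2.0 interface, so the create/insert/select pattern can be sketched with the standard library's sqlite3 module, used here only as a stand-in so the example runs without a database server. With psycopg2 the connection would come from something like psycopg2.connect(host="localhost", dbname="test", user="postgres", password="airflow_pg_pass"), and the placeholders would be %s instead of ?. The id, num, and data columns match the query used below; the column types are assumptions.

```python
import sqlite3


def create_and_read(conn):
    """Create table `test`, insert one row, and read it back (DB-API 2.0)."""
    cur = conn.cursor()
    # In Postgres the id column would typically be `serial PRIMARY KEY`.
    cur.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, num INTEGER, data TEXT)")
    # Values are passed as parameters so the driver handles quoting.
    cur.execute("INSERT INTO test (num, data) VALUES (?, ?)", (100, "abc'def"))
    conn.commit()
    cur.execute("SELECT id, num, data FROM test")
    rows = cur.fetchall()
    cur.close()
    return rows


conn = sqlite3.connect(":memory:")
print(create_and_read(conn))  # -> [(1, 100, "abc'def")]
conn.close()
```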
Connect to the postgres database again
docker exec -ti $PG psql -U postgres
List the databases
\l
Connect to the test database
\c test
List objects in the test database
\d
Select all rows from the test table
select id, num, data from test;
Quit the psql utility
\q
Drop database test
docker exec -ti $PG psql -U postgres -c "drop database test"
Set the executor in ~/airflow/airflow.cfg to the following value
executor = LocalExecutor
Set sql_alchemy_conn in ~/airflow/airflow.cfg to the following value
# Change the metadata DB configuration
sql_alchemy_conn = postgresql+psycopg2://postgres:airflow_pg_pass@localhost/test
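If the Postgres password contains characters such as @ or /, it has to be percent-encoded inside the URL. A small sketch that builds the sql_alchemy_conn value; postgres_sqlalchemy_url is a hypothetical helper.

```python
from urllib.parse import quote_plus


def postgres_sqlalchemy_url(user, password, host, database, port=None):
    """Build a postgresql+psycopg2 URL for Airflow's sql_alchemy_conn.

    The user and password are percent-encoded so characters such as
    '@' or '/' do not break URL parsing.
    """
    netloc = host if port is None else f"{host}:{port}"
    return (f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
            f"@{netloc}/{database}")


print(postgres_sqlalchemy_url("postgres", "airflow_pg_pass", "localhost", "test"))
# -> postgresql+psycopg2://postgres:airflow_pg_pass@localhost/test
```

Because airflow.cfg is read with configparser-style interpolation, a % produced by the encoding may need to be written as %% in the file; check this against your Airflow version.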
Set the executor in ~/airflow/airflow.cfg to the following value
executor = CeleryExecutor
Set the following two values in ~/airflow/airflow.cfg
broker_url = redis://localhost:6379/0
celery_result_backend = redis://localhost:6379/0
Initialize the test database as the Airflow metadata database
airflow initdb
tmuxp load /vagrant/scripts/tmux-airflow-daemons.yaml
5. View the Airflow web server at
6. View the Airflow Flower server at
Copy the Airflow configuration to the worker
rsync -zvh airflow/airflow*.cfg worker:~/airflow/
Set the Airflow home directory
export AIRFLOW_HOME=~/airflow
Run the Airflow worker
airflow worker
Stop Redis
sudo systemctl stop redis
Modify the Redis configuration
sudo vim /etc/redis/redis.conf
Change the bind line to the following:
bind
Start Redis
sudo systemctl start redis
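The value for the bind line is not given above. The edit itself can be sketched in Python: the hypothetical helper below replaces the active bind directive in the text of redis.conf and leaves commented-out examples alone. The addresses in the example are placeholders, not this project's settings. Binding Redis to a non-loopback address exposes it on that network, so restrict access to the hosts that need it.

```python
import re


def set_redis_bind(conf_text, addresses):
    """Return redis.conf text with every active `bind` line replaced.

    Commented-out lines such as '# bind 127.0.0.1' are left unchanged;
    if no active bind line exists, one is appended.
    """
    new_line = "bind " + " ".join(addresses)
    lines = conf_text.splitlines()
    replaced = False
    for i, line in enumerate(lines):
        if re.match(r"\s*bind\s", line):
            lines[i] = new_line
            replaced = True
    if not replaced:
        lines.append(new_line)
    return "\n".join(lines) + "\n"


sample = "# bind 192.168.1.100 10.0.0.1\nbind 127.0.0.1\nport 6379\n"
# 192.0.2.10 is a placeholder address, not the value this project uses.
print(set_redis_bind(sample, ["127.0.0.1", "192.0.2.10"]), end="")
```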
- Create postgres Docker container
- Create test database
- Run airflow initdb
- Run sudo supervisord
- Still needed: set up logs in airflow-scheduler.conf, airflow-webserver.conf, and airflow-worker.conf
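The list above notes that logging still has to be configured in the supervisord program files. One possible shape is sketched below with Python's configparser: stdout_logfile, stderr_logfile, environment, autostart, and autorestart are standard supervisord program options, while the log directory, user, and AIRFLOW_HOME defaults are assumptions to adjust. With Ubuntu's supervisor package, such files are typically placed in /etc/supervisor/conf.d/.

```python
import configparser
import io


def supervisor_program(name, command, log_dir="/var/log/airflow",
                       user="vagrant", airflow_home="/home/vagrant/airflow"):
    """Return a supervisord [program:<name>] section as text.

    The log directory, user and AIRFLOW_HOME defaults are assumptions.
    """
    cfg = configparser.ConfigParser(interpolation=None)
    cfg["program:" + name] = {
        "command": command,
        "user": user,
        "environment": 'AIRFLOW_HOME="%s"' % airflow_home,
        "autostart": "true",
        "autorestart": "true",
        # Separate files for stdout and stderr of each Airflow daemon.
        "stdout_logfile": "%s/%s.out.log" % (log_dir, name),
        "stderr_logfile": "%s/%s.err.log" % (log_dir, name),
    }
    buf = io.StringIO()
    cfg.write(buf)
    return buf.getvalue()


print(supervisor_program("airflow-scheduler", "airflow scheduler"))
```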
9. Set up Netdata for monitoring
Edit netdata configuration
vim /opt/netdata/etc/netdata/netdata.conf
Show status of netdata
sudo systemctl status netdata
Start netdata
sudo systemctl start netdata
View the netdata at
Stop netdata
sudo systemctl stop netdata
Netdata keeps its data in memory and collects metrics every second by default. To keep hours of data without using more memory, add the following to netdata.conf:
[global]
update every = 10
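The reasoning behind this setting is simple arithmetic: Netdata keeps a fixed number of entries per chart dimension in memory (its history setting), so the time window covered is history × update every. The sketch below assumes a history of 3600 entries; check the history value in your own netdata.conf, since the default has varied between releases.

```python
def netdata_retention_hours(history_entries, update_every_seconds):
    """Hours of metrics Netdata keeps in memory per chart dimension.

    Memory use scales with the number of entries (`history`), not with time,
    so a larger `update every` stretches the same memory over a longer window.
    """
    return history_entries * update_every_seconds / 3600


for every in (1, 10):
    print(every, netdata_retention_hours(3600, every))
# -> 1 1.0
# -> 10 10.0
```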
Main documentation
Videos on Airflow
Airflow reviews
Airflow tips and tricks
- https://medium.com/handy-tech/airflow-tips-tricks-and-pitfalls-9ba53fba14eb#.i2hu0syug
- Airflow with Postgres + RabbitMQ
- Three tips on using Celery
- Building a Data Pipeline with Airflow
- https://databricks.com/blog/2016/12/08/integrating-apache-airflow-databricks-building-etl-pipelines-apache-spark.html
- http://site.clairvoyantsoft.com/installing-and-configuring-apache-airflow/
- https://gtoonstra.github.io/etl-with-airflow/principles.html
- https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
- http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/
The following software is needed to get the project from GitHub and run Vagrant to set up the Python development environment. On Windows, the Git installation also provides an SSH client.
- Oracle VM VirtualBox
- Vagrant version 1.9 or higher
- Git