During my job at Drivy as a Data Engineer, I had the chance to write close to 100 main Airflow DAGs. In this quick blog post, I’ll share what’s it’s worth testing according to me.
Custom operators
If you’re using several times the same operator in different DAGs with a similar construction method, I would recommend about either:
- creating a custom Airflow operator thanks to the plugin mechanism
- creating a Python class that will act as a factory to create the underlying Airflow operator with the common arguments you’re using
Python logic
If you’re using a non trivial logic from a PythonOperator
, I would recommend about extracting this logic into a Python module named after the DAG ID. With this, you’ll be able to keep your Python logic away from Airflow internals and it’ll be easier to test it. You’ll just need to perform a single function call from your DAG’s PythonOperator
after.
Smoke test
Finally, the last test I would recommend writing is a smoke test that will target all DAGs. This test will make sure that:
- each DAG can be loaded by the Airflow scheduler without any failure. It’ll show in your CI environment if some DAGs expect a specific state (a CSV file to be somewhere, a network connection to be opened) to be able to be loaded or if you need to define environment / Airflow variables for example
- a single file defining multiple DAGs loads fast enough
- Airflow email alerts are properly defined on all DAGs
Here is an example test file to test this. It relies heavily on the code provided by WePay in this blog post.
# -*- coding: utf-8 -*- import unittest from airflow.models import DagBag class TestDags(unittest.TestCase): """ Generic tests that all DAGs in the repository should be able to pass. """ AIRFLOW_ALERT_EMAIL = '[email protected]' LOAD_SECOND_THRESHOLD = 2 def setUp(self): self.dagbag = DagBag() def test_dagbag_import(self): """ Verify that Airflow will be able to import all DAGs in the repository. """ self.assertFalse( len(self.dagbag.import_errors), 'There should be no DAG failures. Got: {}'.format( self.dagbag.import_errors ) ) def test_dagbag_import_time(self): """ Verify that files describing DAGs load fast enough """ stats = self.dagbag.dagbag_stats slow_files = filter(lambda d: d.duration > self.LOAD_SECOND_THRESHOLD, stats) res = ', '.join(map(lambda d: d.file[1:], slow_files)) self.assertEquals( 0, len(slow_files), 'The following files take more than {threshold}s to load: {res}'.format( threshold=self.LOAD_SECOND_THRESHOLD, res=res ) ) def test_dagbag_emails(self): """ Verify that every DAG register alerts to the appropriate email address """ for dag_id, dag in self.dagbag.dags.iteritems(): email_list = dag.default_args.get('email', []) msg = 'Alerts are not sent for DAG {id}'.format(id=dag_id) self.assertIn(self.AIRFLOW_ALERT_EMAIL, email_list, msg)
The DAG logic
I would say that it’s not worth testing an end to end DAG logic because:
- it’ll be often very hard to do as you’ll likely need various components (databases, external systems, files) and can make your test suite slow
- You should embrace the power of Airflow to define DAGs with Python code and treat them as just wiring pieces you’ve tested individually together. DAGs are not the main piece of the logic.
That said, the logic of the DAG should be tested in your dev / staging environment before running it in production if you want to avoid bad surprises.
Tests in production
Your DAGs are running happily in production without throwing error emails. Fine? Not so sure. You can sleep peacefully if you have:
- set DAG timeouts and SLA targets to be alerted if your DAGs run too slowly
- general monitoring and alerting on the Airflow servers (webserver, scheduler and workers) to make sure that they are fine
- Data quality checkers that will make sure that the data you have in production respects some predicates