Apache Airflow Tips

Software Engineering, Data Engineering 3950 views

Here are a few Apache Airflow tips I sent to a collegue:

Resources

First off, I always recommend this book from authors Bas Harenslak and Julian Rutger de Ruiter:

It's a great Airflow book, chapters 8 Building Custom Components & 9 Testing are especially pertinent.

Bas also has several good resources related to testing in Airflow:

Airflow 2 Structure

In moving from Airflow 1 to 2, I've realized that many of the API-based Operators I wrote should've really been split into a Hook for the API portion, and an Operator for any specific use case of something which needs to use the API (Hook).

So for example:

oanda_to_xxx_operator.py # (API code + task code) → oanda_hook.py (API code only) + oanda_to_xxx_operator.py (task code only)
servicenow_ #...
samsara_ #...
greenroad_ #...

This helps reinforce the Single Responsibility principle. And supports scenarios where an API can fulfill multiple distinct use cases (different Operators which do different things).

Also, in Airflow 2 the "plugins" directory is not recommended any longer.

"It is no longer considered best practice to use the Airflow Plugins mechanism for hooks, operators or sensors. Please see this guide on the best way to implement these." -- from Using Apache Airflow Plugins - Hooks & Operators
"Custom hooks and operators are a powerful way to extend Airflow to meet your needs. There is however some confusion on the best way to implement them. According to the Airflow documentation, they can be added using Airflow’s Plugins mechanism. This however, over complicates the issue and leads to confusion for many people. Airflow is even considering deprecating using the Plugins mechanism for hooks and operators going forward." -- from Importing Custom Hooks & Operators

This reinforces the idea from the book that custom Hooks, Operators, Sensors, etc. should be in their own Python package. The package can be a separate repo, or incorporated into the same repo with the DAGs. Likewise, any tests related to the package can live in the package repo, or in the same repo as the DAGs if that's where the package resides.

Hook Design

I like to peruse the existing providers Hooks source code (airflow/airflow/providers/*/hooks) examples to get ideas of how a Hook can be modeled. The "when in Rome, do as the Romans do" sentiment.

If you're using the requests library for http requests, one pattern I saw somewhere which I've adopted, is to split out the Session and base url when getting a connection. This is helpful if you need to add to the Session and also when crafting end-points off the base url.

def get_conn(self, headers: Optional[Dict[Any, Any]] = None) -> (Session, str):
    """Returns a requests Session and the Xyz API base URL.

    :param headers: Any optional headers to use
    :type headers: Optional[Dict[Any, Any]]
    :return: The request Session and the Xyz API base URL
    :rtype: (Session, str)
    """
    if self._session is None:
        # Get the connection
        config = self.get_connection(self.http_conn_id)
        if config.conn_type != 'http':
            raise ValueError(f'connection type for {self.http_conn_id} must be http, {config.conn_type} found')
        # Build the session
        self._session = requests.Session()
        if config.login:
            self._session.auth = (config.login, config.password)
        else:
            if config.extra:
                try:
                    self._session.headers.update(config.extra_dejson)
                except TypeError:
                    self.log.warning('Connection to %s has invalid extra field.', config.host)
        # Build the base url
        if self._base_url is None:
            schema = config.schema or self.DEFAULT_SCHEMA
            host = config.host or self.DEFAULT_HOST
            self._base_url = f"{schema}://{host}"

    if headers:
        self._session.headers.update(headers)

    return self._session, self._base_url

Another pattern I use often, I've borrowed from Ruby. Many enumerables in Ruby support a method which returns all of the data, and a method which yields each result or row. This is useful with dealing with large datasets, or if you need to provide a progress status, or also if you want to deal with failures on an individual or batch basis. In python this can also be done using yield. For example, one Hook could have methods like this:

def request_all(self, table_name: str, fields: Optional[str] = None, start_datetime=None, end_datetime=None)
def request_each(self, table_name: str, fields: Optional[str] = None, start_datetime=None, end_datetime=None)
def request_each_with_index(self, table_name: str, fields: Optional[str] = None, start_datetime=None, end_datetime=None)

All returns all, each yields for each request, and each_with_index yields for each batch. So, if you specify a batch size of 1000, index 1 will be the first batch.

If APIs are more broad in their scope, and less singular, then you would have specific end-points that could call the generic request method, or pagination method.

The API-based Hook's only responsibility is dealing with the API. That could also include accommodating (or hiding) any complexities of the returned JSON, pagination, etc.

// generic pagination method used by all end-points:
def get_paginated_data(self, session: Session, url: str, data_key: str = 'data') -> List[Dict[str, any]]
//  specific end-points:
def get_fleet_safety_events(self, start_time: str, end_time: str) -> List[Dict[str, any]]
// ...
// dealing with API complexities...
def fleet_safety_events_url_with_start_end_times(base_url: str, start_time: str, end_time: str) -> str
def decode_content_and_deserialize_json(content: bytes) -> Dict
def parse_error_from_json_content(content: bytes) -> str
def log_and_raise_page_response_error(self, page: int, response: Response) -> None

Testing

When testing Hooks, Operators et. al., I usually consider two approaches:

  1. Testing against a test environment (e.g. test database with test data, sandbox environment, etc.)
  2. Mocking or stubbing resources

Of course there are also basic tests which do not require a test environment or mocking. For example, if I want to test how a library works to improve my understanding of it.

I usually default to #1 initially if a test environment is available. It's much faster. However, there are drawbacks. Tests cannot be run by a CI/CD since it would need access to the test environment. In some cases, this may be acceptable if the team's policy is to make sure tests run and pass locally. Test can also break if data changes in the test environment. In some scenarios, the test data can be dropped or truncated for each test to avoid that. Or, tests can be written for data counts known at the time of the test being written. assert len(data) > 300.

Mocking or stubbing resources removes the reliance on a test environment and even API hosts. It also allows the CI/CD to run the tests. This takes longer to code, and can be more tedious. It can lead to "mock the world" fatigue. Nonetheless, it's a good long-term test strategy. Sometimes, I'll start with #1 to prove something works, then later go back and refactor to #2. I'll often grab API results via Postman, and save them as fixtures to use in the tests. You can create an API stub which mimics requests and even failures. Sometimes I want my API Hook to know the error JSON format, and parse it to raise a meaningful Python error. This can be mocked in tests. Force an error in Postman and save the error JSON as a fixture.

One thing I don't like about how Airflow was designed, is the tight coupling with Connections, Variables, etc. The pattern they use is that a Hook or Operator usually requires a connection id be set in the constructor. The connection is usually instantiated inside the Hook/Operator. This is unfortunate since it tightly binds the class to a running instance of Airflow and access to Airflow's defined Connections, Variables, etc. Alternatively, they should've used inversion of control and injected the dependencies into the class constructors. This would allow tests to mock the Connection and the test runner wouldn't need a running instance of Airflow to run properly.

Simple Example Test Suite

Here's a small/simple Hook test example. I usually create a factory helper function for each suite.

import logging
import pytest
from datetime import datetime

from airflow_mycompany.hooks.xyz_hook import XyzHook

test_logger = logging.getLogger()
now = datetime.now()


def create_xyz_hook():
    return XyzHook('xyz_api')


@pytest.mark.skip("Skipping since previously passed.")
def test_get_conn():
    hook = create_xyz_hook()
    session, base_url = hook.get_conn()

    assert session is not None
    assert base_url is not None
    assert base_url != ''

    test_logger.debug(f"base_url = {base_url}")


@pytest.mark.skip("Skipping since previously passed.")
def test_endpoint1_url_with_start_end_times():
    hook = create_xyz_hook()
    _session, base_url = hook.get_conn()

    start_time = '2021-03-29T00:00:00-04:00'
    end_time = '2021-04-09T23:59:59-04:00'

    expected = f"https://api.xyz-co.com/endpoint1?startTime={start_time}&endTime={end_time}"
    actual = XyzHook.endpoint1_url_with_start_end_times(base_url, start_time, end_time)

    assert expected == actual


@pytest.mark.skip("Skipping since previously passed.")
def test_get_endpoint1_paginated_data():
    hook = create_xyz_hook()

    start_time = '2021-03-29T00:00:00-04:00'
    end_time = '2021-04-09T23:59:59-04:00'
    data = hook.get_endpoint1(start_time, end_time)

    expected_count = 350
    actual_count = len(data)

    test_logger.debug(f"actual_count = {actual_count}")

    assert actual_count >= expected_count

Pytest Tips

Helpers

Helpers in pytest are defined in the tests/conftest.py file. In order to use them, you will need to add the dependency to your requirements.txt file:

pytest-helpers-namespace~=2019.1.8
Here's an example helper:
import datetime
import pytest
from airflow.models import DAG


pytest_plugins = ["helpers_namespace"]

@pytest.fixture
def test_dag():
    """Airflow DAG for testing."""
    return DAG(
        "test_dag", start_date=datetime.datetime(2020, 1, 1), schedule_interval=datetime.timedelta(days=1)
    )

@pytest.helpers.register
def run_task(task, dag):
    """Run an Airflow task."""
    dag.clear()
    task.run(start_date=dag.start_date, end_date=dag.start_date)

Enable logging output

In order to enable logging output for your tests, you'll need to create or modify your pytest.ini file:

[pytest]
log_cli = True
log_cli_level = DEBUG

Logging inside your tests

Once logging has been enabled for pytest, you can use logging inside your tests like this:

import logging
import pytest


logger = logging.getLogger()

def test_example():
    expected = #...
    actual = #...

    logger.debug("expected:\n\n%s\n" % expected)
    logger.debug("actual:\n\n%s\n" % actual)

    assert expected == actual

Logging inside your test subjects

If you do any logging inside your custom Airflow Operator / Hook / Sensor, Airflow will consume those log messages. In order to circumvent this, you can create a new logger in your test and dependency-inject it into your test subject. For example, if you have a custom Operator, you can add a logger parameter to the constructor and a new log property which logs to the injected logger, or Airflow's logger:

from logging import Logger
from typing import Optional
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):

    def __init__(
            self,
            # other params here...
            logger: Optional[Logger] = None,
            *args,
            **kwargs) -> None:

        super(MyCustomOperator, self).__init__(*args, **kwargs)

        # Set logger if provided
        if logger is not None:
            self._logger = logger

    @property
    def log(self) -> Logger:
        if self._logger is not None:
            return self._logger
        else:
            return self._log

    def execute(self, context: any):
       self.log.info("Executing...")

That's all I can think of right now. I hope this helps!

More Python Articles