Apache Airflow Tips
Here are a few Apache Airflow tips I sent to a colleague:
Resources
First off, I always recommend this book by Bas Harenslak and Julian Rutger de Ruiter:
It's a great Airflow book; chapters 8 (Building Custom Components) and 9 (Testing) are especially pertinent.
Bas also has several good resources related to testing in Airflow:
- Video: Testing Airflow workflows - ensuring your DAGs work before going into production
- Article: Testing and debugging Apache Airflow
- Repos:
Airflow 2 Structure
In moving from Airflow 1 to 2, I've realized that many of the API-based Operators I wrote should really have been split into a Hook for the API portion, and an Operator for each specific use case that needs to use the API (Hook).
So for example:
oanda_to_xxx_operator.py (API code + task code)
    → oanda_hook.py (API code only) + oanda_to_xxx_operator.py (task code only)
servicenow_... (same split)
samsara_... (same split)
greenroad_... (same split)
This helps reinforce the Single Responsibility Principle. It also supports scenarios where one API fulfills multiple distinct use cases (different Operators which do different things).
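To illustrate the split, here's a minimal sketch. The names (XyzHook, XyzToListOperator) are hypothetical, the API call is stubbed, and the Airflow base classes are omitted so the example runs standalone; the point is only the division of responsibilities.

```python
class XyzHook:
    """Owns all Xyz API interaction (auth, requests, pagination)."""

    def __init__(self, conn_id: str) -> None:
        self.conn_id = conn_id

    def fetch_records(self, table: str) -> list:
        # A real implementation would call the API; stubbed for the sketch.
        return [{"table": table, "id": 1}]


class XyzToListOperator:
    """Owns one specific task: pull a table into memory via the Hook."""

    def __init__(self, conn_id: str, table: str) -> None:
        self.hook = XyzHook(conn_id)
        self.table = table
        self.records: list = []

    def execute(self, context: dict) -> int:
        self.records = self.hook.fetch_records(self.table)
        return len(self.records)
```

A second use case of the same API would become a second Operator reusing the same Hook, rather than a second copy of the API code.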
Also, in Airflow 2 the "plugins" directory is not recommended any longer.
"It is no longer considered best practice to use the Airflow Plugins mechanism for hooks, operators or sensors. Please see this guide on the best way to implement these." -- from Using Apache Airflow Plugins - Hooks & Operators
"Custom hooks and operators are a powerful way to extend Airflow to meet your needs. There is however some confusion on the best way to implement them. According to the Airflow documentation, they can be added using Airflow’s Plugins mechanism. This however, over complicates the issue and leads to confusion for many people. Airflow is even considering deprecating using the Plugins mechanism for hooks and operators going forward." -- from Importing Custom Hooks & Operators
This reinforces the idea from the book that custom Hooks, Operators, Sensors, etc. should be in their own Python package. The package can be a separate repo, or incorporated into the same repo with the DAGs. Likewise, any tests related to the package can live in the package repo, or in the same repo as the DAGs if that's where the package resides.
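As a sketch, such a package (all names hypothetical) might be laid out like this, whether it lives in its own repo or alongside the DAGs:

```
airflow-mycompany/              # standalone repo, or a subdirectory of the DAGs repo
├── airflow_mycompany/
│   ├── __init__.py
│   ├── hooks/
│   │   └── xyz_hook.py
│   ├── operators/
│   │   └── xyz_to_file_operator.py
│   └── sensors/
├── tests/
│   └── hooks/
│       └── test_xyz_hook.py
└── setup.py
```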
Hook Design
I like to peruse the source code of the existing provider Hooks (airflow/airflow/providers/*/hooks) to get ideas of how a Hook can be modeled. The "when in Rome, do as the Romans do" sentiment.
If you're using the requests library for HTTP requests, one pattern I saw somewhere and have since adopted is to split out the Session and the base URL when getting a connection. This is helpful if you need to add to the Session, and also when crafting endpoints off the base URL.
def get_conn(self, headers: Optional[Dict[Any, Any]] = None) -> Tuple[Session, str]:
    """Return a requests Session and the Xyz API base URL.

    :param headers: Any optional headers to use
    :type headers: Optional[Dict[Any, Any]]
    :return: The requests Session and the Xyz API base URL
    :rtype: Tuple[Session, str]
    """
    if self._session is None:
        # Get the connection
        config = self.get_connection(self.http_conn_id)
        if config.conn_type != 'http':
            raise ValueError(f'connection type for {self.http_conn_id} must be http, {config.conn_type} found')
        # Build the session
        self._session = requests.Session()
        if config.login:
            self._session.auth = (config.login, config.password)
        elif config.extra:
            try:
                self._session.headers.update(config.extra_dejson)
            except TypeError:
                self.log.warning('Connection to %s has invalid extra field.', config.host)
        # Build the base url
        if self._base_url is None:
            schema = config.schema or self.DEFAULT_SCHEMA
            host = config.host or self.DEFAULT_HOST
            self._base_url = f"{schema}://{host}"
    if headers:
        self._session.headers.update(headers)
    return self._session, self._base_url
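For example, crafting an endpoint URL off the returned base URL might look like this. The /events endpoint and the startTime/endTime parameter names are hypothetical; only the pattern of building off base_url is the point.

```python
from urllib.parse import urlencode


def events_url(base_url: str, start_time: str, end_time: str) -> str:
    """Build a hypothetical /events endpoint URL off the base URL."""
    query = urlencode({"startTime": start_time, "endTime": end_time})
    return f"{base_url}/events?{query}"
```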
Another pattern I use often is borrowed from Ruby. Many enumerables in Ruby support a method which returns all of the data, and a method which yields each result or row. This is useful when dealing with large datasets, when you need to provide a progress status, or when you want to handle failures on an individual or batch basis. In Python this can be done using yield. For example, one Hook could have methods like this:
def request_all(self, table_name: str, fields: Optional[str] = None, start_datetime=None, end_datetime=None)
def request_each(self, table_name: str, fields: Optional[str] = None, start_datetime=None, end_datetime=None)
def request_each_with_index(self, table_name: str, fields: Optional[str] = None, start_datetime=None, end_datetime=None)
request_all returns everything at once, request_each yields for each request, and request_each_with_index yields for each batch along with its index. So, if you specify a batch size of 1000, index 1 will be the first batch.
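Here's a minimal standalone sketch of the three methods. The rows are a stubbed stand-in for real API results, and the function names mirror the (hypothetical) Hook methods above:

```python
from typing import Iterator, List, Tuple

# Stubbed "API results" so the sketch runs standalone.
ROWS = [{"id": i} for i in range(5)]


def request_all(batch_size: int = 2) -> List[dict]:
    """Return every row at once."""
    return list(ROWS)


def request_each(batch_size: int = 2) -> Iterator[dict]:
    """Yield one row at a time."""
    for row in ROWS:
        yield row


def request_each_with_index(batch_size: int = 2) -> Iterator[Tuple[int, List[dict]]]:
    """Yield (batch_index, batch) pairs, with index 1 as the first batch."""
    for i in range(0, len(ROWS), batch_size):
        yield (i // batch_size) + 1, ROWS[i:i + batch_size]
```

The caller can then choose eager loading, per-row streaming, or per-batch handling without the Hook changing its API logic.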
If an API is broader in scope, and less singular, then you would have specific endpoint methods that call a generic request method or pagination method.
The API-based Hook's only responsibility is dealing with the API. That could also include accommodating (or hiding) any complexities of the returned JSON, pagination, etc.
# generic pagination method used by all endpoints:
def get_paginated_data(self, session: Session, url: str, data_key: str = 'data') -> List[Dict[str, Any]]

# specific endpoints:
def get_fleet_safety_events(self, start_time: str, end_time: str) -> List[Dict[str, Any]]
# ...

# dealing with API complexities...
def fleet_safety_events_url_with_start_end_times(base_url: str, start_time: str, end_time: str) -> str
def decode_content_and_deserialize_json(content: bytes) -> Dict
def parse_error_from_json_content(content: bytes) -> str
def log_and_raise_page_response_error(self, page: int, response: Response) -> None
Testing
When testing Hooks, Operators, et al., I usually consider two approaches:
- Testing against a test environment (e.g. test database with test data, sandbox environment, etc.)
- Mocking or stubbing resources
Of course, there are also basic tests which require neither a test environment nor mocking; for example, tests that explore how a library works to improve my understanding of it.
I usually default to #1 initially if a test environment is available, since it's much faster. However, there are drawbacks. The tests cannot be run by CI/CD, since it would need access to the test environment. In some cases this may be acceptable, if the team's policy is to make sure tests run and pass locally. Tests can also break if data changes in the test environment. In some scenarios, the test data can be dropped or truncated for each test to avoid that. Or, tests can be written against data counts known at the time the test was written, e.g. assert len(data) > 300.
Mocking or stubbing resources removes the reliance on a test environment and even API hosts. It also allows the CI/CD to run the tests. This takes longer to code, and can be more tedious. It can lead to "mock the world" fatigue. Nonetheless, it's a good long-term test strategy. Sometimes, I'll start with #1 to prove something works, then later go back and refactor to #2. I'll often grab API results via Postman, and save them as fixtures to use in the tests. You can create an API stub which mimics requests and even failures. Sometimes I want my API Hook to know the error JSON format, and parse it to raise a meaningful Python error. This can be mocked in tests. Force an error in Postman and save the error JSON as a fixture.
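For example, an error payload captured in Postman can be saved as a fixture and fed straight to the parsing code. The error envelope format and the XyzApiError name here are assumptions for illustration:

```python
import json


class XyzApiError(Exception):
    """Hypothetical error raised when the Xyz API returns an error payload."""


# A fixture captured via Postman might look like this (assumed format):
ERROR_FIXTURE = b'{"error": {"code": 401, "message": "Invalid API token"}}'


def raise_for_error_content(content: bytes) -> None:
    """Parse the assumed error envelope and raise a meaningful Python error."""
    payload = json.loads(content.decode("utf-8"))
    error = payload.get("error")
    if error:
        raise XyzApiError(f"{error.get('code')}: {error.get('message')}")
```

A test can then assert the Hook turns raw API failures into a meaningful exception, with no network involved.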
One thing I don't like about Airflow's design is the tight coupling with Connections, Variables, etc. The pattern is that a Hook or Operator usually requires a connection id to be set in the constructor, and the Connection is then instantiated inside the Hook/Operator. This is unfortunate, since it tightly binds the class to a running instance of Airflow with access to Airflow's defined Connections, Variables, etc. Alternatively, they could have used inversion of control and injected the dependencies into the class constructors. This would allow tests to mock the Connection, and the test runner wouldn't need a running instance of Airflow.
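A sketch of what that inversion of control could look like; the Connection stand-in and XyzHook are hypothetical, and the Airflow metastore lookup is stubbed:

```python
from typing import Optional


class Connection:
    """Minimal stand-in for an Airflow Connection, for illustration."""

    def __init__(self, host: str, login: str = "", password: str = "") -> None:
        self.host = host
        self.login = login
        self.password = password


class XyzHook:
    """A Connection can be injected, so tests never need a running Airflow."""

    def __init__(self, conn_id: str, connection: Optional[Connection] = None) -> None:
        self.conn_id = conn_id
        self._connection = connection

    def get_connection(self) -> Connection:
        if self._connection is None:
            # Real code would look up conn_id in Airflow's metastore here.
            raise RuntimeError("no injected Connection and no running Airflow")
        return self._connection
```

In production the hook falls back to Airflow's lookup; in tests you pass a fake Connection and the class works standalone.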
Simple Example Test Suite
Here's a small/simple Hook test example. I usually create a factory helper function for each suite.
import logging
import pytest
from datetime import datetime

from airflow_mycompany.hooks.xyz_hook import XyzHook

test_logger = logging.getLogger()
now = datetime.now()


def create_xyz_hook():
    return XyzHook('xyz_api')


@pytest.mark.skip("Skipping since previously passed.")
def test_get_conn():
    hook = create_xyz_hook()
    session, base_url = hook.get_conn()
    assert session is not None
    assert base_url is not None
    assert base_url != ''
    test_logger.debug(f"base_url = {base_url}")


@pytest.mark.skip("Skipping since previously passed.")
def test_endpoint1_url_with_start_end_times():
    hook = create_xyz_hook()
    _session, base_url = hook.get_conn()
    start_time = '2021-03-29T00:00:00-04:00'
    end_time = '2021-04-09T23:59:59-04:00'
    expected = f"https://api.xyz-co.com/endpoint1?startTime={start_time}&endTime={end_time}"
    actual = XyzHook.endpoint1_url_with_start_end_times(base_url, start_time, end_time)
    assert expected == actual


@pytest.mark.skip("Skipping since previously passed.")
def test_get_endpoint1_paginated_data():
    hook = create_xyz_hook()
    start_time = '2021-03-29T00:00:00-04:00'
    end_time = '2021-04-09T23:59:59-04:00'
    data = hook.get_endpoint1(start_time, end_time)
    expected_count = 350
    actual_count = len(data)
    test_logger.debug(f"actual_count = {actual_count}")
    assert actual_count >= expected_count
Pytest Tips
Helpers
Helpers in pytest can be defined in the tests/conftest.py file via the pytest-helpers-namespace plugin. In order to use them, you will need to add the dependency to your requirements.txt file:
pytest-helpers-namespace~=2019.1.8
Here's an example helper:
import datetime
import pytest
from airflow.models import DAG

pytest_plugins = ["helpers_namespace"]


@pytest.fixture
def test_dag():
    """Airflow DAG for testing."""
    return DAG(
        "test_dag",
        start_date=datetime.datetime(2020, 1, 1),
        schedule_interval=datetime.timedelta(days=1),
    )


@pytest.helpers.register
def run_task(task, dag):
    """Run an Airflow task."""
    dag.clear()
    task.run(start_date=dag.start_date, end_date=dag.start_date)
Enable logging output
In order to enable logging output for your tests, you'll need to create or modify your pytest.ini file:
[pytest]
log_cli = True
log_cli_level = DEBUG
Logging inside your tests
Once logging has been enabled for pytest, you can use logging inside your tests like this:
import logging

logger = logging.getLogger()


def test_example():
    expected = ...  # your expected value
    actual = ...    # your actual value
    logger.debug("expected:\n\n%s\n" % expected)
    logger.debug("actual:\n\n%s\n" % actual)
    assert expected == actual
Logging inside your test subjects
If you do any logging inside your custom Airflow Operator / Hook / Sensor, Airflow will consume those log messages. In order to circumvent this, you can create a new logger in your test and dependency-inject it into your test subject. For example, if you have a custom Operator, you can add a logger parameter to the constructor and a new log property which logs to the injected logger, or Airflow's logger:
from logging import Logger
from typing import Any, Optional

from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):

    def __init__(
            self,
            # other params here...
            logger: Optional[Logger] = None,
            *args,
            **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Always set the attribute; None means "use Airflow's logger"
        self._logger = logger

    @property
    def log(self) -> Logger:
        # Prefer the injected logger, fall back to Airflow's logger
        return self._logger or super().log

    def execute(self, context: Any):
        self.log.info("Executing...")
That's all I can think of right now. I hope this helps!