Presented by Ash Berlin-Taylor, Director of Airflow Engineering at Astronomer
Airflow 2.2.0 is finally almost here! The new release combines two new big features and a whole lot of small quality of life improvements to make Airflow even more powerful, and also fixes some long-standing complaints.
Airflow 2.2.0 new features
AIP stands for Airflow Improvement Proposal. Any big architectural change or fundamental change to the way Airflow operates goes through the Airflow Improvement Proposal process and a vote. This ensures that big, fundamental changes are put in front of the community.
1. AIP-39: Custom Timetables
- Schedule where you couldn’t go before!
- Cron expressions only got us as far as regular time intervals
- For example, daily Monday-Friday (but not weekend) wasn’t possible.
- Full backward compatibility maintained: schedule_interval is not going away
- Timetables also introduce an explicit “data interval”, super useful when looking at data for a specific period of time
- Now possible to pull Friday’s data on Saturday, or any other funky interval.
- No more “why didn’t my dag run yet?”
  - The concept of “execution_date” was confusing to every new user, so now it is deprecated! In its place there is:
    - logical_date (aka execution_date)
    - data_interval_start (same value as execution_date for the built-in timetables)
    - data_interval_end (same value as next_execution_date, at least for the built-in timetables)
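To make the relationship between these values concrete, here is a minimal plain-Python sketch (it does not use Airflow itself) of how a daily schedule maps a logical_date onto a data interval. The DataWindow type and the daily_data_interval helper are hypothetical names used for illustration only, and the sketch assumes a plain daily schedule.

```python
from datetime import datetime, timedelta, timezone
from typing import NamedTuple


class DataWindow(NamedTuple):
    """Hypothetical stand-in for a data interval (not Airflow's class)."""
    start: datetime
    end: datetime


def daily_data_interval(logical_date: datetime) -> DataWindow:
    # Assumption: a plain daily schedule, where the interval starts at the
    # logical date and ends exactly one day later.
    return DataWindow(start=logical_date, end=logical_date + timedelta(days=1))


logical_date = datetime(2021, 10, 1, tzinfo=timezone.utc)
window = daily_data_interval(logical_date)

# The run covering 1 October can only start once that day is over,
# i.e. at or after window.end.
print(window.start.isoformat(), "->", window.end.isoformat())
```

Under these assumptions, a daily run for 1 October only shows up on 2 October, because the run is created once its data interval has ended.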
- Pluggable timetables! Airflow 2.2 ships with a few built-in timetables that mirror the behavior of schedule_interval. You can also add your own timetable! Example timetable:
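```python
from __future__ import annotations  # allows "X | None" hints before Python 3.10
import datetime

from croniter import croniter
from pendulum import DateTime
from pendulum.tz.timezone import Timezone

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.utils.dates import cron_presets
from airflow.utils.timezone import make_aware, make_naive

class RunAtTimetable(Timetable):
    def __init__(self, cron: str, timezone: Timezone) -> None:
        self._expression = cron_presets.get(cron, cron)  # expand presets such as "@daily"
        self._timezone = timezone

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        return DataInterval(None, None)  # manual runs carry no data interval here

    def next_dagrun_info(self, *,
                         last_automated_data_interval: DataInterval | None,
                         restriction: TimeRestriction) -> DagRunInfo | None:
        # TODO: handle restriction.latest, restriction.catchup and a missing start date
        last = last_automated_data_interval
        when = last.end if last else restriction.earliest
        cron = croniter(self._expression, start_time=make_naive(when, self._timezone))
        scheduled = cron.get_next(datetime.datetime)  # naive datetime in self._timezone
        return DagRunInfo.exact(make_aware(scheduled, self._timezone))
```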
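The scheduling decision a timetable has to make can be illustrated without Airflow at all. The sketch below, in plain Python, picks the next one-day data interval that falls on Monday to Friday, which is the kind of schedule mentioned earlier as hard to express with cron alone. next_workday_interval is a hypothetical helper, not part of the Airflow API, and it assumes midnight-aligned UTC days; a real timetable would wrap this kind of logic in next_dagrun_info and return a DagRunInfo.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def next_workday_interval(
    last_end: Optional[datetime], earliest: datetime
) -> Tuple[datetime, datetime]:
    """Return (start, end) of the next one-day interval on a weekday."""
    # Continue from the end of the previous interval, or from the
    # earliest allowed date on the very first run.
    start = last_end if last_end is not None else earliest
    # Skip Saturday (5) and Sunday (6).
    while start.weekday() >= 5:
        start += timedelta(days=1)
    return start, start + timedelta(days=1)


friday = datetime(2021, 10, 1, tzinfo=timezone.utc)  # a Friday
first = next_workday_interval(None, friday)
second = next_workday_interval(first[1], friday)
print(first)   # Friday's interval
print(second)  # skips the weekend and lands on Monday
```

Because the function depends only on its arguments, it returns the same result every time it is called with the same inputs.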
- Limitations
  - Should return the same result every time it’s called (no HTTP requests please; event triggering is coming in the future)
  - Timetables are “evaluated” inside the scheduler when creating DagRuns, so keep them fast and error-free
- NYSE trading timetable (Astronomer customers only!)

```python
from airflow import DAG
from airflow.decorators import task
from astronomer.timetables.trading_hours import USTradingHoursTimetable

with DAG(timetable=USTradingHoursTimetable()):
    @task.python
    def fetch_daily_trades():
        ...  # task body omitted
```
2. AIP-40: Deferrable Tasks
Allows tasks or sensors to free up worker resources when waiting for external systems/events.
- Ideal use case: submit-then-poll operators
  - Airbnb introduced smart sensors, a first attempt at tackling this issue
  - A deferrable task is great for anything that submits a job to an external system and then polls for status (not just sensors)
  - Does not consume a worker slot while deferred; instead, hundreds of deferred tasks run at once in a single async process
  - Uses fewer resources, and is more reliable
  - Doesn’t need a DAG running
- Advantages of async
  - New component: the triggerer!
  - Async operators:
    - DateTimeSensorAsync
    - TimeDeltaSensorAsync
    - Astronomer customers only:
      - DatabricksRunNowOperatorAsync
      - DatabricksSubmitRunOperatorAsync
      - HttpSensorAsync
      - ExternalTaskSensorAsync
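The reason hundreds of deferred tasks can share one process is that waiting is cheap in async code. The following is a minimal asyncio sketch of that idea only; it is not Airflow's implementation, and wait_for_job, run_all and the 0.1 second delay are made-up stand-ins for polling an external system.

```python
import asyncio
import time


async def wait_for_job(job_id: int, delay: float) -> str:
    # asyncio.sleep stands in for waiting on an external system; while one
    # coroutine sleeps, the event loop is free to run the others.
    await asyncio.sleep(delay)
    return f"job-{job_id} done"


async def run_all(count: int) -> list:
    # Start every wait at once and collect the results in order.
    return await asyncio.gather(*(wait_for_job(i, 0.1) for i in range(count)))


started = time.monotonic()
results = asyncio.run(run_all(200))
elapsed = time.monotonic() - started

# 200 waits of 0.1s each would take about 20s one after another;
# run concurrently they finish in roughly the time of a single wait.
print(len(results), "jobs finished in", round(elapsed, 2), "seconds")
```

A worker slot that blocks on a single poll cannot do this, which is why moving the wait into an async process frees up worker capacity.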
3. @task.docker decorator
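```python
from airflow.decorators import task

@task.docker(image='python:3.9-slim-buster', multiple_outputs=True)
def transform(order_data_dict: dict):
    # Sum the order values; this function body runs inside the Docker container.
    total_order_value = 0
    for value in order_data_dict.values():
        total_order_value += value
    return {"total_order_value": total_order_value}
```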
4. Other features
- Validation of DAG params: the building blocks for true parameterized DAGs

```python
from airflow import DAG
from airflow.models.param import Param

with DAG(
    'my_dag',
    params={
        # an int param with a default value
        'int_param': Param(10, type='integer', minimum=0, maximum=20),
        # a mandatory str param
        'str_param': Param(type='string', minLength=2, maxLength=4),
        # a param which can be None as well
        'dummy_param': Param(type=['null', 'number', 'string']),
        # the old way of passing a param, i.e. no data or type validations
        'old_param': 'old_way_of_passing',
        # a Param with only a default, i.e. no data or type validations
        'simple_param': Param('im_just_like_old_param'),
        'email_param': Param(
            default='example@example.com',
            type='string',
            format='idn-email',
            minLength=5,
            maxLength=255,
        ),
    },
):
    ...  # tasks go here
```
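The keyword arguments passed to Param above (type, minimum, maximum, minLength and so on) follow JSON Schema naming. As a rough illustration of what validating the int_param entry amounts to, here is a hand-rolled sketch in plain Python. check_int_param is a hypothetical helper written for this example; it is not how Airflow implements Param validation.

```python
from typing import Any, Optional


def check_int_param(
    value: Any, minimum: Optional[int] = None, maximum: Optional[int] = None
) -> int:
    """Hypothetical stand-in for schema validation of an integer param."""
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError(f"{value!r} is not an integer")
    if minimum is not None and value < minimum:
        raise ValueError(f"{value} is less than the minimum of {minimum}")
    if maximum is not None and value > maximum:
        raise ValueError(f"{value} is greater than the maximum of {maximum}")
    return value


# The default from the example above passes the check.
validated = check_int_param(10, minimum=0, maximum=20)

# An out-of-range override is rejected instead of reaching the tasks.
try:
    check_int_param(25, minimum=0, maximum=20)
    rejected = False
except ValueError:
    rejected = True

print(validated, rejected)
```

The point of validating up front is that a bad value fails when the run is triggered, before any task has started.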
- airflow standalone: run all the Airflow components (migrations, scheduler, webserver, triggerer, etc.) directly, without Docker.
Thank you for your attention and see you on the day of the release!