WEBINARS

Everything you Need to Know About Airflow 2.2

Watch Video On Demand

Hosted By

  • Ash Berlin-Taylor

Presented by Ash Berlin-Taylor, Director of Airflow Engineering at Astronomer

Airflow 2.2.0 is finally almost here! The new release combines two new big features and a whole lot of small quality of life improvements to make Airflow even more powerful, and also fixes some long-standing complaints.

Airflow 2.2.0 new features

everything-you-need-to-know-about-airflow-2-2-image2

AIP stands for Airflow Improvement Proposal. Any kind of big architectural change or a fundamental change to the way Airflow operates goes through the Airflow improvement proposal process and a vote. It ensures that the big fundamental changes get by in front of the community.

1. AIP-39: Custom Timetables

everything-you-need-to-know-about-airflow-2-2-image5

You can also add your own timetable! Example timetable:

class RunAtTimetable(Timetable):
   def __init__(self, cron: str, timezone: Timezone) -> None:
       self._expression = cron_presets.get(cron, cron)
       self._timezone = timezone

   def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
       return DataInterval(None, None)

   def next_dagrun_info(self, *,
                        last_automated_data_interval: DataInterval | None,
                        restriction: TimeRestriction) -> DagRunInfo | None:
       # TODO: handle restriction.latest and restriction.catchup
       when = last_automated_data_interval or restriction.earliest
       cron = croniter(self._expression, start_time=when)
       scheduled = cron.get_next(datetime.datetime)

       return make_aware(scheduled.in_timezone(self._timezone))
from astronomer.timetables.trading_hours \
    import USTradingHoursTimetable

    with DAG(timetable=USTradingHoursTimetable()):
       @task.python
       def fetch_daily_trades():

2. AIP-40: Deferrable Tasks

Allows tasks or sensors to free up worker resources when waiting for external systems/events.

everything-you-need-to-know-about-airflow-2-2-image1

everything-you-need-to-know-about-airflow-2-2-image3

everything-you-need-to-know-about-airflow-2-2-image4

3. @task.docker decorator

@task.docker(image='python:3.9-slim-buster', multiple_outputs=True)
def transform(order_data_dict: dict):
   total_order_value = 0

   for value in order_data_dict.values():
       total_order_value += value

   return {"total_order_value": total_order_value}

4. Other features

The building blocks for true parameterized DAGs

with DAG(
  'my_dag',
  params: {
    # a int param with default value
    'int_param': Param(10, type='integer', minimum=0, maximum=20),
    # a mandatory str param
    'str_param': Param(type='string', minLength=2, maxLength=4),  
    # a param which can be None as well
    'dummy_param': Param(type=['null', 'number', 'string']),
    # i.e. no data or type validations
    'old_param': 'old_way_of_passing',
    # i.e. no data or type validations
    'simple_param': Param('im_just_like_old_param'),
    'email_param': Param(
        default='example@example.com',
        type='string',
        format='idn-email',
        minLength=5,
        maxLength=255,
    ),
  }
):

Thank you for your attention and see you on the day of the release!

Build, run, & observe your data workflows.
All in one place.

Get $300 in free credits during your 14-day trial.

Get Started Free