How to orchestrate Azure Data Explorer queries with Airflow
Azure Data Explorer (ADX) is a managed data analytics service used for performing real-time analysis of large volumes of streaming data. It's particularly useful for IoT applications, big data logging platforms, and SaaS applications.
You can use the ADX hook and operator, which are part of the Azure provider package, to integrate ADX queries into your DAGs. In this post, we show how to configure your ADX cluster to work with Airflow and how to create a DAG that runs a query against a database in that cluster.
Prerequisites
To complete the example in this post, you need:
- A running ADX cluster with a database. See Quickstart: Create an Azure Data Explorer cluster and database for instructions. If you don't already have an ADX cluster, Azure offers a $200 credit when you sign up for a free Azure account.
- The Astro CLI.
Step 1: Configure your ADX cluster to work with Airflow
To allow Airflow to communicate with your ADX database, you need to configure service principal authentication. To do this, create and register a Microsoft Entra ID service principal, then give that principal permission to access your ADX database. See Create a Microsoft Entra ID application registration in Azure Data Explorer for more details.
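Before moving on, you can optionally confirm the service principal works outside of Airflow. The following is a minimal sketch using the azure-kusto-data Python package (pip install azure-kusto-data), assuming you registered the principal with a client secret; the cluster URL, database name, and credential values are placeholders you need to fill in:

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

# All values below are placeholders for your own cluster and app registration.
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    "https://<cluster>.<region>.kusto.windows.net",  # cluster URL
    "<client-id>",      # service principal client ID
    "<client-secret>",  # service principal client secret
    "<tenant-id>",      # Microsoft Entra tenant ID
)
client = KustoClient(kcsb)

# A trivial query that succeeds only if authentication and permissions are set up correctly.
response = client.execute("<database-name>", "print ok='connected'")
print(response.primary_results[0])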
Step 2: Populate your ADX database
Populate your ADX database with demo data. See Quickstart: Ingest sample data into ADX for instructions on ingesting the StormEvents sample dataset. If you are working with an existing ADX cluster that already contains data, you can skip this step.
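If you ingested the StormEvents sample data, you can reuse the client from the sketch above to verify that it landed (again, the database name is a placeholder):

# Reusing client from the previous sketch; replace the database name with yours.
response = client.execute("<database-name>", "StormEvents | count")
print(response.primary_results[0])  # one row containing the ingested row count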
Step 3: Configure your Astro project
Now that you have your Azure resources configured, you can move on to setting up Airflow.
Create a new Astro project:
$ mkdir astro-adx-tutorial && cd astro-adx-tutorial
$ astro dev init
Add the following line to the requirements.txt file of your Astro project:

apache-airflow-providers-microsoft-azure
This installs the Azure provider package that contains all of the relevant ADX modules.
Run the following command to start your project in a local environment:
astro dev start
Step 4: Add an Airflow connection to ADX
Add a connection that Airflow will use to connect to ADX. In the Airflow UI, go to Admin -> Connections. Create a new connection named adx and choose the Azure Data Explorer connection type. Enter the following information:
- Host: Your cluster URL
- Login: Your client ID
- Password: Your client secret
- Extra: Include at least "tenant" with your tenant ID and "auth_method" with your chosen authentication method. The method corresponds to how you set up your service principal in Step 1. Depending on the auth method, you may also need to specify the "certificate" and/or "thumbprint" parameters.
For more information on setting up this connection, including available authentication methods, see the ADX hook documentation.
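If you prefer to manage connections outside of the UI, the sketch below shows one hypothetical way to build the same connection in code and export it as an environment variable. All values are placeholders, and AAD_APP assumes a service principal with a client secret; the exact extra field names can vary by provider version, so check the hook documentation:

import json
from airflow.models import Connection

# All values below are placeholders; AAD_APP is the auth method for a
# service principal that authenticates with a client secret.
conn = Connection(
    conn_id="adx",
    conn_type="azure_data_explorer",
    host="https://<cluster>.<region>.kusto.windows.net",
    login="<client-id>",
    password="<client-secret>",
    extra=json.dumps({"tenant": "<tenant-id>", "auth_method": "AAD_APP"}),
)

# Airflow picks up connections from AIRFLOW_CONN_<CONN_ID> environment variables.
print(f"AIRFLOW_CONN_ADX='{conn.get_uri()}'")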
Step 5: Create your DAG
In your Astro project dags/ folder, create a new file called adx-pipeline.py. Paste the following code into the file:
from airflow.models.dag import DAG
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator
from datetime import datetime
adx_query = '''StormEvents
| sort by StartTime desc
| take 10'''
with DAG('azure_data_explorer',
         start_date=datetime(2020, 12, 1),
         max_active_runs=1,
         schedule='@daily',
         default_args={
             'depends_on_past': False,
             'retries': 0,
         },
         catchup=False
         ) as dag:

    opr_adx_query = AzureDataExplorerQueryOperator(
        task_id='adx_query',
        query=adx_query,
        database='storm_demo',
        azure_data_explorer_conn_id='adx'
    )
Update the database parameter in the AzureDataExplorerQueryOperator to be the name of your database. If you are not working with the StormEvents demo dataset, you can also update the adx_query to something appropriate for your data.
Step 6: Run your DAG and review XComs
Go to the Airflow UI, unpause your azure_data_explorer DAG, and trigger it to run the query in your ADX cluster. The results of the query are automatically pushed to XCom. Go to Admin -> XComs to view them.
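If you would rather consume the results in a downstream task than read them in the UI, you can pull them from XCom. Below is a hedged sketch of a hypothetical task you could add to the DAG from Step 5; depending on your provider version and Airflow configuration, the pulled value may be a KustoResultTable object or its string representation:

from airflow.decorators import task

# Hypothetical downstream task: define it inside the `with DAG(...)` block
# from Step 5, after opr_adx_query.
@task
def print_adx_results(ti=None):
    # ti is injected by Airflow; pull the value pushed by the adx_query task.
    print(ti.xcom_pull(task_ids="adx_query"))

opr_adx_query >> print_adx_results()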