Run ETL with Astro and CrateDB Cloud in 30 minutes, fully up in the cloud
ETL and ELT processes are the bread and butter of data engineering: everyone has data sources and wants to gather that data in a neat and tidy SQL database like CrateDB.
With Apache Airflow®, you can orchestrate ETL/ELT processes end-to-end, fully integrated in your wider data ecosystem. And now, thanks to the Astro free trial, you can step up your game and run your pipelines in the cloud, paired with a managed CrateDB instance, so that nothing has to run on your own machine.
In this blog post we will dive into the powerful combination of Astro and CrateDB Cloud and walk you through a speedy basic setup so you can run an ELT/ETL pipeline today, all up in the cloud, for free, no credit card needed.
CrateDB: Distributed database for real-time analytics
CrateDB OSS is an open-source distributed SQL database designed for handling large-scale data workloads. It offers seamless scalability, fault tolerance, and real-time querying capabilities, making it a powerful solution for data-intensive applications.
CrateDB Cloud is a fully managed database as a service (DBaaS) platform that brings the power of CrateDB to the cloud. It offers a seamless and hassle-free experience, allowing you to focus on your applications rather than managing infrastructure. With CrateDB Cloud, you can easily monitor, deploy and scale your database, upgrade effortlessly without downtime, and enjoy robust security and reliable performance.
These features make CrateDB the perfect choice both to get started with your ETL/ELT pipelines orchestrated by Airflow, and also to leverage “almost-infinite” scalability when processing large amounts of data.
How to implement a simple ETL pipeline on Astro and CrateDB Cloud
This blog post shows you how to implement a simple ETL pipeline on Astro to:
- Dynamically ingest CSV files stored in a public GitHub repository
- Dynamically run transformations on that data
- Dynamically load the transformed data into a table in CrateDB Cloud
- Run a data quality check on your table
- Run a query on the table and get the results
- Fetch the query results from another task and print them to the logs
We will present this pipeline using data on possums, but of course you can easily adapt it to your use case. Just tweak the SQL queries and define your upstream data sources and you can have your organization's data flowing through the same pipeline.
Figure 1: Grid view of the ETL DAG in Airflow after it ran successfully.
This DAG uses cutting-edge Airflow and CrateDB features to optimize your pipeline.
Data loading is performed using dynamic mapping over a task group (a feature added in Airflow 2.5). This creates a separate set of mapped tasks for each CSV file that is extracted, transformed and loaded into CrateDB, giving you full observability, parallelism and concurrency control, and allowing you to efficiently leverage Astro’s automated scaling.
Additionally, this pipeline incorporates a core principle of best practice data engineering: your result is only as good as the quality of your data, so make sure to check it! Before the final query runs, an operator from the SQL check operator family verifies that the data does not contain errors.
1. Get started with a free trial Astro account
It only takes 5 minutes, and you’ll get $300 in credits for free!
- Go to the Try Astro sign-up page and click on Get started.
- Sign up using either your email address, Google or GitHub account.
- You will receive an email from Astro. In it, click on Verify your email and a page will open. After logging in with your chosen method, you will get a verification code. Enter the code to dive into the world of Astro!
- You will be prompted to create an Organization and a Workspace. Choose any name you’d like!
Figure 2: Creating an Astro Organization and Workspace.
- Create an Astro Deployment by clicking on the Create Deployment button.
Figure 3: Click here to create an Astro Deployment.
- Choose a name for your Deployment as well as a cloud provider and region. You do not need to have an account for the cloud provider you choose and can leave the rest of the settings at their defaults! Click on Create Deployment to start the Deployment creation process.
Figure 4: Configure your Astro Deployment.
Now you are all set to run Airflow in the cloud. While the Deployment is being created (this takes a couple of minutes), let's hop over to CrateDB and get your CrateDB Cloud account.
2. Deploy a free cluster on CrateDB Cloud
In CrateDB Cloud you have the option to deploy a FREE Tier Cluster without adding any payment information, which is ideal for the first evaluation steps. It only takes a few minutes to get started:
- Sign up for a CrateDB Cloud account using your GitHub, Google, Microsoft, or custom credentials.
- If you choose custom credentials and you don’t have an account you will first need to register your login details.
- After the sign-up process is complete, you can create a new organization with a name of your choice.
- Once the organization is created, you are ready to deploy your first cluster, so hit the Deploy cluster button!
Figure 5: Click Deploy cluster to deploy your first CrateDB Cloud cluster.
Configuring your cluster settings consists of three steps: name your cluster, select a cloud provider and region (this does not need to be the same as you selected for your Astro deployment), and choose your compute settings. For our example, we choose the following setup:
- Cluster name: cratedb-demo
- Region: Azure East US 2
- Compute: CRFREE is a free tier single node cluster. If you want to try out other cluster configurations you can use the free credit available on CrateDB Cloud. The free tier single node cluster comes with 2 CPUs, 2 GiB of memory, and 8 GiB of storage.
Figure 6: Setting up the configuration of the CrateDB cluster.
- After the configuration is set up, hit the Deploy Cluster button and your cluster will be deployed shortly. The first page of the CrateDB Cloud console shows the main information about your cluster and the different ways to connect to it from various tools and languages.
Figure 7: Overview of the CrateDB cluster and connection details.
To start creating tables and running queries, click on Open Admin UI. This opens a web administration user interface (or Admin UI) for your running cluster.
3. Set up your Astro project
With both clouds ready to run, let’s set up the Astro project containing all the code necessary for our pipeline.
Instead of following the steps below you can also clone the full project from GitHub. In that case you just need to make sure you have the latest version of the Astro CLI installed locally and then you can skip to Step 4.4 to create your environment variable to connect to CrateDB Cloud.
First, make sure you have the latest version of the Astro CLI installed locally. While you will need Docker Desktop if you want to use the full Astro CLI capabilities to develop Airflow DAGs locally, you do not need Docker in order to run your pipeline in the cloud!
To set up locally follow these steps:
- Create a new directory and run the initialization command. This command will create all the files and folders necessary to run a full Astro project!
astro dev init
- In the requirements.txt file, add the Common SQL provider, which contains several operators we will use in the Airflow DAG.
apache-airflow-providers-common-sql==1.6.0
Create a sql folder in your include folder and add the SQL statements found in this folder in the example repository. These statements will be used by the DAG to create a table, load data and query data in CrateDB Cloud.
The final piece of the puzzle is of course the DAG code! Navigate to your project root directory and run the following command. Your DAG will be delivered directly from the registry into your dags folder.
astro registry dag add astro_cratedb_elt_pipeline --version 1.0.0
Figure 8: The DAG code in the graph view.
This DAG shows several cutting-edge Airflow features. Let's walk through the most exciting parts of the code!
Both the extract_data and transform_data tasks are defined using the TaskFlow API decorator @task, reducing the code needed to turn a Python function into an Airflow task to 5 characters. Also note how the extract_data task returns a pandas DataFrame, which the transform_data task takes in as an argument. This is possible without any additional setup, thanks to built-in XCom serialization for pandas DataFrames in Airflow 2.6+.
from airflow.decorators import task
import pandas as pd

@task
def extract_data(base_url, folder_path, file_name):
    """Extract the contents of a CSV file in a GitHub repo
    and return a pandas DataFrame."""
    file_url = base_url + "/main" + folder_path + f"/{file_name}"
    possum_data = pd.read_csv(file_url)
    return possum_data

@task
def transform_data(dataset):
    """Transform the data by dropping rows with missing values.
    Return a tuple of lists of column values."""
    possum_final = dataset.dropna()
    return tuple(possum_final.to_dict(orient="list").values())
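To see what shape transform_data hands to the load step, here is a minimal standalone sketch of the same two pandas calls, run outside Airflow. The three-row DataFrame and its column names are made up for illustration and are not taken from the example repository.

```python
import pandas as pd

# Hypothetical miniature of one possum CSV file; column names are illustrative.
dataset = pd.DataFrame(
    {
        "sex": ["m", "f", "m"],
        "age": [8.0, None, 2.0],  # the second row has a missing age
        "taill": [36.0, 36.5, 39.0],
    }
)

# Same logic as transform_data: drop incomplete rows, then pivot the
# remaining rows into one list of values per column.
possum_final = dataset.dropna()
columns = tuple(possum_final.to_dict(orient="list").values())

print(columns)
```

The row with the missing age disappears, and the result is one list per column, in column order. That column-wise tuple is what the load_data task receives as its parameters.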
In our example we will ingest data from five CSV files, but in real-life use cases you often do not know the number of input elements you will get in any given DAG run. This is where dynamic mapping comes into play!
This example DAG dynamically maps a whole task group over the list of file names retrieved from an upstream task, creating one set of sequential extract_data, transform_data and load_data tasks for each file. The sets run in parallel, inserting data at lightning speed.
from airflow.decorators import task_group
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

@task_group
def extract_to_load(base_url, folder_path, file_name):
    """Extract data from a CSV file in a GitHub repo, transform it and
    load it into CrateDB."""
    extracted_data = extract_data(
        base_url=base_url, folder_path=folder_path, file_name=file_name
    )
    transformed_data = transform_data(dataset=extracted_data)
    SQLExecuteQueryOperator(
        task_id="load_data",
        conn_id=CRATE_DB_CONN_ID,
        sql="sql/insert_data.sql",
        parameters=transformed_data,
    )

# dynamically map the task group over the list of file names, creating
# one task group for each file
extract_to_load_tg = extract_to_load.partial(
    base_url=GH_CONTENT_URL, folder_path=GH_FOLDER_PATH
).expand(file_name=file_names)
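If the partial/expand pattern is new to you, the following plain-Python sketch mimics the fan-out without needing Airflow. It is only a mental model: expand_kwargs and build_file_url are hypothetical helpers, and the URL, folder path and file names are placeholders rather than the values used in the example repository. In a real DAG, Airflow creates the mapped task group instances for you at runtime.

```python
def expand_kwargs(fixed_kwargs, mapped_name, mapped_values):
    """Mimic partial(**fixed_kwargs).expand(mapped_name=mapped_values):
    return one keyword-argument dict per mapped value."""
    return [{**fixed_kwargs, mapped_name: value} for value in mapped_values]


def build_file_url(base_url, folder_path, file_name):
    """Same URL construction as in the extract_data task."""
    return base_url + "/main" + folder_path + f"/{file_name}"


# Hypothetical placeholder values, not the ones from the example repository.
GH_CONTENT_URL = "https://raw.example.com/some-org/some-repo"
GH_FOLDER_PATH = "/data"
file_names = ["possum_1.csv", "possum_2.csv", "possum_3.csv"]

# The fixed arguments are shared; only file_name differs per instance.
mapped_instances = expand_kwargs(
    {"base_url": GH_CONTENT_URL, "folder_path": GH_FOLDER_PATH},
    "file_name",
    file_names,
)

urls = [build_file_url(**kwargs) for kwargs in mapped_instances]
for map_index, url in enumerate(urls):
    print(map_index, url)
```

Each entry in mapped_instances corresponds to one mapped copy of the task group, which is why adding a file to the list adds a whole extract, transform and load chain without any change to the DAG code.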
Before we query our data, the data_check task makes sure the data is up to our quality standards. The SQLColumnCheckOperator runs one data quality check, making sure there are no negative values for age. Additional checks can easily be added to the Python dictionary.
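from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator

data_check = SQLColumnCheckOperator(
    task_id="value_check",
    conn_id=CRATE_DB_CONN_ID,
    table="doc.possum",
    column_mapping={
        # the smallest age must be greater than or equal to 0
        "age": {"min": {"geq_to": 0}},
    },
)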
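The check boils down to comparing the column minimum against a threshold. The sketch below reproduces that idea with Python's built-in sqlite3 module standing in for CrateDB, so it runs anywhere. The helper min_is_at_least and the sample ages are hypothetical, and the SQL the operator actually generates may differ from this.

```python
import sqlite3


def min_is_at_least(conn, table, column, threshold):
    """Return True if the smallest value in `column` is >= threshold.

    Identifiers are interpolated directly, so only pass trusted names.
    An empty table counts as a failure here; that is a choice made for
    this sketch, not documented operator behavior.
    """
    (minimum,) = conn.execute(f"SELECT MIN({column}) FROM {table}").fetchone()
    return minimum is not None and minimum >= threshold


conn = sqlite3.connect(":memory:")  # in-memory stand-in for CrateDB
conn.execute("CREATE TABLE possum (age REAL)")
conn.executemany("INSERT INTO possum (age) VALUES (?)", [(8.0,), (2.0,), (6.0,)])

check_passed = min_is_at_least(conn, "possum", "age", 0)

# A single bad row is enough to fail the check.
conn.execute("INSERT INTO possum (age) VALUES (-1.0)")
check_failed = min_is_at_least(conn, "possum", "age", 0)

print(check_passed, check_failed)
```

In the DAG, a failed check fails the task, so the downstream query never runs on data that violates the rule.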
Next, the select_data task runs a query returning average tail lengths for different possum populations grouped by sex and population. The columnar storage in CrateDB enables very fast column-level operations like aggregations, grouping and sorting.
select_data = SQLExecuteQueryOperator(
    task_id="select_data",
    conn_id=CRATE_DB_CONN_ID,
    sql="sql/select.sql",
)
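The actual statement lives in sql/select.sql in the example repository. As a rough illustration of the kind of aggregation described, here is a sketch that again uses the built-in sqlite3 module in place of CrateDB. The table layout, the column names pop, sex and taill, and the sample rows are assumptions made for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # in-memory stand-in for CrateDB
conn.execute("CREATE TABLE possum (pop TEXT, sex TEXT, taill REAL)")

# Made-up sample rows: (population, sex, tail length).
rows = [
    ("Vic", "m", 36.0),
    ("Vic", "m", 38.0),
    ("Vic", "f", 37.0),
    ("other", "f", 39.0),
    ("other", "f", 41.0),
]
conn.executemany("INSERT INTO possum VALUES (?, ?, ?)", rows)

# Average tail length per population and sex.
query = """
    SELECT pop, sex, AVG(taill) AS avg_tail_length
    FROM possum
    GROUP BY pop, sex
    ORDER BY pop, sex
"""
result = conn.execute(query).fetchall()

for pop, sex, avg_tail_length in result:
    print(pop, sex, avg_tail_length)
```

The result is a list of rows, one per population and sex combination, which is the kind of value the next task reads back.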
Lastly, the print_selected_data task will pull the query results from XCom, Airflow’s internal storage for passing data between tasks, and print them to the logs.
selected_data = context["ti"].xcom_pull(task_ids="select_data")
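For context, a task around that line could look roughly like the sketch below. It assumes the query result arrives as a list of rows. FakeTaskInstance and format_rows are hypothetical helpers that exist only so the function can be tried without an Airflow environment; inside the DAG the function would be decorated with @task and receive the real context.

```python
def format_rows(selected_data):
    """Turn a list of result rows into printable lines, one per row."""
    return [", ".join(str(value) for value in row) for row in selected_data]


def print_selected_data(**context):
    """Pull the select_data result from XCom and print it row by row."""
    selected_data = context["ti"].xcom_pull(task_ids="select_data")
    for line in format_rows(selected_data):
        print(line)


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance, for local runs only."""

    def xcom_pull(self, task_ids):
        # Made-up rows in the shape (population, sex, average tail length).
        return [("Vic", "f", 37.0), ("other", "m", 40.0)]


# Outside Airflow, pass the stand-in where the real task instance would be.
print_selected_data(ti=FakeTaskInstance())
```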
Now you know exactly what your code will do. Let’s finally run it!
4. Get your DAG running in the cloud
Now, with all the files ready, you can deploy them directly to Astro.
- In the CLI run:
astro login
- Log in via the browser and click Accept to authenticate your device to Astro.
- Run the following command to deploy all of your local code to Astro:
astro deploy -f
astro deploy will push all local changes to your Astro Deployment. If you only want to push changes made to code in the dags folder, there is the option to do a DAG-only deploy. DAG-only deploys are the fastest way to deploy code to Astro.
- To set the connection to CrateDB Cloud, you can add an environment variable directly in the Astro UI, leveraging CrateDB’s support for the PostgreSQL wire protocol. Click on the Variables tab of your Astro Deployment and then on Edit Variables. Copy and paste the Key and Value from the code block below and make sure to add your own values for username, secret and CrateDB Cloud URL. Click the Secret? checkbox to mark your variable as containing a secret, then click Add and Save Variables to add your new environment variable to your Deployment.
Figure 9: Adding an environment variable on Astro.
Key: AIRFLOW_CONN_CRATEDB_CONNECTION
Value: "postgres://$YOUR_USERNAME:$YOUR_SECRET@$YOUR_CRATEDB_CLOUD_URL:5432/?sslmode=require"
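One detail worth knowing: when a connection is given as a URI, special characters in the username or secret (such as @, / or :) need to be URL-encoded, otherwise the URI is parsed incorrectly. A small sketch for building the value safely is shown below. The function name build_cratedb_conn_uri, the credentials and the host are hypothetical placeholders.

```python
from urllib.parse import quote


def build_cratedb_conn_uri(username, secret, host, port=5432):
    """Build the connection URI, URL-encoding the credentials."""
    user = quote(username, safe="")
    password = quote(secret, safe="")
    return f"postgres://{user}:{password}@{host}:{port}/?sslmode=require"


# Placeholder credentials and host; use the values of your own cluster.
uri = build_cratedb_conn_uri("admin", "p@ss/word", "my-cluster.example.com")
print(uri)
```

Paste the printed string as the Value of the environment variable. The @ and / in the secret are encoded as %40 and %2F, so they can no longer be mistaken for URI separators.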
While we do not show this functionality in this blog post, the Astro CLI also makes it super easy to run your DAGs in a local environment. Just make sure you have Docker Desktop installed, add the environment variable from this step to your .env file and run astro dev start to spin up a local environment to test your code before deploying to the cloud.
Now open the Airflow UI of your Astro Deployment running in the cloud by clicking on Open Airflow. Click the arrow on the right-hand side of the astro_cratedb_elt_pipeline DAG and you will see it running!
Finally, after the DAG has completed, head to the Admin UI in CrateDB Cloud to check and further analyze your data:
Figure 10: Browsing possum data in the CrateDB Admin UI.
Conclusion
You did it! You set up two cloud tools and ran a full ETL pipeline up in the sky in just 30 minutes!
This is just the start of your Astro and CrateDB journey. Learn more about our tools at:
- Astronomer Learn
- Astro documentation
- Astronomer Academy
- Astronomer webinars
- CrateDB documentation
- CrateDB Cloud tutorials
- CrateDB webinars
If you have any questions feel free to ask in the Airflow and CrateDB communities. In the Airflow Slack, you can ask questions related to all things Astro in #airflow-astronomer.