Build a data pipeline with Airflow and GCP

In a previous post, we talked about how we got crime data from the London Datastore and aggregated it in BigQuery. We also discussed how to draw insights from the aggregated data by building a dashboard in Looker Studio.

Now we are going to build a data pipeline with Airflow and different Google Cloud Platform (GCP) tools so we can automate and schedule the data ingestion, transformation and visualization process.

To achieve this, the pipeline needs to carry out several steps:

  1. Fetch the latest crime data from data.police.uk with a Cloud Function and store it in a Cloud Storage bucket
  2. Load the data from the bucket to BigQuery
  3. Transform the raw data in BigQuery
  4. Aggregate the transformed data and visualize it in Looker Studio

All four of these steps can be done with Airflow operators that orchestrate the corresponding functions or tools in GCP.
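Before going through each step, here is a rough sketch of the DAG object that the tasks below attach to (via the dag=dag argument). The dag_id, start date and monthly schedule are illustrative assumptions; the post only relies on the DAG being triggered once a month.

from datetime import datetime

from airflow import DAG

# Illustrative DAG definition that the tasks in this post attach to.
# dag_id and start_date are placeholders; the monthly schedule matches
# how often new crime data is ingested.
dag = DAG(
    dag_id='london_crime_pipeline',
    schedule_interval='@monthly',
    start_date=datetime(2023, 1, 1),
    catchup=False,
)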

Fetching the latest crime data from data.police.uk with a Cloud Function

To fetch the data from data.police.uk, we need a Python program that simulates sending an HTTP request to the website and downloads the generated zip file.

Since this code runs every month when the Airflow DAG is triggered, it only downloads one month of crime data. After downloading the file, the same program unzips it and uploads the extracted file to a designated Google Cloud Storage bucket.

The Python program is deployed as a Google Cloud Function, a serverless function-as-a-service product that lets users run functions in the cloud without managing any infrastructure.
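As a rough illustration, a minimal sketch of such a Cloud Function could look like the following. The entry point name matches the endpoint used below, and the bucket and object names match the load task later in the post, but the download URL is a placeholder; the real request logic lives in the repository linked at the end of the post.

import io
import zipfile

import requests
from google.cloud import storage

BUCKET_NAME = 'gary-yiu-001-bucket'  # the bucket the load task reads from later


def download_crime_data(request):
    """HTTP-triggered Cloud Function entry point (sketch).

    Downloads one month of street-level crime data as a zip file,
    extracts the CSV and uploads it to the Cloud Storage bucket.
    """
    # The caller passes the date of the month to download, e.g. ?date=2023-05-01
    run_date = request.args.get('date')            # 'YYYY-MM-DD'
    month = run_date[:7] if run_date else None     # 'YYYY-MM'

    # Placeholder for the real request that generates and downloads the zip
    resp = requests.get(f'https://data.police.uk/<custom-download-url>?month={month}')
    resp.raise_for_status()

    # Unzip in memory and upload the extracted CSV to the bucket
    bucket = storage.Client().bucket(BUCKET_NAME)
    with zipfile.ZipFile(io.BytesIO(resp.content)) as archive:
        for name in archive.namelist():
            if name.endswith('.csv'):
                bucket.blob('crime-metropolitan-street.csv').upload_from_string(
                    archive.read(name), content_type='text/csv'
                )
    return 'OK', 200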

We then trigger this Cloud Function from Airflow with a SimpleHttpOperator. Since the function needs to know which month of data to download, we also pass the DAG's execution date to it as a parameter.

trigger_cloud_function = SimpleHttpOperator(
    task_id='trigger_cloud_function',
    http_conn_id='my_gcp_conn',
    endpoint='<cloud-function-endpoint>/download_crime_data/?date={{ ds }}',
    method='POST',
    headers={"Content-Type": "application/json", "extra__google_cloud_platform__num_retries": "6"},
    response_check=lambda response: response.status_code == 200,
)
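One thing to note: {{ ds }} in the endpoint is a Jinja template that Airflow renders as the run's execution date in YYYY-MM-DD format, which is the value the Cloud Function reads to work out which month of data to download. The http_conn_id used here is resolved as an Airflow HTTP connection whose host points at the Cloud Functions endpoint.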

Load the data from Cloud Storage Bucket to BigQuery

Now that the file is in the Cloud Storage bucket, we can load it into BigQuery from Airflow using a GCSToBigQueryOperator. Our data is in CSV format, so we need to specify this in the task, along with other parameters like autodetect and skip_leading_rows, so that the CSV file is loaded into a BigQuery table with the correct schema.

This is a truncate-and-load model, meaning we rewrite the same table every time this task runs. That is fine, since later tasks process this raw data and insert the new data into our final aggregate table.

load_csv_to_bigquery = GCSToBigQueryOperator(
    task_id='load_csv_to_bigquery',
    gcp_conn_id='my_gcp_conn',
    bucket='gary-yiu-001-bucket',
    source_objects=['crime-metropolitan-street.csv'],
    destination_project_dataset_table='{0}.{1}.london_recent_crime_data_raw'.format(
        BQ_PROJECT, BQ_DATASET
    ),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    skip_leading_rows=1,  # Skip the header row
    source_format='CSV',
    autodetect=True,  # Automatically detect the schema from the CSV file
)
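A couple of details worth noting: source_objects must match the object name that the Cloud Function writes to the bucket, and autodetect=True asks BigQuery to infer the schema from the CSV itself, which is acceptable here because the raw table is rewritten in full on every run anyway.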

Transform the raw data in BigQuery

Now that we have our raw data in BigQuery, we can do some cleaning and formatting. Essentially this is the same as what we would do directly in BigQuery – running a query to clean the data – but we can do it from Airflow using a BigQueryOperator.

cleaning_crime_data_insert = BigQueryOperator(
    task_id='cleaning_crime_data_insert',
    sql='london_crime_data_cleaned.sql',
    destination_dataset_table=f"{BQ_PROJECT}.{BQ_DATASET}.london_recent_crime_data$" + "{{ ds_nodash[:6] }}",
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    allow_large_results=True,
    use_legacy_sql=False,
    time_partitioning={"type": "MONTH", "field": "date_month"},
    cluster_fields=["lsoa_code"],  # cluster_fields expects a list of column names
    gcp_conn_id=BQ_CONN_ID,
    dag=dag,
)

We specify the SQL file to run in the operator, as well as the destination of the output data. Here we are inserting the new data into an existing table, london_recent_crime_data. Notice that we are still using WRITE_TRUNCATE as our write_disposition: since we have set up time partitioning, we only replace the partition for the new month instead of scrapping and rebuilding the whole table. This is also why we append the year-month to the destination table as a partition decorator – for a run with ds = 2023-05-01, ds_nodash[:6] renders as 202305, so the data is written into the london_recent_crime_data$202305 partition.

Aggregate the transformed data

From our clean dataset, we can now aggregate the data and put it into the final table that we use for visualization and analysis. This can be done with another BigQueryOperator. After the aggregation runs, we do a simple check to see whether data has been written into the table correctly, using a BigQueryCheckOperator.

aggregate_data_with_polygon = BigQueryOperator(
    task_id='aggregate_data_with_polygon',
    sql='london_crime_data_joined_polygon.sql',
    destination_dataset_table=f'{BQ_PROJECT}.{BQ_DATASET}.london_crime_data_agg',
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    use_legacy_sql=False,
    gcp_conn_id=BQ_CONN_ID,
    dag=dag,
)

check_agg_data = BigQueryCheckOperator(
    task_id='check_agg_data',
    sql='''
    SELECT
        COUNT(*) AS rows_in_partition
    FROM `{0}.{1}.london_crime_data_agg`
    '''.format(BQ_PROJECT, BQ_DATASET),
    use_legacy_sql=False,
    gcp_conn_id=BQ_CONN_ID,
    dag=dag,
)
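With all the tasks defined, the last step is to wire them together so each task only runs after the previous one has finished. A sketch of the dependency chain, using the task variables from above, looks like this:

# Run the pipeline strictly in order: fetch -> load -> clean -> aggregate -> check
(
    trigger_cloud_function
    >> load_csv_to_bigquery
    >> cleaning_crime_data_insert
    >> aggregate_data_with_polygon
    >> check_agg_data
)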

Finally, our pipeline is done. We have already discussed how we created the visualization in Looker Studio here.

The source code of the pipeline can be found here in my GitHub repository.
