Automate data collection with GCP Cloud Functions


Introduction

In the previous post, we demonstrated the steps to build an Airflow ELT data pipeline for crime data in London. In this post, I’ll walk you through a Python program I developed to automate downloading, processing, and uploading the Metropolitan Police crime data, and we’ll then deploy this function to run in Google Cloud Functions. The function combines web scraping, data manipulation, and Google Cloud Storage (GCS) integration.

Prerequisites

  • A Google Cloud Platform (GCP) project with billing enabled
  • The Cloud Functions API enabled in your GCP project
  • Basic familiarity with Python web development concepts

Understanding the Code

The Python program accomplishes the following tasks, in order:

  • Retrieving crime data: It interacts with the data.police.uk website using the requests library and BeautifulSoup to access the Metropolitan Police crime data download page, then submits the necessary parameters to generate the download URL for the requested crime data.
  • Locating and reusing the CSRF token (a minimal sketch of this pattern follows this list):
    • Understanding CSRF Protection: The data.police.uk website uses a CSRF token to safeguard the data download form. This token is a unique value the site embeds within its web pages.
    • Locating the Token: Our code sends a GET request to access the download page. In the response, the website sets a cookie called ‘csrftoken’. This cookie holds the crucial CSRF token value.
    • Extracting the Value: The line csrf_token = response.cookies.get('csrftoken') uses the requests library to easily access cookies from the response and extracts the value of the ‘csrftoken’ cookie, storing it in the csrf_token variable.
    • Including the Token in the Download Request: Later in the code, when the function simulates submitting the download request, it includes this extracted csrf_token. This inclusion tells the website that the request originates from a legitimate page view and not a malicious attempt from another website.
  • Extracting the download URL: After submitting the download request, we wait for the server to return the download URL and store it for the next step.
  • Downloading and unzipping: It downloads the compressed crime data and extracts the relevant CSV file. The code handles the different status codes returned in the responses.
  • File renaming and cleanup: The extracted CSV file is renamed for clarity and moved to the working directory.
  • Google Cloud Storage upload: The processed CSV file is uploaded to a designated GCS bucket. A client object (storage_client) is created to establish a connection with Google Cloud Storage, a bucket object (bucket) references the specific GCS bucket where the file will reside, and a blob object (blob) represents the file we want to upload along with its destination path within the bucket. Finally, the blob.upload_from_filename() method transfers the CSV file from the Cloud Function’s local storage to the designated location in the Google Cloud Storage bucket.
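
Before diving into the full function, here is a minimal standalone sketch of the CSRF pattern described above: fetch the page to obtain the csrftoken cookie, then send it back both as a form field and as a cookie. It mirrors the code in the next section; the date values shown are placeholders.

import requests

# Fetch the download page; the site sets a 'csrftoken' cookie on this response
headers = {'Referer': 'https://data.police.uk/data/'}
page = requests.get("https://data.police.uk/data/", headers=headers)
csrf_token = page.cookies.get('csrftoken')

# Send the token back both as the 'csrfmiddlewaretoken' form field and as a cookie
form_data = {
    'csrfmiddlewaretoken': csrf_token,
    'date_from': '2024-01',   # placeholder month
    'date_to': '2024-01',
    'forces': ['metropolitan'],
    'include_crime': 'on'
}
response = requests.post("https://data.police.uk/data/", data=form_data,
                         headers=headers, cookies={'csrftoken': csrf_token},
                         allow_redirects=False)
print(response.status_code)  # 302 means the submission was accepted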

Code Breakdown

import functions_framework
import requests
from bs4 import BeautifulSoup
import time
from datetime import datetime, timedelta
import json
import zipfile
import os
from flask import make_response  # used for the error responses returned below
from google.cloud import storage

# Function to fetch crime data from the authority (data.police.uk) and upload it to GCS
@functions_framework.http
def download_and_upload_data(request):
    """Downloads, processes, and uploads Metropolitan Police crime data from data.police.uk to GCS."""
    run_date = request.args.get('date')

    # Step 1: Send a GET request to the custom download page URL
    headers = {
        'Referer': 'https://data.police.uk/data/'
    }

    response = requests.get("https://data.police.uk/data/", headers=headers)

    if response.status_code == 200:
        # Step 2: Parse HTML content and locate the form
        soup = BeautifulSoup(response.content, "html.parser")
        form = soup.find("form")

        # Step 3: Extract the CSRF token from the response cookies
        csrf_token = response.cookies.get('csrftoken')

        current_date = datetime.strptime(run_date, '%Y-%m-%d')
        minus_one_month = (current_date - timedelta(days=30)).strftime("%Y-%m")
        print('Input date minus one month is ' + minus_one_month)

        # Step 4: Extract form data including hidden fields or required parameters
        form_data = {
            "csrfmiddlewaretoken": csrf_token,
            'date_from': minus_one_month,
            'date_to': minus_one_month,
            'forces': ['metropolitan'],
            'include_crime': 'on'
            # Add any other required form fields
        }

        # Step 5: Simulate form submission by sending a POST request with CSRF cookie
        submit_url = "https://data.police.uk/data/"
        cookies = {
            'csrftoken': csrf_token
        }
        response = requests.post(submit_url, data=form_data, headers=headers, cookies=cookies, allow_redirects=False)
        
        if response.status_code == 302:
            # Step 6: Retrieve the redirect location after form submission
            location = response.headers.get('location')
            print(location)
            # Step 7: Retrieve the session ID from the location and access the progress page for the download URL
            substring_to_remove = "/data/fetch/"
            result_string = location.replace(substring_to_remove, "")

            redirected_response = requests.get("https://data.police.uk/data/progress/" + result_string)
            
            download_url = None

            # Step 8: Extract the download URL from the JSON response
            refreshed_page_content = redirected_response.content.decode('utf-8')
            print(refreshed_page_content)
            refreshed_page_json = json.loads(refreshed_page_content)
            print(refreshed_page_json)
            download_url = refreshed_page_json['url']
            print(download_url)

            # Step 9: Download the file
            if download_url:
                response = requests.get(download_url)
                if response.status_code == 200:
                    file_name = "crime_data_metropolitan.zip"  # Adjust the file name if needed
                    with open(file_name, "wb") as file:
                        file.write(response.content)
                    print("Metropolitan Police crime data downloaded successfully.")
                    # Step 10: Unzip the file
                    with zipfile.ZipFile(file_name, 'r') as zip_ref:
                        zip_ref.extractall()

                    # Step 11: Move the CSV file to desired location
                    extracted_file_name = minus_one_month + "-metropolitan-street.csv" # Adjust the file name if needed
                    renamed_file_name = "crime-metropolitan-street.csv"
                    os.rename(minus_one_month + "/" + extracted_file_name, renamed_file_name)  # Move the file to the desired location

                    print("CSV file extracted and saved successfully.")

                    # Step 12: Upload the CSV file to a GCS bucket
                    bucket_name = 'gary-bucket'  # Replace with your GCS bucket name
                    file_path = renamed_file_name  # Path to the CSV file
                    destination_blob_name = 'crime-metropolitan-street.csv'  # Desired path and filename in the bucket

                    storage_client = storage.Client()
                    bucket = storage_client.bucket(bucket_name)
                    blob = bucket.blob(destination_blob_name)
                    blob.upload_from_filename(file_path)

                    print("CSV file uploaded to Google Cloud Storage successfully.")
                else:
                    print("Failed to download Metropolitan Police crime data.")
                    return make_response("Failed to download Metropolitan Police crime data.", 500)
            else:
                print("Download URL not found.")
                return make_response("Download URL not found.", 500)
        else:
            print("Form submission failed.")
            return make_response("Form submission failed.", 500)
    else:
        print("Failed to access the custom download page.")
        return make_response("Failed to access the custom download page.", 500)
    return "Process completed successfully."
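
Before deploying, you can test the function locally with the Functions Framework. This is a quick sketch; it assumes the code above is saved as main.py and that your local environment has Google Cloud credentials (for example via gcloud auth application-default login) so the GCS upload can succeed.

pip install functions-framework requests beautifulsoup4 google-cloud-storage
functions-framework --target download_and_upload_data --port 8080
# in another terminal, trigger it with a date argument in YYYY-MM-DD format
curl "http://localhost:8080/?date=2024-03-01"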

Deploy in Cloud Functions

  1. Cloud Functions Page: Navigate to the Cloud Functions page in the Google Cloud console
  2. Create Function: Click on “Create function”
  3. Function Configuration:
    • Name: Provide a name for your function
    • Region: Choose the region where you want to deploy the function
    • Trigger: Select “HTTP” as your trigger
    • Runtime: Select “Python 3.8”
    • Entry point: Specify the name of the function within your code file that will be executed; in our case, download_and_upload_data
    • Source Code: Choose “Inline Editor” then paste the code in the code editor
  4. Dependencies: In the requirements.txt file, include beautifulsoup4, requests, and google-cloud-storage (see the example after this list)
  5. Advanced Settings: Leave these as default
  6. Deploy: Click “Create” to deploy your function. Once deployment is complete, you’ll get a URL endpoint that you can use to trigger your function.
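
For step 4, the requirements.txt in the inline editor might look like the following (functions-framework is typically pre-populated by the editor; versions are left unpinned here as an example):

functions-framework
requests
beautifulsoup4
google-cloud-storage

Once deployed, you can trigger the function by calling its HTTP endpoint with the date query parameter the code expects. The URL below is a placeholder; replace REGION and PROJECT_ID with your own values, and add an Authorization header with an identity token if you did not allow unauthenticated invocations.

curl "https://REGION-PROJECT_ID.cloudfunctions.net/download_and_upload_data?date=2024-03-01"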

Hope it all makes sense, and you can find the source code here on GitHub.
