Creating a Cloud Dataflow pipeline

We are going to create a Cloud Dataflow pipeline that does a simple word count on any kind of text file. We'll use the wordcount example included in the apache_beam SDK package. It takes any text file, parses each line into words, and then performs a word frequency count.
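For reference, the core logic of that example can be sketched with the Beam Python SDK roughly as follows. This is only a minimal sketch of the same read-split-count-format steps, not the exact code shipped in apache_beam.examples.wordcount, and the input and output paths are placeholders:

import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder paths -- swap in your own text file and output prefix.
INPUT = 'gs://dataflow-samples/shakespeare/kinglear.txt'
OUTPUT = 'output-test'

with beam.Pipeline(options=PipelineOptions()) as p:
    (p
     | 'Read' >> beam.io.ReadFromText(INPUT)                           # read the text file line by line
     | 'Split' >> beam.FlatMap(lambda line: re.findall(r'\w+', line))  # parse each line into words
     | 'Count' >> beam.combiners.Count.PerElement()                    # count occurrences of each word
     | 'Format' >> beam.Map(lambda kv: '%s: %d' % kv)                  # format each result as "word: count"
     | 'Write' >> beam.io.WriteToText(OUTPUT))                         # write the results to the output prefix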

Prerequisites:

  1. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs
  2. Create a new Cloud Storage bucket
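Both prerequisites can be handled from Cloud Shell; a rough sketch, with the bucket name and location as placeholders you should replace:

gcloud services enable dataflow.googleapis.com compute.googleapis.com logging.googleapis.com \
    storage.googleapis.com storage-api.googleapis.com bigquery.googleapis.com \
    pubsub.googleapis.com datastore.googleapis.com cloudresourcemanager.googleapis.com

gsutil mb -l asia-east2 gs://your-bucket-name/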

We're going to use Cloud Shell to create the pipeline, so there is no need to set up a virtual environment.

1. Get the Apache Beam SDK

Install the Python wheel package

pip install wheel

Install the latest Beam SDK with the GCP extras

pip install 'apache-beam[gcp]'
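To sanity-check the installation, you can print the SDK version (any recent release should be fine for this walkthrough):

python -c "import apache_beam; print(apache_beam.__version__)"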

2. Run the pipeline locally in Cloud Shell to test

python -m apache_beam.examples.wordcount \
  --output output-test

Click ‘Authorize’ when prompted by Cloud Shell. By default the code runs the word count on Shakespeare’s play King Lear. After the run finishes, we should see an output-test file created in our directory.
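The example also accepts an --input flag, so the local run above is roughly equivalent to the following (the King Lear path shown is assumed to be the example's default and is only illustrative):

python -m apache_beam.examples.wordcount \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output output-test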

Use `vim output*` to view the output file; you'll see each word and the number of times it appears listed in the file.

3. Run the pipeline on Dataflow service

For demonstration, we'll count the words in another of Shakespeare's plays, Macbeth. Enter the following command in Cloud Shell.

python -m apache_beam.examples.wordcount \
    --region asia-east2 \
    --input gs://dataflow-samples/shakespeare/macbeth.txt \
    --output gs://bucket-for-quickstart-gary/results/outputs \
    --runner DataflowRunner \
    --project gary-yiu-001 \
    --temp_location gs://bucket-for-quickstart-gary/tmp/

Google Cloud will then create the Dataflow job and the resources needed to run it. Go to Dataflow from the navigation menu and we can see that our Dataflow job was created.
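As an alternative to the console, the job should also be visible from Cloud Shell with gcloud (the region must match the --region passed above):

gcloud dataflow jobs list --region=asia-east2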

Click into the job name to see the details and each stage of the job, as well as the resources being used.

After the job is done, we'll get the following message in the Cloud console. We can then go to our bucket to see the output of the job.
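You can also list the output objects directly from Cloud Shell instead of browsing the console (same bucket as in the command above):

gsutil ls gs://bucket-for-quickstart-gary/results/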

Voilà, our outputs are here.

Copy the output text files to our Cloud Shell to take a look at the content

gsutil cp gs://bucket-for-quickstart-gary/results/* .
vim outputs-00001-of-00003

Now we can confirm that we successfully built our Dataflow pipeline 😀 Remember to delete the bucket to avoid unnecessary costs.
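A one-line sketch of that cleanup, which removes the bucket and everything in it (double-check the bucket name first):

gsutil rm -r gs://bucket-for-quickstart-gary/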
