GCP Kafka Connect Google Cloud Storage Examples


In this GCP Kafka tutorial, I will describe and show how to integrate Kafka Connect with GCP’s Google Cloud Storage (GCS).  We will cover writing to GCS from Kafka as well as reading from GCS to Kafka.  Descriptions and examples will be provided for both Confluent and Apache distributions of Kafka.

I’ll document the steps so you can run this in your environment if you want.  Or, you can watch me do it in the videos below. Or both. Your call.

It is expected that you have some working knowledge of Apache Kafka at this point, but you may not be an expert yet.  If you know about running Kafka Connect in standalone vs distributed mode or how topics may be used to maintain state or other more advanced topics, that’s great.  This is more a specific use case how-to tutorial.

If you have any questions or concerns, leave them in the comments below.  I’m happy to help.  Well, I’m happier to help for cash money or Ethereum, cold beer, or bourbon.

The overall goal here is to keep it simple and get a demo working ASAP.  We can optimize afterward.  And in this case, when I say “we can optimize”, I really mean “you can optimize” for your particular use case.

All the examples have accompanying source code on GitHub and screencast videos on YouTube.

Let’s roll.

Kafka GCP Requirements

  1. GCP GCS bucket which you can write and read from. You knew that already though, right?  Because this is a tutorial on integrating Kafka with GCS.  If you didn’t know this, maybe you should leave now.
  2. Kafka
  3. GCP service account JSON credentials file.  How to create one is described below.  Also see the Resources section below for a link to GCP Service Account info.

Kafka GCP GCS Overview

When showing examples of connecting Kafka with Google Cloud Storage (GCS), we assume familiarity with configuring Google GCS buckets for access.  There is a link for one way to do it in the Resources section below.  For setting up my credentials, I installed gcloud, created a service account in the GCP console, and downloaded the key file.  Then, I ran `gcloud auth activate-service-account --key-file mykeyfile.json` to update my ~/.boto file.  Note: mykeyfile.json is just an example.  Your JSON key file will likely be named something different.  Whatever the name of this file, you will need it to perform the steps below.

One way you can verify your GCP setup for this tutorial is to successfully run `gsutil ls` from the command line.

If you are new to Kafka Connect, you may find the previous posts on Kafka Connect tutorials helpful.  I’ll go through it quickly in the screencast below in case you need a refresher.
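If you would rather script that check, here is a minimal sketch.  The function name `verify_gcp_setup` is a placeholder I made up for illustration, and it assumes `gcloud` and `gsutil` are installed and on your PATH.

```shell
# Sketch only: verify_gcp_setup is a hypothetical helper name.
# Assumes gcloud and gsutil are installed and on PATH.
verify_gcp_setup() {
  key_file="${1:-mykeyfile.json}"   # example name; yours will likely differ

  if [ ! -f "$key_file" ]; then
    echo "key file not found: $key_file" >&2
    return 1
  fi

  # Activate the service account described by the JSON key file.
  gcloud auth activate-service-account --key-file "$key_file" || return 1

  # If this lists your buckets without an error, the credentials work.
  gsutil ls
}
```

Call it with the path to your own key file, for example `verify_gcp_setup /path/to/your-key.json`.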

Again, we will cover two types of examples.  Writing to GCS from Kafka with the Kafka GCS Sink Connector and then an example of reading from GCS to Kafka.  Technically speaking, we will configure and demo the Kafka Connect GCS Source and Kafka Connect GCS Sink connectors.

Kafka Connect GCS Sink Example with Confluent

Let’s see a demo to start.  In the following screencast, I show how to configure and run Kafka Connect with the Confluent distribution of Apache Kafka. Afterward, we’ll go through each of the steps to get us there.

As you’ll see, this demo assumes you’ve downloaded the Confluent Platform already. I downloaded the tarball and have my $CONFLUENT_HOME variable set to /Users/todd.mcgrath/dev/confluent-5.4.1

As you will see, you will need your GCP service account JSON file for GCP authentication.  Let’s cover writing both Avro and JSON to GCS in the following tv show screencast.

Steps in screencast

  1. `confluent local start`
  2. Note how I copied over the gcs-sink.properties file from my GitHub repo.  Link below.  Open the file and show the JSON credentials reference and Avro output example
  3. Show sink connector already installed
  4. Show empty GCS bucket
  5. First example is Avro, so generate 100 events of test data with `ksql-datagen quickstart=orders format=avro topic=orders maxInterval=100 iterations=100`  See the previous post on test data in Kafka for reference.
  6. `confluent local load gcs-sink -- -d gcs-sink.properties`
  7. `gsutil ls gs://kafka-connect-example/` and the GCP console to show new data is present
  8. `confluent local unload gcs-sink`
  9. Second example is JSON output, so edit the gcs-sink.properties file
  10. `confluent local config datagen-pageviews -- -d ./share/confluent-hub-components/confluentinc-kafka-connect-datagen/etc/connector_pageviews.config` (Again, see the link in the Resources section below for the previous post on generating test data in Kafka)
  11. `kafkacat -b localhost:9092 -t pageviews`
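For orientation, here is a rough sketch of what a gcs-sink.properties file for the Avro example can look like.  The file in the GitHub repo is the one to trust; this version is written from my recollection of the Confluent GCS sink connector settings, and the key file path, `flush.size`, and `gcs.part.size` values are assumptions you should adjust.  The bucket and topic names match the ones used in the steps above.

```shell
# Sketch: write an example gcs-sink.properties for the Avro demo.
# Key file path, flush.size and gcs.part.size are assumed values.
cat > gcs-sink.properties <<'EOF'
name=gcs-sink
connector.class=io.confluent.connect.gcs.GcsSinkConnector
tasks.max=1
topics=orders
gcs.bucket.name=kafka-connect-example
gcs.part.size=5242880
flush.size=3
# Path to your GCP service account JSON key file (example path)
gcs.credentials.path=/path/to/mykeyfile.json
storage.class=io.confluent.connect.gcs.storage.GcsStorage
# Avro output. For JSON output, the format class would be
# io.confluent.connect.gcs.format.json.JsonFormat
format.class=io.confluent.connect.gcs.format.avro.AvroFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
EOF
```

For the JSON example, I would expect the main edits to be the `topics` and `format.class` lines, but check the connector docs for your version before relying on that.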

Kafka Connect GCS Sink Example with Apache Kafka

And now with Apache Kafka.  The GCS sink connector described above is a commercial offering, so you might want to try something else if you are a self-managed Kafka user.  At the time of this writing, I couldn’t find an option.  If you know of one, let me know in the comments below.  Thanks.

GCP Kafka Connect GCS Source Example

What to do when we want to hydrate data into Kafka from GCS?  Well, my fine friend, we use a GCS Source Kafka connector.  Let’s go through some examples.

In the following demo, since the Kafka Connect GCS Source connector requires a Confluent license after 30 days, we’ll run through the example using Confluent.

Steps in screencast

  1. `confluent local start`
  2. Again, I copied over the gcs-source.properties file from my GitHub repo.  Link below.
  3. Show source connector already installed
  4. `gsutil ls gs://kafka-connect-example/topics/orders` which shows existing data on GCS from the previous tutorial
  5. `kafka-topics --list --bootstrap-server localhost:9092` to show orders topic doesn’t exist
  6. `confluent local load gcs-source -- -d gcs-source.properties`
  7. `confluent local consume orders -- --value-format avro --from-beginning` or `kafkacat -b localhost:9092 -t orders -s avro -r http://localhost:8081`
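As with the sink, here is a hedged sketch of a gcs-source.properties file.  It is not a copy of the file in the repo; the property names are the ones I know from the Confluent GCS source connector, the key file path is an example, and the empty `confluent.license` line reflects my understanding that leaving it blank runs the connector in trial mode.

```shell
# Sketch: write an example gcs-source.properties.
# The credentials path is an assumption; point it at your own key file.
cat > gcs-source.properties <<'EOF'
name=gcs-source
connector.class=io.confluent.connect.gcs.GcsSourceConnector
tasks.max=1
gcs.bucket.name=kafka-connect-example
gcs.credentials.path=/path/to/mykeyfile.json
# Must match the format the sink used when writing the objects
format.class=io.confluent.connect.gcs.format.avro.AvroFormat
# Left empty here on the assumption that this means trial mode
confluent.license=
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
EOF
```

Note there is no `topics` setting in this sketch.  As far as I know, the source connector works out the topic name from the object paths in the bucket, such as topics/orders.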

What if the source topic, like orders, already exists?  From the docs:

“Be careful when both the Connect GCS sink connector and the GCS Source Connector use the same Kafka cluster, since this results in the source connector writing to the same topic being consumed by the sink connector. This causes a continuous feedback loop that creates an ever-increasing number of duplicate Kafka records and GCS objects. It is possible to avoid this feedback loop by writing to a different topic than the one being consumed by the sink connector.”

If you want to run with the SMT

  1. `confluent local unload gcs-source`
  2. Modify the gcs-source.properties file.  Uncomment the SMT transformation lines as described in ye ole TV show above.
  3. `confluent local load gcs-source -- -d gcs-source.properties`
  4. `kafka-topics --list --bootstrap-server localhost:9092`
  5. You should see the copy_of_orders topic
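The SMT lines in question probably look something like the sketch below.  I am assuming the RegexRouter transform that ships with Apache Kafka, and `AddPrefix` is only a label; the exact lines in the repo file may differ.  The `sed` call at the end is just a rough illustration of the rename, where `&` in sed plays the role of `$0` in the replacement.

```shell
# Sketch: SMT lines that route records to a prefixed topic name.
# Written to a separate, hypothetical file so nothing else is touched.
cat > smt-snippet.properties <<'EOF'
transforms=AddPrefix
transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.AddPrefix.regex=.*
transforms.AddPrefix.replacement=copy_of_$0
EOF

# Rough illustration of what the rename does to the topic name.
renamed=$(printf '%s\n' orders | sed 's/.*/copy_of_&/')
echo "$renamed"   # prints copy_of_orders
```

Writing to copy_of_orders instead of orders is what keeps the source connector from feeding the sink connector in a loop.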

Kafka Connect GCS Source Example with Apache Kafka

The GCS source connector described above is also a commercial offering from Confluent, so let me know in the comments below if you find something more suitable for self-managed Kafka.  Thanks.

 

GCP Kafka Google Cloud Storage (GCS) Helpful Resources

 

 

Featured image https://pixabay.com/photos/splash-jump-dive-sink-swim-shore-863458/

 

 
