Running Kafka Connect – Standalone vs Distributed Mode Examples


One of the many benefits of running Kafka Connect is the ability to run single or multiple workers in tandem.  Running multiple workers provides a way to scale out horizontally, which leads to increased capacity and automated resiliency.  For resiliency, this means answering the question, “what happens if a particular worker goes offline for any reason?”.  Horizontal scale and failover resiliency are available out-of-the-box without a requirement to run another cluster.

In this post, we’ll go through examples of running Kafka Connect in both Standalone and Distributed mode.

Distributed mode is recommended when running Kafka Connect in production.

Standalone and Distributed Mode Overview

To review, Kafka Connect runs connectors, whether sources or sinks, inside its own JVM processes called “workers”.  As mentioned, there are two ways workers may be configured to run: Standalone and Distributed.

Now, regardless of mode, Kafka connectors may be configured to run one or more tasks within their individual processes.  For example, a Kafka Connect source may be configured to run 10 tasks as shown in the JDBC source example here https://github.com/tmcgrath/kafka-connect-examples/blob/master/mysql/mysql-bulk-source.properties.  I wanted to make note of tasks vs. Distributed mode to avoid possible confusion.  Multiple tasks do provide some parallelism and scaling out, but they are a different construct than running in Distributed mode.  Ok, good, that’s out of the way.
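A quick note on what that looks like: tasks are just a connector-level setting.  In the linked mysql-bulk-source.properties file, it boils down to a single line similar to the following (a sketch; see the linked file for the full config):

tasks.max=10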

Kafka Connect Standalone Mode

Running Kafka Connect in Standalone mode makes it really easy to get started.  Most of the examples on this site and others so far show running connectors in Standalone.  In Standalone mode, a single process executes all connectors and their associated tasks.

There are cases when Standalone mode might make sense in Production.  For example, if you are log shipping from a particular host, it could make sense to run your log source in standalone mode on the host with the log(s) you are interested in ingesting into Kafka.  But generally speaking, you’ll probably want to run in Distributed mode in production.

As you can imagine, Standalone scalability is limited.  Also, there is no automated fault-tolerance out-of-the-box when a connector goes offline.  (Well, you could build an external automated monitoring process to restart failed Standalone connectors I guess, but that’s outside of scope here.  And also, why?  I mean, if you want automated failover, just utilize running in Distributed mode out-of-the-box.)

Kafka Connect Distributed Mode

Running Kafka Connect in Distributed mode runs Connect workers on one or multiple nodes.  When running on multiple nodes, the coordination required to work in parallel does not need an orchestration manager such as YARN.  Let me stop here because this is an important point.  In other words, when running in a “cluster” of multiple nodes, there is still a need to coordinate “which node is doing what?”, but Kafka Connect handles that coordination itself.  This may or may not be relevant to you.  For me personally, I came to this after Apache Spark, so not requiring an orchestration manager interested me.

Coordination of Connect nodes is built upon Kafka Consumer Group functionality, which was covered earlier on this site.  If Consumer Groups are new to you, check out that link first before proceeding here.

As you would expect with Consumer Groups, Connect nodes running in Distributed mode can evolve by adding or removing nodes.  Like Consumer Group Consumers, Kafka Connect nodes will be rebalanced if nodes are added or removed.  As touched upon earlier in this post, this ability to expand or contract the number of worker nodes provides both the horizontal scale and fault tolerance inherent in Distributed mode.  Again, this is the same expectation you have when running Kafka Consumers in Consumer Groups.

If this is new to you, my suggestion is to try to keep-it-simple and go back to the foundational principles.  To understand Kafka Connect Distributed mode, spend time exploring Kafka Consumer Groups.

Kafka Connect Standalone and Distributed Mode Examples Overview

Let’s run examples of a connector in Standalone and Distributed mode.  To run in these modes, we are going to run a multi-node Kafka cluster in Docker.  When we run the example of Standalone, we will configure the Standalone connector to use this multi-node Kafka cluster.  And, when we run a connector in Distributed mode, yep, you guessed it, we’ll use this same cluster.

This might seem random, but do you watch TV shows?  Me too.  Why do I ask?

Well, I made a TV show running through the examples here.  I’m hoping it’s helpful for you to watch someone else run these examples if you are attempting to run the examples in your environment.

Here’s me running through the examples in the following “screencast” 🙁

By the way, yes, I know, you are right, most folks call these screencasts and not TV shows.  I get it.  But, it’s more fun to call it a Big Time TV show.

Kafka Connect Standalone and Distributed Mode Example Requirements

To run these examples in your environment, the following are required to be installed and/or downloaded.

  1. Docker installed
  2. Docker-compose installed
  3. Confluent Platform or Apache Kafka downloaded and extracted (so we have access to the CLI scripts like kafka-topics or kafka-topics.sh)

Kafka Cluster Setup

To run these Standalone and Distributed examples, we need access to a Kafka cluster.  It can be Apache Kafka or Confluent Platform.  I’m going to use a docker-compose example I created for the Confluent Platform.

As previously mentioned and shown in the Big Time TV show above, the Kafka cluster I’m using for these examples is a multi-broker Kafka cluster in Docker.  I’m using Confluent based images.  The intention is to represent a reasonable, but lightweight, production Kafka cluster: multiple brokers, but not so heavy that it requires multiple Zookeeper nodes.  If you are not using this cluster, you’ll just have to make configuration adjustments for your environment in the steps below.

  1. Download or clone the repo with the docker-compose.yml file available here https://github.com/tmcgrath/docker-for-demos/tree/master/confluent-3-broker-cluster
  2. Run docker-compose up in the directory which contains this docker-compose.yml file.
  3. Confirm you have external access to the cluster by running kafka-topics --list --bootstrap-server localhost:29092 or kafkacat -b localhost:19092 -L.  Again, running either of these examples presumes you have downloaded and extracted a distribution of Apache Kafka or Confluent Platform as described in the Requirements section above.  (A note on my environment: I have CONFLUENT_HOME set as an environment variable, $CONFLUENT_HOME/bin in my path, and kafkacat installed.  Your environment may vary.  For example, if you are using Apache Kafka for CLI scripts, use kafka-topics.sh instead of kafka-topics.)

Again, see the…. screencast, pfft, I mean, Big Time TV show, above if you’d like to see a working example of these steps.

If you want to run an Apache Kafka cluster instead of Confluent Platform, you might want to check out https://github.com/wurstmeister/kafka-docker or let me know if you have an alternative suggestion.

Kafka Connect Standalone Example

Both Confluent Platform and Apache Kafka include Kafka Connect sinks and source examples for both reading and writing to files.  For our first Standalone example, let’s use a File Source connector.  Again, I’m going to run through using the Confluent Platform, but I will note how to translate the examples to Apache Kafka.

If your external cluster is running as described above, go to the Confluent root directory in a terminal.  (You may have already set this in the CONFLUENT_HOME environment variable.)

  1. Kafka cluster is running.  See above.
  2. In a terminal window, cd to where you extracted Confluent Platform.  For my environment, I have this set to a CONFLUENT_HOME environment variable.
  3. Copy etc/kafka/connect-standalone.properties to the local directory; i.e. cp etc/kafka/connect-standalone.properties .  (We are going to use a new connect-standalone.properties so we can customize it for this example only.  This isn’t a requirement.  We could use the default etc/kafka/connect-standalone.properties, but I want to leave it alone so I can still run confluent local commands, as described in other Kafka Connect examples on this site.)
  4. Open this new connect-standalone.properties file in your favorite editor and change bootstrap.servers value to localhost:19092
  5. Also, make sure the plugin.path variable in this connect-standalone.properties file includes the directory where the File Source connector jar file resides.  For my environment, it is set to /Users/todd.mcgrath/dev/confluent-5.4.1/share/java/kafka.  By the way, my CONFLUENT_HOME var is set to /Users/todd.mcgrath/dev/confluent-5.4.1
  6. Create a test.txt file with some sample data
  7. Run bin/connect-standalone ./connect-standalone.properties etc/kafka/connect-file-source.properties to start the File Source connector.  (The contents of connect-file-source.properties are sketched just after this list.)
  8. If all went well, you should have a connect-test topic now.  You can verify with kafka-topics --list --bootstrap-server localhost:19092
  9. Confirm events are flowing with the console consumer; i.e bin/kafka-console-consumer --bootstrap-server localhost:19092 --topic connect-test
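By the way, the etc/kafka/connect-file-source.properties file used in step 7 is the stock example that ships with the distribution.  It should look something like the following, which is why step 6 uses a file named test.txt and why the topic in step 8 is named connect-test (check your local copy if in doubt):

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test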

Cool.

Kafka Connect Standalone Configuration

A fundamental difference between Standalone and Distributed appears in this example.  Where and how offsets are stored in the two modes is completely different.  As we’ll see later on in the Distributed mode example, Distributed mode uses Kafka topics for offset storage, but in Standalone mode, offsets are stored in a local file, as shown in the connect-standalone.properties file.

Notice the following configuration in particular--

offset.storage.file.filename=/tmp/connect.offsets
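That line sits alongside a handful of other worker-level settings.  After the edits from the steps above, the relevant portion of my local connect-standalone.properties looks roughly like this (the converter settings are the stock defaults; your plugin.path will differ):

bootstrap.servers=localhost:19092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/Users/todd.mcgrath/dev/confluent-5.4.1/share/java/kafka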

Apache Kafka Differences

If you were to run these examples on Apache Kafka instead of Confluent Platform, you’d need to run connect-standalone.sh instead of connect-standalone, and the default locations of connect-standalone.properties, connect-file-source.properties, and the File Source connector jar (for setting plugin.path) will be different.  You’ll need to adjust accordingly.  Let me know if you have any questions or concerns.

Kafka Connect Distributed Example

You’ll notice differences running connectors in Distributed mode right away.  To start, you don’t pass configuration files for each connector at startup.  Rather, you start up the Kafka Connect Distributed process and then manage connectors via REST calls.  You’ll see this in the example, but first let’s make sure you are set up and ready to go.

Kafka Connect Distributed Example -- Part 1 -- Setup

The following steps presume you are in a terminal at the root directory of your preferred Kafka distribution.  Again, I’m going to use Confluent, so my CLI scripts do not have .sh at the end.  Adjust yours as necessary.

First, pre-create 3 topics in the Dockerized cluster for Distributed mode as recommended in the documentation.  I’ll explain why afterward.  If you are running the Dockerized 3 node cluster described above, change the port from 9092 to 19092 such as:

  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
  • Verify all three topics are listed with-- kafka-topics --list --bootstrap-server localhost:19092

Next, cp over the example properties file for Distributed mode so we can customize it for this example.  This is similar to what we did above in Standalone mode.

cp etc/kafka/connect-distributed.properties ./connect-distributed-example.properties

Edit this connect-distributed-example.properties file in your favorite editor.  (The end result is sketched just after the list below.)

  • Change bootstrap.servers=localhost:9092 to bootstrap.servers=localhost:19092
  • Ensure your plugin.path is set to a path that contains the connector JAR you want to run.  Similar to the above, we are going to use File Connector source which is included by default in Confluent.
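Putting those edits together, the lines that matter most in connect-distributed-example.properties end up looking roughly like the following.  The group.id and the three storage topic names are the stock defaults and match the topics we pre-created above; group.id is also what ties multiple Distributed workers together into one Connect cluster (Consumer Group mechanics again).  Your plugin.path will differ:

bootstrap.servers=localhost:19092
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
plugin.path=/Users/todd.mcgrath/dev/confluent-5.4.1/share/java/kafka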

Ensure you have a test.txt file in your local directory.  The same one from above is fine.

Finally, we need a JSON file with our connector configuration we wish to run in Distributed mode.  Create a connect-file-source.json file and cut-and-paste content into the file from here https://gist.github.com/tmcgrath/794ff6c4922251f2859264abf39866ae
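If you’d rather not copy from the gist, the file is just the File Source connector configuration expressed as JSON for the REST API, and it should look roughly like this (the gist is the source of truth if the two differ):

{
  "name": "local-file-source",
  "config": {
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "file": "test.txt",
    "topic": "connect-test"
  }
}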

So, you should have a connect-file-source.json file.

Ok, to review the Setup, at this point you should have

  • 3 topics created
  • connect-distributed-example.properties file
  • test.txt file
  • connect-file-source.json file

If you have these 4 things, you, my good-times Internet buddy, are ready to roll.

Kafka Connect Distributed Example -- Part 2 -- Running a Simple Example

  • Start up Kafka Connect in Distributed mode with bin/connect-distributed connect-distributed-example.properties
  • Ensure the Distributed mode process you just started is ready to accept requests for connector management via the Kafka Connect REST interface.  I’m going to use curl, but you can use whatever REST client you’d prefer.  The output from the command curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors should be 200.
  • Make REST call to start a new File Source Connector with curl -s -X POST -H 'Content-Type: application/json' --data @connect-file-source.json http://localhost:8083/connectors

Should be good-to-go now, but let’s verify.

  • Is the “local-file-source” connector listed when issuing curl -s http://localhost:8083/connectors
  • In a separate terminal window, is the connect-test topic now listed; i.e. bin/kafka-topics --list --bootstrap-server localhost:19092
  • And can we consume from it?  bin/kafka-console-consumer --bootstrap-server localhost:19092 --topic connect-test.  Add more data to test.txt to watch it in real-time as I did in the screencast shown above.

If verification is successful, let’s shut the connector down with

  • curl -s -X PUT http://localhost:8083/connectors/local-file-source/pause
  • curl -s -X GET http://localhost:8083/connectors/local-file-source/status
  • curl -s -X DELETE http://localhost:8083/connectors/local-file-source
  • curl -s http://localhost:8083/connectors should be empty

Kill the Distributed worker process if you’d like.

So, that’s it!  I hope you did it.  You now know how to run Kafka Connect in Distributed mode.

To recap, here’s what we learned in this section

  • Unlike Standalone, running Kafka Connect in Distributed mode stores the offsets, configurations, and task statuses in Kafka topics.  As recommended, we pre-created these topics rather than relying on auto-create.  In the Dockerized cluster used above, you may have noticed it allows auto-creation of topics.  We didn’t do that.  No way.  Not us.
  • As described, Distributed mode builds on the mechanics of Consumer Groups, so it’s no surprise to see a group.id variable in the connect-distributed-example.properties file.
  • To manage connectors in Distributed mode, we use the REST API.  This differs from Standalone, where we can pass in a configuration properties file from the CLI.

Kafka Connect Tutorial Updates

Writing this post inspired me to add resources for running in Distributed mode.

For the Kafka MySQL JDBC tutorial, I added JSON examples to GitHub in case you want to run in Distributed mode.  See https://github.com/tmcgrath/kafka-connect-examples/tree/master/mysql for access.

For the Kafka S3 examples, I also added JSON examples to GitHub in case you want to run in Distributed mode.  See https://github.com/tmcgrath/kafka-connect-examples/tree/master/s3 for access.

For the Kafka Azure tutorial, there is a JSON example for Blob Storage Source available on the Confluent site at https://docs.confluent.io/current/connect/kafka-connect-azure-blob-storage/source/index.html#azure-blob-storage-source-connector-rest-example which might be helpful.

For providing JSON for the other Kafka Connect Examples listed on GitHub, I will gladly accept PRs.


Featured image https://pixabay.com/photos/house-unit-standalone-architecture-3420617/

Azure Kafka Connect Example – Blob Storage


In this Azure Kafka tutorial, let’s describe and demonstrate how to integrate Kafka with Azure’s Blob Storage with existing Kafka Connect connectors.  Let’s get a little wacky and cover writing to Azure Blob Storage from Kafka as well as reading from Azure Blob Storage to Kafka.  In this case, “wacky” is a good thing, I hope.

Two types of references are available for your pleasure.  Well, let’s admit the phrase “for your pleasure” is just a saying, you know, and people don’t often mean it when they say it.  Well, I mean it and I hope you find this Kafka with Azure Blob Storage tutorial valuable.

This pleasurable Azure Kafka tutorial contains step-by-step command references, sample configuration file examples for sink and source connectors, as well as screencast videos of me demonstrating the setup and execution of the examples.

If you have questions, comments or suggestions for additional content, let me know in the comments below.

Note: It is expected that you have at least some working knowledge of Apache Kafka at this point, but you may not be an expert yet.

The overall goal here is to be focused on Azure Blob Storage Kafka integration through simple-as-possible examples.

Lastly, we are going to demonstrate the examples using Apache Kafka included in Confluent Platform instead of standalone Apache Kafka because the Azure Blob Storage sink and source connectors are commercial offerings from Confluent.

Here we go, let’s boogie.

Requirements

  1. An Azure account with enough permissions to be able to create storage accounts and containers (more on this below)
  2. Azure CLI installed (Link in the Resources section below)
  3. Apache Kafka
  4. Download and install the Sink and Source Connectors into your Apache Kafka cluster (Links in the Resources section below)

Azure Blob Storage Setup

I’m going to paste the commands I ran to set up the Storage Container in Azure.  You will need to update the command variable values for your environment wherever appropriate.  Here’s a hint: at a minimum, you need to change the tmcgrathstorageaccount and todd values.  Those values are mine.  You may wish to change other settings like the location variable as well.

1. az login

2. Create a resource group
az group create \
--name todd \
--location centralus

3. Create a storage account
az storage account create \
--name tmcgrathstorageaccount \
--resource-group todd \
--location centralus \
--sku Standard_LRS

For more on SKU types, https://docs.microsoft.com/en-us/rest/api/storagerp/srp_sku_types

4. Create a container
az storage container create \
--account-name tmcgrathstorageaccount \
--name kafka-connect-example \
--auth-mode login

5. For our Kafka Connect examples shown below, we need one of the two keys from the following command’s output.
az storage account keys list \
--account-name tmcgrathstorageaccount \
--resource-group todd \
--output table

Azure Blob Storage with Kafka Overview

When showing examples of connecting Kafka with Blob Storage, this tutorial assumes some familiarity with Apache Kafka, Kafka Connect, and Azure, as previously mentioned, but if you have any questions, just let me know.

Because both the Azure Blob Storage Sink and Source connectors are only available with a Confluent subscription or Confluent Cloud account, demonstrations will be conducted using Confluent Platform running on my laptop.  The goal of this tutorial is to keep things as simple as possible and provide a working example with the least amount of work for you.

Again, we will cover two types of Azure Kafka Blob Storage examples, so this tutorial is organized into two sections.  Section One is writing to Azure Blob Storage from Kafka with the Azure Blob Storage Sink Kafka Connector and the second section is an example of reading from Azure Blob Storage to Kafka.

Kafka Connect Azure Blob Storage Examples

Let’s kick things off with a demo.  In this demo, I’ll run through both the Sink and Source examples.

Now that we’ve seen working examples, let’s go through the commands that were run and the configurations described.

Kafka Connect Azure Blob Storage Sink Example

In the screencast, I showed how to configure and run Kafka Connect with the Confluent distribution of Apache Kafka as mentioned above.  After the working example, I’ll document each of the steps in case you would like to try it yourself.

As you saw if you watched the video, the demo assumes you’ve downloaded the Confluent Platform already. I downloaded the tarball and have my $CONFLUENT_HOME variable set to /Users/todd.mcgrath/dev/confluent-5.4.1

The demo uses an environment variable called AZURE_ACCOUNT_KEY for the Azure Blob Storage Key when using the Azure CLI.

You will need key1 or key2 values from Step 5 in the Azure Blob Storage setup section above and set it in your .properties files.
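To give you an idea of what goes in that file, the key settings in azure-blob-storage-sink.properties look roughly like the following.  This is a sketch based on the Confluent connector docs as best I recall; the class and format names may vary by connector version, so treat the copy in my Github repo (linked below) as the source of truth, and swap in your own account, container, and key:

name=azure-bs-sink
connector.class=io.confluent.connect.azure.blob.AzureBlobStorageSinkConnector
tasks.max=1
topics=orders
azblob.account.name=tmcgrathstorageaccount
azblob.account.key=<key1 or key2 from the command above>
azblob.container.name=kafka-connect-example
format.class=io.confluent.connect.azure.blob.format.avro.AvroFormat
flush.size=3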

Steps in screencast

  1. confluent local start
  2. Show sink connector already installed (I previously installed with confluent-hub install confluentinc/kafka-connect-azure-blob-storage:1.3.2)
  3. Note how I copied over the azure-blob-storage-sink.properties file from my Github repo.  The link to the Github repo can be found below.
  4. Show updates needed for this file
  5. Show empty Azure Blob Storage container named kafka-connect-example with a command adjusted for your key az storage blob list --account-name tmcgrathstorageaccount --container-name kafka-connect-example --output table --account-key $AZURE_ACCOUNT_KEY
  6. Generate 10 events of Avro test data with ksql-datagen quickstart=orders format=avro topic=orders maxInterval=100 iterations=10.  See the previous post on test data in Kafka for reference on ways to generate test data into Kafka.
  7. confluent local load azure-bs-sink -- -d azure-blob-storage-sink.properties
  8. az storage blob list --account-name tmcgrathstorageaccount --container-name kafka-connect-example --output table --account-key $AZURE_ACCOUNT_KEY
  9. confluent local unload azure-bs-sink
  10. The second example is JSON output, so edit azure-blob-storage-sink.properties file
  11. Generate some different test data with confluent local config datagen-pageviews -- -d ./share/confluent-hub-components/confluentinc-kafka-connect-datagen/etc/connector_pageviews.config (Again, see the link in the References section below for the previous post on generating test data in Kafka)
  12. Start the sink connector back up with confluent local load azure-bs-sink -- -d azure-blob-storage-sink.properties
  13. List out the new JSON objects landed into Azure with `az storage blob list --account-name tmcgrathstorageaccount --container-name kafka-connect-example --output table --account-key $AZURE_ACCOUNT_KEY`

 

Azure Kafka Connect Blob Storage Source Example

If you made it through the Blob Storage Sink example above, you may be thinking the Source example will be pretty easy.  And depending on when you are reading this, that might be true.  However, if you are reading this in Spring 2020 or so, it’s not exactly straightforward, but it’s not a huge deal either.  I’ll show you what to do.

First, the Azure Blob Storage Source connector is similar to the other source examples in Amazon Kafka S3 as well as GCP Kafka Cloud Storage.  They are similar in a couple of ways.  One, if you are also using the associated sink connector to write from Kafka to S3 or GCS and you are attempting to read this data back into Kafka, you may run into an infinite loop, where what is written back to Kafka is written to the cloud storage and back to Kafka and so on.  This means you should use the Azure Kafka Blob Storage Source connector independently of the sink connector, or use an SMT to transform the topic name when writing back to Kafka.  I’ll cover both of these below.

Another similarity is that the Azure Kafka Connector for Blob Storage requires a Confluent license after 30 days.  This means we will use the Confluent Platform in the following demo.

Please note: as warned above, at the time of this writing, I needed to remove some jar files from the source connector in order to proceed.  See “Workaround” section below.

Steps in screencast

  1. confluent local start (I had already installed the Source connector and made the updates described in “Workaround” section below)
  2. I copied over the azure-blob-storage-source.properties file from my Github repo.  Link below.
  3. Show no existing topics with kafka-topics --list --bootstrap-server localhost:9092
  4. az storage blob list --account-name tmcgrathstorageaccount --container-name kafka-connect-example --output table --account-key $AZURE_ACCOUNT_KEY which shows existing data on Azure Blob Storage from the previous Sink tutorial
  5. Load the source connector confluent local load azure-bs-source -- -d azure-blob-storage-source.properties
  6. kafkacat -b localhost:9092 -t orders -s avro -r http://localhost:8081
  7. In this example, the destination topic did not exist, so let’s simulate the opposite.  What would we do if the destination topic does exist?
  8. Modify azure-blob-storage-source.properties file.  Uncomment SMT transformation section.

Workaround (Spring 2020)

When attempting to use the kafka-connect-azure-blob-storage-source:1.2.2 connector with Confluent 5.4.1, the connector fails with the following:

Caused by: java.lang.ClassCastException: io.netty.channel.kqueue.KQueueEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup

It can be resolved if the Azure Blob Storage Source’s Netty libs are removed; i.e.
rm -rf ./share/confluent-hub-components/confluentinc-kafka-connect-azure-blob-storage-source/lib/netty-*

Kafka Connect Azure Blob Storage Source Example with Apache Kafka

The Azure Blob Storage Kafka Connect Source is a commercial offering from Confluent as described above, so let me know in the comments below if you find an option more suitable for self-managed Kafka.  Thanks.

 

Azure Kafka Examples with Azure Blob Storage Helpful Resources

 

Featured image https://pixabay.com/photos/barrel-kegs-wooden-heritage-cask-52934/

 

 

GCP Kafka Connect Google Cloud Storage Examples


In this GCP Kafka tutorial, I will describe and show how to integrate Kafka Connect with GCP’s Google Cloud Storage (GCS).  We will cover writing to GCS from Kafka as well as reading from GCS to Kafka.  Descriptions and examples will be provided for both Confluent and Apache distributions of Kafka.

I’ll document the steps so you can run this on your environment if you want.  Or, you can watch me do it in videos below. Or both. Your call.

It is expected that you have some working knowledge of Apache Kafka at this point, but you may not be an expert yet.  If you know about running Kafka Connect in standalone vs distributed mode or how topics may be used to maintain state or other more advanced topics, that’s great.  This is more a specific use case how-to tutorial.

If you have any questions or concerns, leave them in the comments below.  I’m happy to help.  Well, I’m happier to help for cash money or Ethereum, cold beer, or bourbon.

The overall goal here is keeping it simple and get a demo working asap.  We can optimize afterward.  And in this case, when I say “we can optimize”, I really mean “you can optimize” for your particular use case.

All the examples have accompanying source code in GitHub and screencast videos on YouTube.

Let’s roll.

Kafka GCP Requirements

  1. GCP GCS bucket which you can write and read from. You knew that already though, right?  Because this is a tutorial on integrating Kafka with GCS.  If you didn’t know this, maybe you should leave now.
  2. Kafka
  3. GCP service account JSON credentials file.  How to create is described below and also see the Resources section below for a link to GCP Service Account info.

Kafka GCP GCS Overview

When showing examples of connecting Kafka with Google Cloud Storage (GCS), we assume familiarity with configuring Google GCS buckets for access.  There is a link for one way to do it in the Resources section below.  For setting up my credentials, I installed gcloud, created a service account in the GCP console, and downloaded the key file.  Then, I ran `gcloud auth activate-service-account --key-file mykeyfile.json` to update my ~/.boto file.  Note: mykeyfile.json is just an example.  Your JSON key file will likely be named something different.  Whatever the name of this file, you will need it to perform the steps below.

One way you can verify your GCP setup for this tutorial is to successfully run gsutil ls from the command line.

If you are new to Kafka Connect, you may find the previous Kafka Connect tutorial posts on this site helpful.  I’ll go through it quickly in the screencast below in case you need a refresher.

Again, we will cover two types of examples.  Writing to GCS from Kafka with the Kafka GCS Sink Connector and then an example of reading from GCS to Kafka.  Technically speaking, we will configure and demo the Kafka Connect GCS Source and Kafka Connect GCS Sink connectors.

Kafka Connect GCS Sink Example with Confluent

Let’s see a demo to start.  In the following screencast, I show how to configure and run Kafka Connect with the Confluent distribution of Apache Kafka.  Afterward, we’ll go through each of the steps to get us there.

As you’ll see, this demo assumes you’ve downloaded the Confluent Platform already. I downloaded the tarball and have my $CONFLUENT_HOME variable set to /Users/todd.mcgrath/dev/confluent-5.4.1

As you will see, you will need your GCP service account JSON file for GCP authentication.  Let’s cover writing both Avro and JSON to GCP in the following tv show screencast.

Steps in screencast

  1. confluent local start
  2. Note how I copied over the gcs-sink.properties file from my Github repo.  Link below.  Open the file and show the JSON credentials reference and Avro output example (the key settings are sketched just after this list)
  3. Show sink connector already installed
  4. Show empty GCS bucket
  5. First example is Avro, so generate 100 events of test data with `ksql-datagen quickstart=orders format=avro topic=orders maxInterval=100 iterations=100`  See the previous post on test data in Kafka for reference.
  6. confluent local load gcs-sink -- -d gcs-sink.properties
  7. gsutil ls gs://kafka-connect-example/ and GCP console to show new data is present
  8. confluent local unload gcs-sink
  9. Second example is JSON output, so edit gcs-sink.properties file
  10. confluent local config datagen-pageviews -- -d ./share/confluent-hub-components/confluentinc-kafka-connect-datagen/etc/connector_pageviews.config (Again, see the link in the References section below for the previous post on generating test data in Kafka)
  11. kafkacat -b localhost:9092 -t pageviews
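For reference, the key settings in gcs-sink.properties (see step 2) look roughly like the following.  Again, a sketch based on the Confluent GCS sink connector docs; the copy in my Github repo is the source of truth, and gcs.credentials.path should point at your own service account JSON file:

name=gcs-sink
connector.class=io.confluent.connect.gcs.GcsSinkConnector
tasks.max=1
topics=orders
gcs.bucket.name=kafka-connect-example
gcs.part.size=5242880
flush.size=3
gcs.credentials.path=/path/to/mykeyfile.json
format.class=io.confluent.connect.gcs.format.avro.AvroFormat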

Kafka Connect GCS Sink Example with Apache Kafka

And now with Apache Kafka.  The GCS sink connector described above is a commercial offering, so you might want to try something else if you are a self-managed Kafka user.  At the time of this writing, I couldn’t find an option.  If you know of one, let me know in the comments below.  Thanks.

GCP Kafka Connect GCS Source Example

What to do when we want to hydrate data into Kafka from GCS?  Well, my fine friend, we use a GCS Source Kafka connector.  Let’s go through some examples.

In the following demo, since the Kafka Connect GCS Source connector requires a Confluent license after 30 days, we’ll run through the example using Confluent.

Steps in screencast

  1. confluent local start
  2. Again, I copied over the gcs-source.properties file from my Github repo.  Link below.
  3. Show source connector already installed
  4. `gsutil ls gs://kafka-connect-example/topics/orders` which shows existing data on GCS from the previous tutorial
  5. `kafka-topics --list --bootstrap-server localhost:9092` to show orders topic doesn’t exist
  6. confluent local load gcs-source -- -d gcs-source.properties
  7. confluent local consume orders -- --value-format avro --from-beginning or kafkacat -b localhost:9092 -t orders -s avro -r http://localhost:8081

What about if the destination topic, like orders, already exists?  From the docs:

“Be careful when both the Connect GCS sink connector and the GCS Source Connector use the same Kafka cluster, since this results in the source connector writing to the same topic being consumed by the sink connector. This causes a continuous feedback loop that creates an ever-increasing number of duplicate Kafka records and GCS objects. It is possible to avoid this feedback loop by writing to a different topic than the one being consumed by the sink connector.”

If you want to run with the SMT

  1. confluent local unload gcs-source
  2. Modify gcs-source.properties file.  Uncomment the SMT transformation lines as described in ye ole TV show above (and sketched just after this list).
  3. confluent local load gcs-source -- -d gcs-source.properties
  4. kafka-topics --list --bootstrap-server localhost:9092
  5. You should see the copy_of_orders topic
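For reference, the SMT lines uncommented in step 2 look roughly like this.  RegexRouter is a standard Kafka Connect transform; the alias and regex in my repo file may differ slightly, but the effect is to prefix the destination topic name, which is why copy_of_orders shows up:

transforms=AddPrefix
transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.AddPrefix.regex=.*
transforms.AddPrefix.replacement=copy_of_$0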

Kafka Connect GCS Source Example with Apache Kafka

The GCS source connector described above is also a commercial offering from Confluent, so let me know in the comments below if you find an option more suitable for self-managed Kafka.  Thanks.

 

GCP Kafka Google Cloud Storage (GCS) Helpful Resources

 

 

Featured image https://pixabay.com/photos/splash-jump-dive-sink-swim-shore-863458/

 

 

Kafka Connect S3 Examples


In this Kafka Connect S3 tutorial, let’s demo multiple Kafka S3 integration examples.  We’ll cover writing to S3 from one topic and also from multiple Kafka source topics.  Also, we’ll see an example of an S3 Kafka source connector reading files from S3 and writing to Kafka.

Examples will be provided for both Confluent and Apache distributions of Kafka.

I’ll document the steps so you can run this on your environment if you want.  Or, you can watch me do it in videos below. Or both. Your call.

Now, to set some initial expectations, these are just examples and we won’t examine Kafka Connect in standalone or distributed mode or how the internals of Kafka Consumer Groups assist Kafka Connect.

If you have any questions or concerns, leave them in the comments below.

The overall goal will be keeping it simple and get working examples asap.  We can optimize afterward.

Accompanying source code is available in GitHub (see Resources section for link) and screencast videos on YouTube.

Let’s get started.

Kafka S3 Requirements

  1. S3 environment which you can write and read from. (I mean, “no duh”, or as some folks say, “no doy”.  What do you say?)
  2. MySQL (if you want to use the sample source data; described more below)
  3. Kafka (examples of both Confluent and Apache Kafka are shown)

Kafka S3 Setup

As you’ll see in the next screencast, this first tutorial utilizes the previous Kafka Connect MySQL tutorial.  In fact, if you are new to Kafka Connect, you may wish to reference this previous post on Kafka Connector MySQL examples before you start.  I’ll go through it quickly in the screencast below in case you need a refresher.

There are essentially two types of examples below.  One, an example of writing to S3 from Kafka with Kafka S3 Sink Connector and two, an example of reading from S3 to Kafka.  In other words, we will demo Kafka S3 Source examples and Kafka S3 Sink Examples.

Also, there is an example of reading from multiple Kafka topics and writing to S3 as well.

For authorization to S3, I’m going to show using the credentials file approach in the screencast examples.  For more information on S3 credential options, see the link in the Resources section below.

Kafka Connect S3 Sink Example with Confluent

Do you ever hear the expression “let’s work backward from the end”?  Well, you know what?  I invented that saying!  Actually, I’m kidding, I didn’t invent it.

Anyhow, let’s work backward from the end result in the following screencast. Then, we’ll go through each of the steps to get us there.

Again, we will start with the Confluent Platform example.  The next example uses standalone Apache Kafka.

 

Here are the steps (more or less) in the above screencast

  1. Install S3 sink connector with `confluent-hub install confluentinc/kafka-connect-s3:5.4.1`
  2. confluent local start
  3. Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
  4. Copy, modify s3-sink.properties file and load it with `confluent local load s3-sink -- -d s3-sink.properties`
  5. List topics `kafka-topics --list --bootstrap-server localhost:9092`
  6. Load `mysql-bulk-source` source connector from the previous MySQL Kafka Connect tutorial with the command `confluent local load mysql-bulk-source -- -d mysql-bulk-source.properties`
  7. List topics and confirm the mysql_* topics are present
  8. Show S3 Console with new files
  9. Review the S3 sink connector configuration (a sketch of s3-sink.properties follows this list)
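To round out step 9, the key settings in s3-sink.properties look roughly like the following.  Property names come from the Confluent S3 sink connector docs; the file in my Github repo is the source of truth, and the topics, bucket, and region values here are placeholders for your environment:

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=<one or more of the mysql topics from the MySQL tutorial>
s3.bucket.name=kafka-connect-example
s3.region=us-west-2
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.avro.AvroFormat
flush.size=3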

 

Kafka Connect S3 Sink Example with Apache Kafka

And now, let’s do it with Apache Kafka. Here’s a screencast of running the S3 sink connector with Apache Kafka.

Here are the steps (more or less) in the above screencast

  1. Start Zookeeper `bin/zookeeper-server-start.sh config/zookeeper.properties`
  2. Start Kafka `bin/kafka-server-start.sh config/server.properties`
  3. S3 sink connector is downloaded, extracted and other configuration
  4. Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
  5. List topics `bin/kafka-topics.sh --list --bootstrap-server localhost:9092`
  6. Show modified s3-sink.properties file in my current directory (and the mysql-bulk-source.properties file as well)
  7. List topics `kafka-topics --list --bootstrap-server localhost:9092`
  8. Load `mysql-bulk-source` source connector from the previous MySQL Kafka Connect tutorial and also S3 sink connector with one command `bin/connect-standalone.sh config/connect-standalone.properties mysql-bulk-source.properties s3-sink.properties`
  9. List topics and confirm the mysql_* topics are present
  10. Show S3 Console with new files
  11. Review the S3 sink connector configuration

Kafka Connect S3 Sink Example with Multiple Source Topics

The previous examples showed streaming to S3 from a single Kafka topic.  What if you want to stream multiple topics from Kafka to S3?  Have no fear my internet friend, it’s easy with the topics.regex setting, which I would have shown in the following screencast.  Sorry, but I just can’t do it.  Sometimes, and I just hate to admit this, but I just don’t have the energy to make all these big time TV shows.

If you need a TV show, let me know in the comments below and I might reconsider, but for now, this is what you need to do.

Here are the steps (more or less) of what I would have done in the Big Time TV Show (aka: a screencast) for sinking multiple Kafka topics into S3

  1. Update your s3-sink.properties file: comment out the topics variable and uncomment the topics.regex variable (see the sketch after this list).
  2. Unload your S3 sink connector if it is running
  3. Load the S3 sink connector
  4. Check out S3 — you should see all your topic data whose name starts with mysql
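In other words, the change to s3-sink.properties in step 1 amounts to swapping a single-topic setting for a pattern, roughly like this (the exact regex in my repo file may differ):

# topics=<a single topic>
topics.regex=mysql.*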

Kafka Connect S3 Source Example

When it comes to reading from S3 to Kafka with a pre-built Kafka Connect connector, we might be a bit limited.  At the time of this writing, there is a Kafka Connect S3 Source connector, but it is only able to read files created by the Connect S3 Sink connector.  We used this sink connector in the above examples.  From the Source connector’s documentation--

“The Kafka Connect Amazon S3 Source Connector provides the capability to read data exported to S3 by the Apache Kafka® Connect S3 Sink connector and publish it back to a Kafka topic”

Now, this might be completely fine for your use case, but if this is an issue for you, there might be a workaround.  As a possible workaround, there are ways to mount S3 buckets to a local file system using things like s3fs-fuse.  From there, it should be possible to read files into Kafka with sources such as the Spooldir connector.  Let me know in the comments.

Because Kafka Connect S3 Source connector requires a Confluent license after 30 days, we’ll run through the following demo using Confluent.

Here are the steps (more or less) in the above screencast

  1. Install the S3 source connector with `confluent-hub install confluentinc/kafka-connect-s3-source:1.2.2`
  2. confluent local start
  3. Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
  4. Copy and modify s3-source.properties file
  5. List topics `kafka-topics --list --bootstrap-server localhost:9092` and highlight how the mysql_* topics are present
  6. Load S3 source connector with `confluent local load s3-source -- -d s3-source.properties`
  7. List topics and confirm the copy_of* topics are present
  8. Review the S3 source connector configuration

Kafka Connect S3 Helpful Resources

 

Featured image https://pixabay.com/photos/old-bottles-glass-vintage-empty-768666/

 

Kafka Connect mySQL Examples


In this Kafka Connect mySQL tutorial, we’ll cover reading from mySQL to Kafka and reading from Kafka and writing to mySQL.  Let’s run this in your environment.

Now, it’s just an example and we’re not going to debate operations concerns such as running in standalone or distributed mode.  The focus will be keeping it simple and get it working.  We can optimize afterward.

We may cover Kafka Connect transformations or topics like Kafka Connect credential management in a later tutorial, but not here.  I hope you don’t mind.  You see, I’m a big shot tutorial engineer and I get to make the decisions around here.  So, when I write “I hope you don’t mind”, what I really mean is that I don’t care.  (Well, I’m just being cheeky now.  If you have questions, comments or ideas for improvement, please leave them below.)

REQUIREMENTS

This is what you’ll need if you’d like to perform the steps in your environment.  Adjust as necessary.  You can do that in your environment because you’re the boss there.

In this Kafka Connect with mySQL tutorial, you’ll need

  • running Kafka with Connect and Schema Registry
  • mySQL
  • mySQL JDBC driver

SETUP

I’ll run through this in the screencast below, but this tutorial example utilizes the mySQL Employees sample database.  The link to the download is included in the References section below.

The mySQL JDBC driver needs to be downloaded and located in the Confluent classpath.  I’ll also demonstrate this in the screencast, but for now, just take my word for it that the jar is in share/java/kafka-connect-jdbc of your Confluent root dir.  Yeah, trust me.

KAFKA CONNECT MYSQL SOURCE EXAMPLE

Do you ever hear the expression “let’s work backwards”?  I hear it all the time now.  Anyhow, let’s work backwards and see the end result in the following screencast and then go through the steps it took to get there.

To recap, here are the key aspects of the screencast demonstration.  (Note: since I recorded this screencast, the Confluent CLI has changed to use a confluent local prefix.  Depending on your version, you may need to add local immediately after confluent, for example confluent local status connectors.)

    • Kafka (connect, schema registry) running in one terminal tab
    • mysql jdbc driver downloaded and located in share/java/kafka-connect-jdbc (note about needing to restart after download)
    • Sequel PRO with mySQL -- imported the employees db
    • list the topics `bin/kafka-topics --list --zookeeper localhost:2181`
    • `bin/confluent status connectors`
    • `bin/confluent load mysql-bulk-source -d mysql-bulk-source.properties`
    • `bin/confluent status connectors` or `bin/confluent status mysql-bulk-source`
    • list the topics again `bin/kafka-topics --list --zookeeper localhost:2181` and see the tables as topics
    • `bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic mysql-departments --from-beginning`

Be careful copying and pasting any of the commands above with double hyphens “--”.  These are sometimes changed to em dashes and that can cause issues.

KAFKA CONNECT MYSQL CONFIGURATION STEPS

To run the example shown above, you’ll need to perform the following in your environment.

Kafka and associated components like connect, zookeeper, schema-registry are running.  This will be dependent on which flavor of Kafka you are using.  I’m using Confluent Open Source in the screencast.  If you need any assistance with setting up other Kafka distros, just let me know.

Regardless of Kafka version, make sure you have the mySQL jdbc driver available in the Kafka Connect classpath.  You can add it to this classpath by putting the jar in <YOUR_KAFKA>/share/java/kafka-connect-jdbc directory.

Speaking of paths, many of the CLI commands might be easier or more efficient to run if you add the appropriate `bin/` directory to your path.  I do not have that set in my environment for this tutorial.

KAFKA CONNECT MYSQL INGEST CONFIGURATION

As my astute readers surely saw, the connector’s config is controlled by the `mysql-bulk-source.properties` file.  You can create this file from scratch or copy an existing config file, such as the SQLite-based one located in `etc/kafka-connect-jdbc/`.

I’ve also provided sample files for you in my github repo.  See link in References section below.

Outside of regular JDBC connection configuration, the items of note are `mode` and `topic.prefix`.  For mode, you have options, but since we want to copy everything, it’s best just to set it to `bulk`.  Other options include timestamp, incrementing and timestamp+incrementing.  See the link for config options below in the References section.
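Putting that together, a bulk-mode JDBC source config looks roughly like the following.  This is a sketch; the actual mysql-bulk-source.properties is in my github repo (link below), and the connection URL, user, and password are placeholders for your environment:

name=mysql-bulk-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://127.0.0.1:3306/employees?user=<user>&password=<password>
mode=bulk
topic.prefix=mysql-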

SECTION CONCLUSION

Ok, we did it.  We ingested mySQL tables into Kafka using Kafka Connect.  That’s a milestone and we should be happy and maybe a bit proud.  Well, let me rephrase that.  I did it.  I know that is true.  Did you do it too?  I hope so because you are my most favorite big-shot-engineer-written-tutorial-reader ever.

Should we stop now and celebrate?  I know what you’re thinking.  Well, maybe.  And to that I say…. ok, let’s do it.  Just kidding.  It’s too late to stop now.  Let’s keep goin you fargin bastage.

KAFKA CONNECT MYSQL SINK EXAMPLE

Now that we have our mySQL sample database in Kafka topics, how do we get it out?  Rhetorical question.  Let’s configure and run a Kafka Connect Sink to read from our Kafka topics and write to mySQL.  Again, let’s start at the end.  Here’s a screencast writing to mySQL from Kafka using Kafka Connect.

Once again, here are the key takeaways from the demonstration

    • Kafka running in one terminal tab
    • Sequel PRO with mySQL -- created a new destination database and verified tables and data created
    • list the topics `bin/kafka-topics --list --zookeeper localhost:2181`
    • `bin/confluent load mysql-bulk-sink -d mysql-bulk-sink.properties`
    • `bin/confluent status connectors` or `bin/confluent status mysql-bulk-sink`

KAFKA CONNECT MYSQL SINK CONFIGURATION

Not much has changed from the first source example.  The one thing to call out is the `topics.regex` setting in the mysql-bulk-sink.properties file.  Using this setting, it’s possible to set a regular expression matching all the topics we wish to process (see the sketch below).
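For reference, the sink side looks roughly like this.  Again, a sketch; see the github repo for the real mysql-bulk-sink.properties, and adjust the regex, connection URL, and credentials for your environment:

name=mysql-bulk-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics.regex=mysql-.*
connection.url=jdbc:mysql://127.0.0.1:3306/<destination_db>?user=<user>&password=<password>
auto.create=true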

CONCLUSION

I hope you enjoyed your time here.  If you did, throw a couple of quarters in the tip jar if you’d like.  Or let me know if you have any questions or suggestions for improvement.  Feedback always welcomed.  Well, money is welcomed more, but feedback is kinda sorta welcomed too.

REFERENCES

SOURCE CONNECTOR DOCS

https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html

https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html#jdbc-source-configs

SINK CONNECTOR DOCS

https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html

https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/sink_config_options.html

EXAMPLE KAFKA CONNECT SOURCE AND SINK CONFIG FILES

https://github.com/tmcgrath/kafka-connect-examples/tree/master/mysql

MYSQL EMPLOYEES SAMPLE DATABASE

https://dev.mysql.com/doc/employee/en/

Additional Kafka Tutorials

Kafka Tutorials

Kafka Streams Tutorials

Image credit https://pixabay.com/en/wood-woods-grain-rings-100181/