Running Kafka Connect [Standalone vs Distributed Mode Examples]


One of the many benefits of running Kafka Connect is the ability to run a single worker or multiple workers in tandem.  This is referred to as running Kafka Connect in Standalone or Distributed mode.

Running multiple workers in Distributed mode provides horizontal scale-out, which leads to increased capacity, automated resiliency, or both.

For resiliency, this means answering the question, “What happens if a particular worker goes offline for any reason?”  Horizontal scale and failover resiliency are available out-of-the-box, without a requirement to run a separate cluster manager.

In this post, we’ll go through examples of running Kafka Connect in both Standalone and Distributed mode.

Distributed mode is recommended when running Kafka Connect in production.

In this tutorial on Standalone and Distributed mode in Kafka Connect, I assume you are familiar with Kafka Connect constructs such as workers and tasks, but if you need a refresher, check out the Kafka Connect overview.


Standalone and Distributed Mode Overview

To review, Kafka connectors, whether sources or sinks, run within JVM processes called “workers”.  As mentioned, there are two ways workers may be configured and run: Standalone and Distributed.

Now, regardless of mode, Kafka connectors may be configured to run one or more tasks within their individual processes.

For example, a Kafka Connector Source may be configured to run 10 tasks as shown in the JDBC source example here https://github.com/tmcgrath/kafka-connect-examples/blob/master/mysql/mysql-bulk-source.properties.
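
For reference, the key setting in that properties file is tasks.max.  Here’s a trimmed sketch (see the linked file for the full configuration):

name=mysql-bulk-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10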

I wanted to make note of tasks vs. Distributed mode to avoid possible confusion.  Multiple tasks do provide some parallelism and scale-out, but they are a different construct than running in Distributed mode.

Ok, good, that’s out of the way, let’s proceed.

Kafka Connect Standalone Mode

Running Kafka Connect in Standalone mode makes it really easy to get started.  Most of the examples on this site and others so far show running connectors in Standalone mode.  In Standalone mode, a single process executes all connectors and their associated tasks.
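
As a rough sketch of what that means on the command line, a single Standalone process is handed one worker config plus one or more connector configs (on Apache Kafka, the script is connect-standalone.sh):

bin/connect-standalone worker.properties connector-a.properties connector-b.properties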

There are cases when Standalone mode might make sense in Production.  For example, if you are log shipping from a particular host, it could make sense to run your log source in standalone mode on the host with the log(s) you are interested in ingesting into Kafka. 

But, for reasons already mentioned, when in production, you should run Kafka Connect in Distributed mode.

As you can imagine, Standalone scalability is limited.  Also, there is no automated fault-tolerance out-of-the-box when a connector goes offline.  (Well, you could build an external monitoring process to restart failed Standalone connectors, I guess, but that’s outside the scope here.  And also, why?  If you want automated failover, just run in Distributed mode and get it out-of-the-box.)

Kafka Connect Distributed Mode

Running Kafka Connect in Distributed mode runs Connect workers on one or multiple nodes.  When running on multiple nodes, the coordination required to work in parallel does not need an external orchestration manager such as YARN.

Let me stop here because this is an important point.  When running in a “cluster” of multiple nodes, something has to coordinate “which node is doing what?”.  This may or may not be relevant to you.  Personally, I came to Kafka Connect after Apache Spark, so not needing an orchestration manager interested me.

Coordination of Connect nodes is built upon Kafka Consumer Group functionality, which was covered earlier on this site.  If Consumer Groups are new to you, check out that link first before proceeding here.

As you would expect with Consumer Groups, a Connect cluster running in Distributed mode can evolve by adding or removing worker nodes.  Like Consumers in a Consumer Group, Kafka Connect workers are rebalanced when nodes are added or removed.  As touched upon earlier in this post, this ability to expand or contract the number of worker nodes provides both the horizontal scale and the fault tolerance inherent in Distributed mode.  Again, this is the same expectation you have when running Kafka Consumers in Consumer Groups.

If this is new to you, my suggestion is to try to keep-it-simple and go back to the foundational principles.  To understand Kafka Connect Distributed mode, spend time exploring Kafka Consumer Groups commands.
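
For example, the stock Consumer Groups tooling is a good way to build that intuition.  Against the Dockerized cluster used later in this post, something like the following (the group name placeholder is whatever group.id your consumers are using):

kafka-consumer-groups --bootstrap-server localhost:19092 --list
kafka-consumer-groups --bootstrap-server localhost:19092 --describe --group <some-group>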

Kafka Connect Standalone and Distributed Mode Examples Overview

Let’s run examples of a connector in Standalone and Distributed mode.  To run in these modes, we are going to run a multi-node Kafka cluster in Docker.  When we run the Standalone example, we will configure the Standalone connector to use this multi-node Kafka cluster.  And when we run a connector in Distributed mode, yep, you guessed it, we’ll use this same cluster.

I made a TV show running through the examples here.  Ok, some folks might call it a screencast, but TV show sounds better.

I’m hoping it’s helpful for you to watch someone else run these examples if you are attempting to run the examples in your environment.

Here’s me running through the examples in the following “screencast”.

Need to learn more about Kafka Connect?  Check out my Kafka Connect course.

Kafka Connect Standalone and Distributed Mode Example Requirements

To run these examples in your environment, the following are required to be installed and/or downloaded.

  1. Docker installed
  2. Docker-compose installed
  3. Confluent Platform or Apache Kafka downloaded and extracted (so we have access to the CLI scripts like kafka-topics or kafka-topics.sh)

Kafka Cluster Setup

To run these Standalone and Distributed examples, we need access to a Kafka cluster.  It can be Apache Kafka or Confluent Platform.  I’m going to use a docker-compose example I created for the Confluent Platform.

As previously mentioned and shown in the Big Time TV show above, the Kafka cluster I’m using for these examples is a multi-broker Kafka cluster in Docker.  I’m using Confluent based images.  The intention is to represent a reasonable but lightweight production-like Kafka cluster: multiple brokers, but not so heavy as to require multiple Zookeeper nodes.  If you are not using this cluster, you’ll just have to make configuration adjustments for your environment in the steps below.

  1. Download or clone the repo with the docker-compose.yml file available here https://github.com/tmcgrath/docker-for-demos/tree/master/confluent-3-broker-cluster
  2. Run docker-compose up in the directory which contains this docker-compose.yml file.
  3. Confirm you have external access to the cluster by running kafka-topics --list --bootstrap-server localhost:29092 or kafkacat -b localhost:19092 -L.  Again, running either of these examples presumes you have downloaded and extracted a distribution of Apache Kafka or Confluent Platform as described in the Requirements section above.  (A note on my environment: I have CONFLUENT_HOME set as an environment variable, $CONFLUENT_HOME/bin in my path, and kafkacat installed.  Your environment may vary.  For example, if you are using Apache Kafka for CLI scripts, use kafka-topics.sh instead of kafka-topics.)

Again, see the…. screencast, pfft, I mean, Big Time TV show, above if you’d like to see a working example of these steps.

If you want to run an Apache Kafka cluster instead of Confluent Platform, you might want to check out https://github.com/wurstmeister/kafka-docker or let me know if you have an alternative suggestion.

Kafka Connect Standalone Example

Both Confluent Platform and Apache Kafka include example Kafka Connect source and sink connectors for reading from and writing to files.  For our first Standalone example, let’s use a File Source connector.

I’m going to run through using the Confluent Platform, but I will note how to translate the examples to Apache Kafka next.

Kafka Connect Example Standalone

If your external cluster is running as described above, go to the Confluent root directory in a terminal.  (You may have already set this as the CONFLUENT_HOME environment variable.)

  1. Kafka cluster is running.  See above.
  2. In a terminal window, cd to where you extracted Confluent Platform.  For my environment, I have this set to a CONFLUENT_HOME environment variable.
  3. Copy etc/kafka/connect-standalone.properties to the local directory; i.e. cp etc/kafka/connect-standalone.properties .  (We are going to use a new connect-standalone.properties, so we can customize it for this example only.  This isn’t a requirement.  We could use the default etc/kafka/connect-standalone.properties, but I want to leave it alone so I can still run confluent local commands, as described in other Kafka Connect examples on this site.)
  4. Open this new connect-standalone.properties file in your favorite editor and change bootstrap.servers value to localhost:19092
  5. Also, make sure the plugin.path variable in this connect-standalone.properties file includes the directory where the File Source connector jar file resides.  For my environment, it is set to /Users/todd.mcgrath/dev/confluent-5.4.1/share/java/kafka.  By the way, my CONFLUENT_HOME var is set to /Users/todd.mcgrath/dev/confluent-5.4.1
  6. Create a test.txt file with some sample data
  7. Run bin/connect-standalone ./connect-standalone.properties etc/kafka/connect-file-source.properties to start the File Source connector.  (The stock contents of connect-file-source.properties are shown just after this list.)
  8. If all went well, you should have a connect-test topic now.  You can verify with kafka-topics --list --bootstrap-server localhost:19092
  9. Confirm events are flowing with the console consumer; i.e bin/kafka-console-consumer --bootstrap-server localhost:19092 --topic connect-test
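
By the way, the stock etc/kafka/connect-file-source.properties used in step 7 should look something like the following (shown as shipped with Apache Kafka and Confluent Platform; no edits are needed for this example):

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test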

Cool.

Kafka Connect Standalone Configuration

A fundamental difference between Standalone and Distributed mode appears in this example.  Where and how offsets are stored in the two modes is completely different.  As we’ll see later on in the Distributed mode example, Distributed mode uses Kafka topics for offset storage, but in Standalone mode, offsets are stored locally, as we can see in the connect-standalone.properties file.

Notice the following configuration in particular:

offset.storage.file.filename=/tmp/connect.offsets
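
For contrast, the Distributed mode equivalents in connect-distributed.properties point at Kafka topics instead of a local file.  The stock defaults look like this, and they match the three topics we pre-create in the Distributed example below:

offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status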

Apache Kafka Differences

If you were to run these examples on Apache Kafka instead of Confluent Platform, you’d need to run connect-standalone.sh instead of connect-standalone, and the default locations of connect-standalone.properties, connect-file-source.properties, and the File Source connector jar (for setting plugin.path) will be different.
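
For example, launching the Standalone example from the root of a stock Apache Kafka download would look something like this (Apache Kafka keeps these properties files under config/ rather than etc/kafka/):

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties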

You’ll need to adjust accordingly.  Let me know if you have any questions or concerns.

Kafka Connect Distributed Example

You’ll notice differences running connectors in Distributed mode right away.  To start, you don’t pass configuration files for each connector at startup.  Rather, you start up the Kafka Connect Distributed process and then manage connectors via REST calls.  You’ll see this in the example, but first let’s make sure you are set up and ready to go.

Kafka Connect Example Distributed

Kafka Connect Distributed Example – Part 1 – Setup

The following steps presume you are in a terminal at the root directory of your preferred Kafka distribution.  Again, I’m going to use Confluent Platform, so my CLI scripts do not have .sh at the end.  Adjust yours as necessary.

First, pre-create 3 topics in the Dockerized cluster for Distributed mode, as recommended in the documentation.  I’ll explain why afterward.  If you are running the Dockerized 3 node cluster described above, change the port from 9092 to 19092 such as:

  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
  • bin/kafka-topics --create --bootstrap-server localhost:19092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
  • Verify all three topics are listed with kafka-topics --list --bootstrap-server localhost:19092

Next, cp over the example properties file for Distributed mode so we can customize for this example.  This is similar to what we did above in Standalone mode.

cp etc/kafka/connect-distributed.properties ./connect-distributed-example.properties

Edit this connect-distributed-example.properties in your favorite editor.

  • Change bootstrap.servers=localhost:9092 to bootstrap.servers=localhost:19092
  • Ensure your plugin.path is set to a path that contains the connector JAR you want to run.  Similar to the above, we are going to use the File Connector source, which is included by default in Confluent Platform.  (The edited result should look something like the sketch just after this list.)
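
After editing, the relevant lines in connect-distributed-example.properties should look something like this sketch (group.id of connect-cluster is the stock default; the plugin.path value is from my machine, so yours will differ):

bootstrap.servers=localhost:19092
group.id=connect-cluster
plugin.path=/Users/todd.mcgrath/dev/confluent-5.4.1/share/java/kafka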

Ensure you have a test.txt file in your local directory.  The same one from above is fine.

Finally, we need a JSON file with our connector configuration we wish to run in Distributed mode.  Create a connect-file-source.json file and cut-and-paste content into the file from here https://gist.github.com/tmcgrath/794ff6c4922251f2859264abf39866ae
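
If you’d like to see it before you click, the JSON should look something like this (reconstructed from the connector name, file, and topic used in the steps below; defer to the gist for the exact content):

{
  "name": "local-file-source",
  "config": {
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "file": "test.txt",
    "topic": "connect-test"
  }
}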

So, you should have a connect-file-source.json file.

Ok, to review the Setup, at this point you should have

  • 3 topics created
  • connect-distributed-example.properties file
  • test.txt file
  • connect-file-source.json file

If you have these 4 things, you, my good-times Internet buddy, are ready to roll.

Kafka Connect Distributed Example – Part 2 – Running a Simple Example

  • Start up Kafka Connect in Distributed mode with bin/connect-distributed connect-distributed-example.properties
  • Ensure the Distributed mode process you just started is ready to accept requests for Connector management via the Kafka Connect REST interface.  I’m going to use curl, but you can use whatever REST client you prefer.  The output from the command curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors should be 200.
  • Make a REST call to start a new File Source Connector with curl -s -X POST -H 'Content-Type: application/json' --data @connect-file-source.json http://localhost:8083/connectors  (a sample response is shown just after this list)
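
If the POST succeeds, Connect echoes back the connector it just created.  Expect a response roughly along these lines (field order, and whether type is populated, can vary by Connect version):

{"name":"local-file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","file":"test.txt","topic":"connect-test","name":"local-file-source"},"tasks":[],"type":"source"}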

Should be good-to-go now, but let’s verify.

  • Is the “local-file-source” connector listed when issuing curl -s http://localhost:8083/connectors?
  • In a separate terminal window, is the connect-test topic now listed?  i.e. bin/kafka-topics --list --bootstrap-server localhost:19092
  • And can we consume from it?  bin/kafka-console-consumer --bootstrap-server localhost:19092 --topic connect-test.  Add more data to test.txt to watch it flow in real-time as I did in the screencast shown above.

If verification is successful, let’s shut the connector down with

  • curl -s -X PUT http://localhost:8083/connectors/local-file-source/pause
  • curl -s -X GET http://localhost:8083/connectors/local-file-source/status  (sample output is shown just after this list)
  • curl -s -X DELETE http://localhost:8083/connectors/local-file-source
  • curl -s http://localhost:8083/connectors should be empty
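
For reference, the status call should return JSON roughly like this after the pause (the worker_id will be your worker’s host and port):

{"name":"local-file-source","connector":{"state":"PAUSED","worker_id":"127.0.0.1:8083"},"tasks":[{"id":0,"state":"PAUSED","worker_id":"127.0.0.1:8083"}],"type":"source"}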

Kill the Distributed worker process if you’d like.

So, that’s it!  I hope you did it.  You now know how to run Kafka Connect in Distributed mode.

To recap, here’s what we learned in this section

  • Unlike Standalone, running Kafka Connect in Distributed mode stores the offsets, configurations, and task statuses in Kafka topics.  As recommended, we pre-created these topics rather than relying on auto-create.  In the Dockerized cluster used above, you may have noticed it allows auto-creation of topics.  We didn’t do that.  No way.  Not us.
  • As described, Distributed mode builds on the mechanics of Consumer Groups, so it’s no surprise to see a group.id variable in the connect-distributed-example.properties file.
  • To manage connectors in Distributed mode, we use the REST API.  This differs from Standalone, where we pass configuration properties files in from the CLI.

For a deeper dive into REST options, see Kafka Connect REST API examples.

Kafka Connect Tutorial Updates

Writing this post inspired me to add resources for running in Distributed mode.

For the Kafka MySQL JDBC tutorial, I added JSON examples to GitHub in case you want to run in Distributed mode.  See https://github.com/tmcgrath/kafka-connect-examples/tree/master/mysql for access.

For the Kafka S3 examples, I also added JSON examples to GitHub in case you want to run in Distributed mode.  See https://github.com/tmcgrath/kafka-connect-examples/tree/master/s3 for access.

For the Kafka Azure tutorial, there is a JSON example for Blob Storage Source available on the Confluent site at https://docs.confluent.io/current/connect/kafka-connect-azure-blob-storage/source/index.html#azure-blob-storage-source-connector-rest-example which might be helpful.

If you’d like to provide JSON configs for the other Kafka Connect examples listed on GitHub, I will gladly accept PRs.

Kafka Connect Standalone and Distributed Mode References

See also: GCP Kafka Connect Google Cloud Storage Examples