Kafka Connect REST API Essentials

Kafka Connect API Examples Swagger Screenshot

The Kafka Connect REST API endpoints are used for both administration of Kafka Connectors (Sinks and Sources) as well as Kafka Connect service itself.  In this tutorial, we will explore the Kafka Connect REST API with examples.  Before we dive into specific examples, we need to set the context with an overview of Kafka Connect operating modes.  Next, we need to ensure we all know there are two distinct areas by which Kafka Connect itself as well as connectors are configured.  I’ve noticed this can often be a source of confusion for folks.

Table of Contents

Overview

Kafka Connect may be run in either “standalone” or “distributed” mode.  Standalone is often easier for folks to get started with and generally speaking, can be appropriate for lightweight needs in production environments.  Running Kafka Connect in distributed mode is the recommended approach when deploying in production.

This brings us to our first key takeaway: regardless if running in “standalone” or “distributed” mode, the Kafka Connect REST API is available to use for connector administration tasks.

When initializing or starting Kafka Connect instances in either standalone or distributed mode, an executable is called with a configuration file location argument.  This configuration file is used for attributes of the instance itself such as the bootstrap.servers or group.id, as well as configuration which can affect the individual connectors themselves such as key.converter and value.converter settings.

This is our second key takeaway: there are two separate mechanisms by which configuration of Kafka Connect can occur — in a configuration file when initializing the standalone or distributed instance as well as the REST API of the instance.

If either of these two key points are not clear, then I recommend you check out Kafka Connect Standalone vs. Distributed Mode and the What is Kafka Connect tutorials on this site.  I understand you may not trust me yet and not check either one of these recommended two tutorials, but let me tell you, I wouldn’t steer you wrong.  Because, I know you are at least a 3 or 4 out of 10 on the big deal blog readers scale.  At least 3 or 4!

Kafka Connect REST API Setup

In my environment, I’ve downloaded and extracted Apache Kafka 2.4 and have a Kafka cluster running in a Docker container.  The steps to recreate are:

  1. Start Docker (if it is not already running)
  2. git clone https://github.com/conduktor/kafka-stack-docker-compose.git
  3. cd kafka-stack-docker-compose
  4. docker-compose -f zk-single-kafka-multiple.yml up -d (wait 5-30 seconds for everything to come online)
  5. In another terminal window, go to the bin/ directory of downloaded and extracted Apache Kafka.  For me, that’s ~/dev/kafka_2.11-2.4.1/bin.
  6. Run `./kafka-topics.sh –list –bootstrap-server localhost:9092` to ensure all is well
  7. Start Kafka Connect with the default File Sink with `./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties`

Let’s stop right here because this last step is an example of my earlier point.  As mentioned, there are two areas where Kafka Connect is configured.  In Step 7, the second argument which passes in the connect-standalone.propertiesfile is the first area.  The second area is the REST API.

Note: standalone mode is different than distributed because it requires a second argument to initialize a particular sink or source.  Running in distributed mode does not require the second argument of configuration.  Remember I recommended the Kafka Connect Standalone vs. Distributed Mode tutorial is this doesn’t click yet.

Kafka Connect REST API Examples

We’ll use curl, but should translate easily to Postman or your preferred HTTP client.

Kafka Connect List Active Connectors

curl -s http://localhost:8083/connectors
["local-file-sink"]

As expected just the file sink connector is running.

Kafka Connect Describe Connector

curl -s http://localhost:8083/connectors/local-file-sink | jq '.'
{
  "name": "local-file-sink",
  "config": {
    "connector.class": "FileStreamSink",
    "file": "test.sink.txt",
    "tasks.max": "1",
    "topics": "connect-test",
    "name": "local-file-sink"
  },
  "tasks": [
    {
      "connector": "local-file-sink",
      "task": 0
    }
  ],
  "type": "sink"
}

Got it. 1 running task, 1 task max, it’s a sink and not a source. Subscribed to “connect-test” topic.

Kafka Connect Get Connector Config

curl -s http://localhost:8083/connectors/local-file-sink/config | jq '.'
{
  "connector.class": "FileStreamSink",
  "file": "test.sink.txt",
  "tasks.max": "1",
  "topics": "connect-test",
  "name": "local-file-sink"
}

Ok, so now we have just the config.

Kafka Connect Update Connector Config

curl -X PUT http://localhost:8083/connectors/local-file-sink/config \
>       -H "Content-Type: application/json" \
>       -d '{ "connector.class": "FileStreamSink", "file": "test-2.sink.txt", "tasks.max": "2", "topics": "connect-test-new", "name": "local-file-sink" }' | jq '.'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   413  100   272  100   141   5130   2659 – :--: – – :--: – – :--: –  8787
{
  "name": "local-file-sink",
  "config": {
    "connector.class": "FileStreamSink",
    "file": "test-2.sink.txt",
    "tasks.max": "2",
    "topics": "connect-test-new",
    "name": "local-file-sink"
  },
  "tasks": [
    {
      "connector": "local-file-sink",
      "task": 0
    },
    {
      "connector": "local-file-sink",
      "task": 1
    }
  ],
  "type": "sink"
}

Use PUT method, set header to JSON, and send in the new config in JSON. Get back the results that the configuration change succeeded. But, did it?

Kafka Connect Get Connector Status

curl -s http://localhost:8083/connectors/local-file-sink/status | jq '.'
{
  "name": "local-file-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.155.153.76:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.155.153.76:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "10.155.153.76:8083"
    }
  ],
  "type": "sink"
}

Looks good state is RUNNING and now running two tasks instead of 1.

Kafka Connect Pause Connector

curl -X PUT http://localhost:8083/connectors/local-file-sink/pause | jq '.'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 – :--: – – :--: – – :--: –     0
~ $ curl -s http://localhost:8083/connectors/local-file-sink/status | jq '.'
{
  "name": "local-file-sink",
  "connector": {
    "state": "PAUSED",
    "worker_id": "10.155.153.76:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "PAUSED",
      "worker_id": "10.155.153.76:8083"
    },
    {
      "id": 1,
      "state": "PAUSED",
      "worker_id": "10.155.153.76:8083"
    }
  ],
  "type": "sink"
}

Great. Attempted to pause it and verified it.

Kafka Connect Restart Connector

curl -X PUT http://localhost:8083/connectors/local-file-sink/resume | jq '.'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 – :--: – – :--: – – :--: –     0
~ $ curl -s http://localhost:8083/connectors/local-file-sink/status | jq '.'
{
  "name": "local-file-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.155.153.76:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.155.153.76:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "10.155.153.76:8083"
    }
  ],
  "type": "sink"
}

All we’re back and running

Kafka Connect Restart Connector Task

What if we had a particular task we wanted to restart?

curl -X POST http://localhost:8083/connectors/local-file-sink/tasks/1/restart | jq '.'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 – :--: – – :--: – – :--: –     0
~ $ curl -s http://localhost:8083/connectors/local-file-sink/tasks/1/status | jq '.'
{
  "id": 1,
  "state": "RUNNING",
  "worker_id": "10.155.153.76:8083"
}

As we can see in the above sample, the answer is we can restart a specific task and query the status of specific connector task as well.

Hope these examples help! Again, you don’t have to use curl. Let me know if any feedback.

Further Resources

Leave a Reply

Your email address will not be published.