The Kafka Connect REST API endpoints are used both for administering Kafka Connectors (sinks and sources) and for the Kafka Connect service itself. In this tutorial, we will explore the Kafka Connect REST API with examples. Before we dive into specific examples, we need to set the context with an overview of Kafka Connect operating modes. Next, we need to make sure we all know there are two distinct places where Kafka Connect itself, as well as the connectors, are configured. I’ve noticed this is often a source of confusion for folks.
Overview
Kafka Connect may be run in either “standalone” or “distributed” mode. Standalone is often easier to get started with and, generally speaking, can be appropriate for lightweight needs in production environments. Running Kafka Connect in distributed mode is the recommended approach for production deployments.
This brings us to our first key takeaway: regardless of whether you run in “standalone” or “distributed” mode, the Kafka Connect REST API is available for connector administration tasks.
When initializing or starting Kafka Connect instances in either standalone or distributed mode, an executable is called with a configuration file location argument. This configuration file is used for attributes of the instance itself, such as bootstrap.servers or group.id, as well as configuration which can affect the individual connectors themselves, such as the key.converter and value.converter settings.
This is our second key takeaway: there are two separate mechanisms for configuring Kafka Connect: a configuration file passed when initializing the standalone or distributed instance, and the REST API of the running instance.
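For a concrete picture of the first mechanism, here is a trimmed-down sketch of what a standalone worker configuration file might contain. Your connect-standalone.properties will vary, and the values below are placeholders:

# the Kafka cluster this Connect worker talks to
bootstrap.servers=localhost:9092
# default converters applied to connector keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# standalone mode tracks source connector offsets in a local file
offset.storage.file.filename=/tmp/connect.offsets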
If either of these two key points is not clear, I recommend you check out the Kafka Connect Standalone vs. Distributed Mode and What is Kafka Connect tutorials on this site. I understand you may not trust me yet and won’t check either of these two recommended tutorials, but let me tell you, I wouldn’t steer you wrong. I know you are at least a 3 or 4 out of 10 on the big-deal blog readers scale. At least a 3 or 4!
Kafka Connect REST API Setup
In my environment, I’ve downloaded and extracted Apache Kafka 2.4.1 and have a Kafka cluster running in Docker containers. The steps to recreate it are:
- Start Docker (if it is not already running)
- git clone https://github.com/conduktor/kafka-stack-docker-compose.git
- cd kafka-stack-docker-compose
- docker-compose -f zk-single-kafka-multiple.yml up -d (wait 5-30 seconds for everything to come online)
- In another terminal window, go to the bin/ directory of the downloaded and extracted Apache Kafka. For me, that’s ~/dev/kafka_2.11-2.4.1/bin.
- Run `./kafka-topics.sh --list --bootstrap-server localhost:9092` to ensure all is well
- Start Kafka Connect with the default File Sink with `./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties`
Let’s stop right here, because this last step is an example of my earlier point. As mentioned, there are two areas where Kafka Connect is configured. In Step 7, the first argument, which passes in the connect-standalone.properties file, is the first area: the configuration file of the instance itself. The second area is the REST API.
Note: standalone mode is different from distributed mode because it requires a second argument to initialize a particular sink or source. Running in distributed mode does not take this second connector-configuration argument; connectors are created through the REST API instead. Remember, I recommended the Kafka Connect Standalone vs. Distributed Mode tutorial if this doesn’t click yet.
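For comparison, starting a distributed worker takes only the worker configuration file (the sample config that ships with Kafka is shown here); any connectors are then created and managed entirely through the REST API:

./connect-distributed.sh ../config/connect-distributed.properties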
Kafka Connect REST API Examples
We’ll use curl, but the examples should translate easily to Postman or your preferred HTTP client.
Kafka Connect List Active Connectors
curl -s http://localhost:8083/connectors
["local-file-sink"]
As expected, just the file sink connector is running.
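While we’re listing things, the worker exposes a couple of other read-only endpoints that are handy for orientation. A GET on the root path returns the worker’s version information, and GET /connector-plugins lists the connector plugins installed on the worker. Output is omitted here since it depends on your installation:

curl -s http://localhost:8083/ | jq '.'
curl -s http://localhost:8083/connector-plugins | jq '.'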
Kafka Connect Describe Connector
curl -s http://localhost:8083/connectors/local-file-sink | jq '.'
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSink",
"file": "test.sink.txt",
"tasks.max": "1",
"topics": "connect-test",
"name": "local-file-sink"
},
"tasks": [
{
"connector": "local-file-sink",
"task": 0
}
],
"type": "sink"
}
Got it. 1 running task, 1 task max, it’s a sink and not a source. Subscribed to “connect-test” topic.
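Since the responses are JSON, jq can pluck out a single attribute. For example, to grab just the subscribed topics from the describe response above:

curl -s http://localhost:8083/connectors/local-file-sink | jq '.config.topics'
"connect-test"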
Kafka Connect Get Connector Config
curl -s http://localhost:8083/connectors/local-file-sink/config | jq '.'
{
"connector.class": "FileStreamSink",
"file": "test.sink.txt",
"tasks.max": "1",
"topics": "connect-test",
"name": "local-file-sink"
}
Ok, so now we have just the config.
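One handy pattern this endpoint enables: save the current config to a file, edit it, and push it back with a PUT (covered next). The filename here is just an example of the workflow:

curl -s http://localhost:8083/connectors/local-file-sink/config > local-file-sink-config.json
# edit local-file-sink-config.json, then apply it
curl -s -X PUT http://localhost:8083/connectors/local-file-sink/config \
  -H "Content-Type: application/json" \
  -d @local-file-sink-config.json | jq '.'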
Kafka Connect Update Connector Config
curl -s -X PUT http://localhost:8083/connectors/local-file-sink/config \
  -H "Content-Type: application/json" \
  -d '{ "connector.class": "FileStreamSink", "file": "test-2.sink.txt", "tasks.max": "2", "topics": "connect-test-new", "name": "local-file-sink" }' | jq '.'
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSink",
"file": "test-2.sink.txt",
"tasks.max": "2",
"topics": "connect-test-new",
"name": "local-file-sink"
},
"tasks": [
{
"connector": "local-file-sink",
"task": 0
},
{
"connector": "local-file-sink",
"task": 1
}
],
"type": "sink"
}
We use the PUT method, set the Content-Type header to JSON, and send in the new config as JSON. We get back a response showing the updated configuration and the expanded task list, which suggests the change succeeded. But did it?
Kafka Connect Get Connector Status
curl -s http://localhost:8083/connectors/local-file-sink/status | jq '.'
{
"name": "local-file-sink",
"connector": {
"state": "RUNNING",
"worker_id": "10.155.153.76:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "10.155.153.76:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "10.155.153.76:8083"
}
],
"type": "sink"
}
Looks good: the state is RUNNING and the connector is now running two tasks instead of one.
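Side note: PUT on /connectors/&lt;name&gt;/config is create-or-update, so the same call would have created the connector if it did not already exist. There is also a dedicated create endpoint, POST /connectors, which wraps the name and config in one JSON object. A sketch, using a hypothetical second connector name and output file:

curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{ "name": "local-file-sink-2", "config": { "connector.class": "FileStreamSink", "file": "test-2b.sink.txt", "tasks.max": "1", "topics": "connect-test" } }' | jq '.'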
Kafka Connect Pause Connector
curl -s -X PUT http://localhost:8083/connectors/local-file-sink/pause
The pause endpoint returns an empty 202 Accepted response, so there is no body to display. To verify, query the status endpoint again:
curl -s http://localhost:8083/connectors/local-file-sink/status | jq '.'
{
"name": "local-file-sink",
"connector": {
"state": "PAUSED",
"worker_id": "10.155.153.76:8083"
},
"tasks": [
{
"id": 0,
"state": "PAUSED",
"worker_id": "10.155.153.76:8083"
},
{
"id": 1,
"state": "PAUSED",
"worker_id": "10.155.153.76:8083"
}
],
"type": "sink"
}
Great. We paused the connector and verified the new state.
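Because pause and resume return empty 202 Accepted responses, another way to confirm the call landed, without a follow-up status request, is to have curl print the HTTP status code:

curl -s -o /dev/null -w "%{http_code}\n" -X PUT http://localhost:8083/connectors/local-file-sink/pause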
Kafka Connect Resume Connector
curl -s -X PUT http://localhost:8083/connectors/local-file-sink/resume
Like pause, resume returns an empty 202 Accepted response. Check the status again:
curl -s http://localhost:8083/connectors/local-file-sink/status | jq '.'
{
"name": "local-file-sink",
"connector": {
"state": "RUNNING",
"worker_id": "10.155.153.76:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "10.155.153.76:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "10.155.153.76:8083"
}
],
"type": "sink"
}
And we’re back up and running.
Kafka Connect Restart Connector Task
What if we had a particular task we wanted to restart?
curl -s -X POST http://localhost:8083/connectors/local-file-sink/tasks/1/restart
The task restart call also returns no body on success. Query that task’s status to confirm:
curl -s http://localhost:8083/connectors/local-file-sink/tasks/1/status | jq '.'
{
"id": 1,
"state": "RUNNING",
"worker_id": "10.155.153.76:8083"
}
As we can see in the sample above, we can restart a specific task and query the status of a specific connector task as well.
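There is also a connector-level restart endpoint if you want to bounce the connector instance itself rather than an individual task. Like the other lifecycle calls above, it returns no response body on success:

curl -s -X POST http://localhost:8083/connectors/local-file-sink/restart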
Hope these examples help! Again, you don’t have to use curl. Let me know if you have any feedback.
Further Resources
- https://kafka.apache.org/documentation/#connect_rest
- The Kafka Connect based Data Generator tutorial has specific examples of the concepts presented in this tutorial
- OpenAPI spec for Kafka Connect at https://kafka.apache.org/33/generated/connect_rest.yaml, which you can import into a Swagger editor