In this Kafka Connect S3 tutorial, let’s demo multiple Kafka S3 integration examples. We’ll cover writing to S3 from one topic and also from multiple Kafka source topics. Also, we’ll see an example of an S3 Kafka source connector reading files from S3 and writing to Kafka.
Examples will be provided for both Confluent and Apache distributions of Kafka.
I’ll document the steps so you can run this in your environment if you want. Or, you can watch me do it in the videos below. Or both. Your call.
Now, to set some initial expectations, these are just examples and we won’t examine Kafka Connect in standalone or distributed mode or how the internals of Kafka Consumer Groups assist Kafka Connect.
If you have any questions or concerns, leave them in the comments below.
The overall goal will be to keep it simple and get working examples up and running as soon as possible. We can optimize afterward.
Accompanying source code is available in GitHub (see Resources section for link) and screencast videos on YouTube.
Let’s get started.
Table of Contents
- Kafka S3 Requirements
- Kafka S3 Setup
- Kafka Connect S3 Sink Example with Confluent
- Kafka Connect S3 Sink Example with Apache Kafka
- Kafka Connect S3 Sink Example with Multiple Source Topics
- Kafka Connect S3 Source Example
- Kafka Connect S3 Helpful Resources
Kafka S3 Requirements
- An S3 environment you can write to and read from. (I mean, “no duh”, or as some folks say, “no doy”. What do you say?)
- MySQL (if you want to use the sample source data; described more below)
- Kafka (examples of both Confluent and Apache Kafka are shown)
Kafka S3 Setup
As you’ll see in the next screencast, this first tutorial builds on the previous Kafka Connect MySQL tutorial. In fact, if you are new to Kafka Connect, you may wish to review that previous post on Kafka Connect MySQL examples before you start. I’ll go through it quickly in the screencast below in case you need a refresher.
There are essentially two types of examples below. One, an example of writing to S3 from Kafka with the Kafka Connect S3 Sink connector, and two, an example of reading from S3 into Kafka. In other words, we will demo Kafka S3 Source examples and Kafka S3 Sink examples.
There is also an example of reading from multiple Kafka topics and writing to S3.
For authorization to S3, I’m going to use the credentials file approach in the screencast examples. For more information on S3 credential options, see the link in the Resources section below.
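For reference, the `~/.aws/credentials` file is just a plain text file and looks something like this (the values below are placeholders, of course):

```
[default]
aws_access_key_id=YOUR_ACCESS_KEY_ID
aws_secret_access_key=YOUR_SECRET_ACCESS_KEY
```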
Kafka Connect S3 Sink Example with Confluent
Do you ever hear the expression “let’s work backward from the end”? Well, you know what? I invented that saying! Actually, I’m kidding, I didn’t invent it.
Anyhow, let’s work backward from the end result in the following screencast. Then, we’ll go through each of the steps to get us there.
Again, we will start with the Confluent distribution of Kafka in this example. The next example uses standalone Apache Kafka.
Here are the steps (more or less) in the above screencast
- Install S3 sink connector with `confluent-hub install confluentinc/kafka-connect-s3:5.4.1`
- `confluent local start`
- Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
- Copy and modify the `s3-sink.properties` file (a sample is shown after this list) and load it with `confluent local load s3-sink -- -d s3-sink.properties`
- List topics with `kafka-topics --list --bootstrap-server localhost:9092`
- Load the `mysql-bulk-source` source connector from the previous MySQL Kafka Connect tutorial with the command `confluent local load mysql-bulk-source -- -d mysql-bulk-source.properties`
- List topics and confirm the mysql_* topics are present
- Show S3 Console with new files
- Review the S3 sink connector configuration
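By the way, if you’re not following along with the accompanying GitHub repo, here’s a rough sketch of what my `s3-sink.properties` file looks like. Treat the topic, region, and bucket values as placeholders for your own environment; the version in the repo linked in the Resources section is the reference.

```
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
# Placeholder topic; use one of the mysql_* topics from the source connector
topics=mysql_employees
s3.region=us-east-1
s3.bucket.name=kafka-connect-example
s3.part.size=5242880
flush.size=3
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE
```

A `flush.size` of 3 just means a file is committed to S3 after every 3 records, which keeps the demo snappy; you’d use something much larger in real life.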
Kafka Connect S3 Sink Example with Apache Kafka
And now, let’s do it with Apache Kafka. Here’s a screencast of running the S3 sink connector with Apache Kafka.
Here are the steps (more or less) in the above screencast
- Start Zookeeper `bin/zookeeper-server-start.sh config/zookeeper.properties`
- Start Kafka `bin/kafka-server-start.sh config/server.properties`
- S3 sink connector is downloaded, extracted and configured (see the worker config sketch after this list)
- Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
- List topics `bin/kafka-topics.sh --list --bootstrap-server localhost:9092`
- Show the modified `s3-sink.properties` file in my current directory (and the `mysql-bulk-source.properties` file as well)
- List topics with `kafka-topics --list --bootstrap-server localhost:9092`
- Load `mysql-bulk-source` source connector from the previous MySQL Kafka Connect tutorial and also S3 sink connector with one command `bin/connect-standalone.sh config/connect-standalone.properties mysql-bulk-source.properties s3-sink.properties`
- List topics and confirm the mysql_* topics are present
- Show S3 Console with new files
- Review the S3 sink connector configuration
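The “downloaded, extracted and configured” step above mostly comes down to pointing the Connect worker at the connector jars. In `config/connect-standalone.properties`, that’s the `plugin.path` setting; the path below is just an example of wherever you happened to extract the connector.

```
# config/connect-standalone.properties (relevant parts)
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
# Example path: the directory that contains the extracted
# confluentinc-kafka-connect-s3 folder
plugin.path=/opt/connectors
```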
Kafka Connect S3 Sink Example with Multiple Source Topics
The previous examples showed streaming to S3 from a single Kafka topic. What if you want to stream multiple topics from Kafka to S3? Have no fear my internet friend, it’s easy with the `topics.regex` setting, and I’d normally show it in the following screencast. Sorry, but I just can’t do it. Sometimes, and I just hate to admit this, but I just don’t have the energy to make all these big time TV shows.
If you need a TV show, let me know in the comments below and I might reconsider, but for now, this is what you need to do.
Here are the steps (more or less) of what I would have done in the Big Time TV Show (aka: a screencast) for sinking multiple Kafka topics into S3
- Update your `s3-sink.properties` file: comment out the `topics` variable and uncomment the `topics.regex` variable (see the snippet after this list)
- Unload your S3 sink connector if it is running
- Load the S3 sink connector
- Check out S3. You should see the data from all of your topics whose names start with `mysql`
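In other words, the relevant change in `s3-sink.properties` looks something like this; the regex is just an example that matches the mysql_* topics from the source connector.

```
# Before: a single topic
#topics=mysql_employees
# After: every topic whose name starts with mysql
topics.regex=mysql.*
```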
Kafka Connect S3 Source Example
When it comes to ingesting from S3 to Kafka with a pre-built Kafka Connect connector, you might be a bit limited depending on your situation. One, at the time of this writing, there is a Kafka Connect S3 Source connector, but it is only able to read files created by the Connect S3 Sink connector. We used that sink connector in the above examples. From the Source connector’s documentation:
“The Kafka Connect Amazon S3 Source Connector provides the capability to read data exported to S3 by the Apache Kafka® Connect S3 Sink connector and publish it back to a Kafka topic”
Now, this might be completely fine for your use case, but if this is an issue for you, there might be a workaround. For example, there are ways to mount S3 buckets to a local file system with tools like s3fs-fuse. From there, it should be possible to read files into Kafka with sources such as the Spooldir connector. Let me know in the comments.
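I haven’t tested this end-to-end, but a rough sketch of the s3fs-fuse idea looks something like the following; the bucket name and mount point are placeholders.

```
# Save the access key and secret for s3fs (colon separated)
echo "YOUR_ACCESS_KEY_ID:YOUR_SECRET_ACCESS_KEY" > ~/.passwd-s3fs
chmod 600 ~/.passwd-s3fs

# Mount the bucket to a local directory
mkdir -p /mnt/kafka-connect-example
s3fs kafka-connect-example /mnt/kafka-connect-example -o passwd_file=~/.passwd-s3fs

# A file-based source connector such as Spooldir could then watch the mount
ls /mnt/kafka-connect-example
```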
Also, another possible limitation is the license requirement. Because the Kafka Connect S3 Source connector requires a Confluent license after 30 days, we’ll run through the following demo using Confluent.
Here are the steps (more or less) in the above Kafka Connect S3 Source screencast
- Install the S3 source connector with `confluent-hub install confluentinc/kafka-connect-s3-source:1.2.2`
- `confluent local start`
- Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
- Copy and modify the `s3-source.properties` file (a sample is shown after this list)
- List topics with `kafka-topics --list --bootstrap-server localhost:9092` and highlight how the mysql_* topics are present
- Load the S3 source connector with `confluent local load s3-source -- -d s3-source.properties`
- List topics and confirm the copy_of* topics are present
- Review the S3 source connector configuration
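For reference, here’s a rough sketch of the `s3-source.properties` file; treat the values as placeholders and the GitHub repo version as the reference. The RegexRouter transform is what produces the copy_of prefixed topic names, so the source connector doesn’t write back into the same topics the sink connector is reading from.

```
name=s3-source
connector.class=io.confluent.connect.s3.source.S3SourceConnector
tasks.max=1
s3.region=us-east-1
s3.bucket.name=kafka-connect-example
format.class=io.confluent.connect.s3.format.json.JsonFormat
# Used for the connector's license/metadata topics
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
# Rename topics on the way back into Kafka
transforms=AddPrefix
transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.AddPrefix.regex=.*
transforms.AddPrefix.replacement=copy_of_$0
```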
Kafka Connect S3 Helpful Resources
- Source config examples in “s3” folder https://github.com/tmcgrath/kafka-connect-examples
- A blog post announcing the S3 Sink Connector https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/
- Kafka Connect S3 Sink Connector documentation https://docs.confluent.io/current/connect/kafka-connect-s3/index.html
- More information on AWS Credential Providers https://docs.confluent.io/current/connect/kafka-connect-s3/index.html#credentials-providers
- Kafka Connect S3 Source connector https://docs.confluent.io/current/connect/kafka-connect-s3-source
I am trying to achieve MySQL => Debezium (CDC) => Kafka => Kafka Connect S3 sink.
I have created a docker-compose file that covers MySQL => Debezium (CDC) => Kafka, as shown below. Please help me with adding the Kafka Connect S3 sink to this docker-compose.
docker-compose.yml:

```yaml
version: '3.0'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    ports:
      - 9092:9092
      - 29092:29092
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
  mysql:
    image: debezium/example-mysql:1.0
    container_name: mysql
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    image: debezium/connect:1.0
    container_name: connect
    ports:
      - 8083:8083
    depends_on:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  postgresql:
    image: sameersbn/postgresql:9.4
    container_name: postgresql
    environment:
      - DEBUG=false
      - DB_USER=test
      - DB_PASS=Test123
      - DB_NAME=test
    ports:
      - "5432:5432"
```
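One possible direction, as an untested sketch rather than a definitive answer: the `connect` service above is already a Kafka Connect worker, and the Debezium Connect image loads extra plugins from `/kafka/connect`. So one option is to mount the extracted S3 sink connector into that directory, pass AWS credentials as environment variables, and then create the sink through the Connect REST API on port 8083. The host paths and file names below are assumptions.

```yaml
  # Sketch: the connect service with S3 sink additions (untested)
  connect:
    image: debezium/connect:1.0
    container_name: connect
    ports:
      - 8083:8083
    depends_on:
      - kafka
      - mysql
    volumes:
      # Host directory containing the extracted confluentinc-kafka-connect-s3 plugin
      - ./plugins/kafka-connect-s3:/kafka/connect/kafka-connect-s3
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      # Picked up by the S3 sink connector's default AWS credentials chain
      - AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY_ID
      - AWS_SECRET_ACCESS_KEY=YOUR_SECRET_ACCESS_KEY
```

With the plugin on the worker’s path, the S3 sink config (the same settings as the s3-sink.properties example above, expressed as JSON) gets posted to the worker:

```
curl -X POST -H "Content-Type: application/json" \
  --data @s3-sink.json http://localhost:8083/connectors
```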