Kafka Connect S3 Examples


In this Kafka Connect S3 tutorial, let’s demo multiple Kafka S3 integration examples.  We’ll cover writing to S3 from one topic and also multiple Kafka source topics. Also, we’ll see an example of an S3 Kafka source connector reading files from S3 and writing to Kafka will be shown.

Examples will be provided for both Confluent and Apache distributions of Kafka.

I’ll document the steps so you can run this on your environment if you want.  Or, you can watch me do it in videos below. Or both. Your call.

Now, to set some initial expectations, these are just examples and we won’t examine Kafka Connect in standalone or distributed mode or how the internals of Kafka Consumer Groups assist Kafka Connect.

If you have any questions or concerns, leave them in the comments below.

The overall goal will be keeping it simple and get working examples asap.  We can optimize afterward.

Accompanying source code is available in GitHub (see Resources section for link) and screencast videos on YouTube.

Let’s get started.

Table of Contents

Kafka S3 Requirements

  1. S3 environment which you can write and read from. (I mean, “no duh”, or as some folks say, “no doy”.  What do you say?)
  2. MySQL (if you want to use the sample source data; described more below)
  3. Kafka (examples of both Confluent and Apache Kafka are shown)

Kafka S3 Setup

As you’ll see in the next screencast, this first tutorial utilizes the previous Kafka Connect MySQL tutorial.  In fact, if you are new to Kafka Connect, you may wish to reference this previous post on Kafka Connector MySQL examples before you start.  I’ll go through it quickly in the screencast below in case you need a refresher.

There are essentially two types of examples below.  One, an example of writing to S3 from Kafka with Kafka S3 Sink Connector and two, an example of reading from S3 to Kafka.  In other words, we will demo Kafka S3 Source examples and Kafka S3 Sink Examples.

Also, there is an example of reading from multiple Kafka topics and writing to S3 as well.

For authorization to S3, I’m going to show using the credentialsfile approach in the screencast examples.  For more information on S3 credential options, see the link in the Resources section below.

Kafka Connect S3 Sink Example with Confluent

Do you ever the expression “let’s work backward from the end”?  Well, you know what? I invented that saying! Actually, I’m kidding, I didn’t invent it.

Anyhow, let’s work backward from the end result in the following screencast. Then, we’ll go through each of the steps to get us there.

Again, we will start with Apache Kafka in Confluent example.  The next example uses the standalone Apache Kafka.

Here are the steps (more or less) in the above screencast

  1. Install S3 sink connector with `confluent-hub install confluentinc/kafka-connect-s3:5.4.1`
  2. confluent local start
  3. Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
  4. Copy, modify s3-sink.properties file and load it with `confluent local load s3-sink — -d s3-sink.properties`
  5. List topics `kafka-topics –list –bootstrap-server localhost:9092`
  6. Load `mysql-bulk-source` source connector from the previous MySQL Kafka Connect tutorial with the command `confluent local load mysql-bulk-source — -d mysql-bulk-source.properties`
  7. List topics and confirm the mysql_* topics are present
  8. Show S3 Console with new files
  9. Review the S3 sink connector configuration

Kafka Connect S3 Sink Example with Apache Kafka

And now, let’s do it with Apache Kafka. Here’s a screencast of running the S3 sink connector with Apache Kafka.

Here are the steps (more or less) in the above screencast

  1. Start Zookeeper `bin/zookeeper-server-start.sh config/zookeeper.propties`
  2. Start Kafka `bin/kafka-server-start.sh config/server.properties`
  3. S3 sink connector is downloaded, extracted and other configuration
  4. Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
  5. List topics `bin/kafka-topics.sh –list –bootstrap-server localhost:9092`
  6. Show modified s3-sink.properties file in my current directory (and the mysql-bulk-source.properties file as well)
  7. List topics `kafka-topics –list –bootstrap-server localhost:9092`
  8. Load `mysql-bulk-source` source connector from the previous MySQL Kafka Connect tutorial and also S3 sink connector with one command `bin/connect-standalone.sh config/connect-standalone.properties mysql-bulk-source.properties s3-sink.properties`
  9. List topics and confirm the mysql_* topics are present
  10. Show S3 Console with new files
  11. Review the S3 sink connector configuration

Kafka Connect S3 Sink Example with Multiple Source Topics

The previous examples showed streaming to S3 from a single Kafka topic.  What if you want to stream multiple topics from Kafka to S3?  Have no fear my internet friend, it’s easy with the topics.regexsetting and shown in the following screencast.  Sorry, but I just can’t do it.  Sometimes, and I just hate to admit this, but I just don’t have the energy to make all these big time TV shows.

If you need a TV show, let me know in the comments below and I might reconsider, but for now, this is what you need to do.

Here are the steps (more or less) of what I would have done in the Big Time TV Show (aka: a screencast) for sinking multiple Kafka topics into S3

  1. Update your s3-sink.properties file — comment out topics variable and uncomment the topics.regexvariable.
  2. Unload your S3 sink connector if it is running
  3. Load the S3 sink connector
  4. Check out S3 — you should see all your topic data whose name starts with mysql

Kafka Connect S3 Source Example

When it comes to ingesting from S3 to Kafka with a pre-built Kafka Connect connector, we might be a bit limited depending on your sitation.  One, at the time of this writing, there is a Kafka Connect S3 Source connector, but it is only able to read files created from the Connect S3 Sink connector.  We used this connector in the above examples.  From the Source connector’s documentation–

“The Kafka Connect Amazon S3 Source Connector provides the capability to read data exported to S3 by the Apache Kafka® Connect S3 Sink connector and publish it back to a Kafka topic”

Now, this might be completely fine for your use case, but if this is an issue for you, there might be a workaround.  As a possible workaround, there are ways to mount S3 buckets to a local files system using things like s3fs-fuse.  From there, it should be possible to read files into Kafka with sources such as the Spooldir connector.  Let me know in the comments.

Also, another possible limitation is license requirement. Because Kafka Connect S3 Source connector requires a Confluent license after 30 days, we’ll run through the following demo using Confluent.

Here are the steps (more or less) in the above Kafka Connect S3 Source screencast

  1. Install S3 sink connector with `confluent-hub install confluentinc/kafka-connect-s3-source:1.2.2`
  2. confluent local start
  3. Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
  4. Copy and modify s3-source.properties file
  5. List topics `kafka-topics –list –bootstrap-server localhost:9092` and highlight how the mysql_* topics are present
  6. Load S3 source connector with `confluent local load s3-source — -d s3-source.properties`
  7. List topics and confirm the copy_of* topics are present
  8. Review the S3 sink connector configuration

Kafka Connect S3 Helpful Resources

Featured image https://pixabay.com/photos/old-bottles-glass-vintage-empty-768666/

See also  StreamConnectors.com Launches with Exclusive Kafka Connect Kinesis Source Connector – No Strings Attached!
About Todd M

Todd has held multiple software roles over his 20 year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting, coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn

1 thought on “Kafka Connect S3 Examples”

  1. I am trying to achieve Mysql=>debezium(cdc)=>kafka=>kafka connect S3 Sink.

    I have created a docker compose file for creating till Mysql=>debezium(cdc)=>kafka as mentioned below. Please help me on adding Kafka connect S3 sink to this docker compose.

    docker-Compose:
    version: ‘3.0’
    services:
    zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    ports:
    – 2181:2181
    – 2888:2888
    – 3888:3888
    environment:
    – ALLOW_ANONYMOUS_LOGIN=yes
    – ZOOKEEPER_CLIENT_PORT=2181
    – ZOOKEEPER_TICK_TIME=2000
    kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    ports:
    – 9092:9092
    – 29092:29092
    depends_on:
    – zookeeper
    environment:
    – KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    – KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    – KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    – KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
    – KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    mysql:
    image: debezium/example-mysql:1.0
    container_name: mysql
    ports:
    – 3306:3306
    environment:
    – MYSQL_ROOT_PASSWORD=debezium
    – MYSQL_USER=mysqluser
    – MYSQL_PASSWORD=mysqlpw
    connect:
    image: debezium/connect:1.0
    container_name: connect
    ports:
    – 8083:8083
    depends_on:
    – kafka
    – mysql
    environment:
    – BOOTSTRAP_SERVERS=kafka:9092
    – GROUP_ID=1
    – CONFIG_STORAGE_TOPIC=my_connect_configs
    – OFFSET_STORAGE_TOPIC=my_connect_offsets
    – STATUS_STORAGE_TOPIC=my_connect_statuses
    postgresql:
    image: sameersbn/postgresql:9.4
    container_name: postgresql
    environment:
    – DEBUG=false
    – DB_USER=test
    – DB_PASS=Test123
    – DB_NAME=test
    ports:
    – “5432:5432”

    Reply

Leave a Comment