Kafka Connect mySQL Examples


In this Kafka Connect mysql tutorial, we’ll cover reading from mySQL to Kafka and reading from Kafka and writing to mySQL.   Let’s run this on your environment.

Now, it’s just an example and we’re not going to debate operations concerns such as running in standalone or distributed mode.  The focus will be keeping it simple and get it working.  We can optimize afterward.

We may cover Kafka Connect transformations or topics like Kafka Connect credential management in a later tutorial, but not here.  I hope you don’t mind.  You see, I’m a big shot tutorial engineer and I get to make the decisions around here.  So, when I write “I hope you don’t mind”, what I really mean is that I don’t care.  (Well, I’m just being cheeky now.  If you have questions, comments or ideas for improvement, please leave them below.)

REQUIREMENTS

This is what you’ll need if you’d like to perform the steps in your environment.  Adjust as necessary.  You can do that in your environment because you’re the boss there.

In this Kafka Connect with mySQL tutorial, you’ll need

  • running Kafka with Connect and Schema Registry
  • mySQL
  • mySQL JDBC driver

SETUP

I’ll run through this in the screencast below, but this tutorial example utilizes the mySQL Employees sample database.  The link to the download is included in the References section below.

The mySQL JDBC driver needs to be downloaded and located in the Confluent classpath.  I’ll also demonstrate in this in the screencast, but for now, just take my word for it that the jar is in share/java/kafka-connect-jdbc of your Confluent root dir.  Yeah, trust me.

KAFKA CONNECT MYSQL SOURCE EXAMPLE

Do you ever the expression “let’s work backwards”.  I hear it all the time now.  Anyhow, let’s work backwards and see the end result in the following screencast and then go through the steps it took to get there.

See also  GCP Kafka Connect Google Cloud Storage Examples

To recap, here are the key aspects of the screencast demonstration (Note:  since I recorded this screencast above, the Confluent CLI has changed with a confluent local Depending on your version, you may need to add local immediately after confluent for example confluent local status connectors

    • Kafka (connect, schema registry) running in one terminal tab
    • mysql jdbc driver downloaded and located in share/java/kafka-connect-jdbc (note about needing to restart after download)
    • Sequel PRO with mySQL – imported the employees db
    • list the topics `bin/kafka-topics –list –zookeeper localhost:2181`
    • `bin/confluent status connectors`
    • `bin/confluent load mysql-bulk-source -d mysql-bulk-source.properties`
    • `bin/confluent status connectors` or `bin/confluent status mysql-bulk-source`
    • list the topics again `bin/kafka-topics –list –zookeeper localhost:2181` and see the tables as topics
    • `bin/kafka-avro-console-consumer –bootstrap-server localhost:9092 –topic mysql-departments –from-beginning`

Be careful copy-and-paste any of the commands above with double hyphens “–”  This is changed to em dash sometimes and it can cause issues.

KAFKA CONNECT MYSQL CONFIGURATION STEPS

To run the example shown above, you’ll need to perform the following in your environment.

Kafka and associated components like connect, zookeeper, schema-registry are running.  This will be dependent on which flavor of Kafka you are using.  I’m using Confluent Open Source in the screencast.  If you need any assistance with setting up other Kafka distros, just let me know.

Regardless of Kafka version, make sure you have the mySQL jdbc driver available in the Kafka Connect classpath.  You can add it to this classpath by putting the jar in <YOUR_KAFKA>/share/java/kafka-connect-jdbc directory.

Speaking of paths, many of the CLI commands might be easier or more efficient to run if you add the appropriate `bin/` directory to your path.  I do not have that set in my environment for this tutorial.

See also  Azure Kafka Connect Example - Blob Storage

KAFKA CONNECT MYSQL INGEST CONFIGURATION

As my astute readers surely saw, the connector’s config is controlled by the `mysql-bulk-source.properties` file.  You can create this file from scratch or copy or an existing config file such as the sqllite based one located in `etc/kafka-connect-jdbc/`

I’ve also provided sample files for you in my github repo.  See link in References section below.

Outside of regular JDBC connection configuration, the items of note are `mode` and `topic.prefix`.  For mode, you have options, but since we want to copy everything it’s best just to set to `bulk`.  Other options include timestamp, incrementing and timestamp+incrementing.  See link for config options below in Reference section.

SECTION CONCLUSION

Ok, we did it.  We ingested mySQL tables into Kafka using Kafka Connect.  That’s a milestone and we should be happy and maybe a bit proud.  Well, let me rephrase that.  I did it.  I know that is true.  Did you do it too?  I hope so because you are my most favorite big-shot-engineer-written-tutorial-reader ever.

Should we stop now and celebrate?  I know what you’re thinking.  Well, maybe.  And to that I say…. ok, let’s do it.  Just kidding.  It’s too late to stop now.  Let’s keep goin you fargin bastage.

KAFKA CONNECT MYSQL SINK EXAMPLE

Now that we have our mySQL sample database in Kafka topics, how do we get it out?  Rhetorical question.  Let’s configure and run a Kafka Connect Sink to read from our Kafka topics and write to mySQL.  Again, let’s start at the end.  Here’s a screencast writing to mySQL from Kafka using Kafka Connect

Once again, here are the key takeaways from the demonstration

    • Kafka running in one terminal tab
    • Sequel PRO with mySQL – created a new destination database and verified tables and data created
    • list the topics `bin/kafka-topics –list –zookeeper localhost:2181`
    • `bin/confluent load mysql-bulk-sink -d mysql-bulk-sink.properties`
  • `bin/confluent status connectors` or `bin/confluent status mysql-bulk-sink`
See also  Kafka Connect S3 Examples

KAFKA CONNECT MYSQL SINK CONFIGURATION

Not much has changed from the first source example.  The one thing to call out is the `topics.regex` in the mysql-bulk-sink.properties file.  Using this setting, it’s possible to set a regex expression for all the topics which we wish to process.

CONCLUSION

Or let me know if you have any questions or suggestions for improvement.  Feedback always welcomed.  Well, money is welcomed more, but feedback is kinda sorta welcomed too.

REFERENCES

SOURCE CONNECTOR DOCS

https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html

https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html#jdbc-source-configs

SINK CONNECTOR DOCS

https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html

https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/sink_config_options.html

EXAMPLE KAFKA CONNECT SOURCE AND SINK CONFIG FILES

https://github.com/tmcgrath/kafka-connect-examples/tree/master/mysql

MYSQL EMPLOYEES SAMPLE DATABASE

https://dev.mysql.com/doc/employee/en/

Additional Kafka Tutorials

Kafka Tutorials

Kafka Streams Tutorials

Image credit https://pixabay.com/en/wood-woods-grain-rings-100181/

About Todd M

Todd has held multiple software roles over his 20 year career. For the last 5 years, he has focused on helping organizations move from batch to data streaming. In addition to the free tutorials, he provides consulting, coaching for Data Engineers, Data Scientists, and Data Architects. Feel free to reach out directly or to connect on LinkedIn

2 thoughts on “Kafka Connect mySQL Examples”

  1. Hey,
    In the first part, I am not able to see the topics created for every table.
    I used the same source and sink file as shared by you in your github repo.
    Can you please help?

    Reply

Leave a Comment