In this Kafka Connect MySQL tutorial, we’ll cover reading from MySQL into Kafka and reading from Kafka and writing to MySQL. Let’s run this in your environment.
Now, it’s just an example and we’re not going to debate operational concerns such as running in standalone or distributed mode. The focus will be on keeping it simple and getting it working. We can optimize afterward.
We may cover Kafka Connect transformations or topics like Kafka Connect credential management in a later tutorial, but not here. I hope you don’t mind. You see, I’m a big shot tutorial engineer and I get to make the decisions around here. So, when I write “I hope you don’t mind”, what I really mean is that I don’t care. (Well, I’m just being cheeky now. If you have questions, comments or ideas for improvement, please leave them below.)
REQUIREMENTS
This is what you’ll need if you’d like to perform the steps in your environment. Adjust as necessary. You can do that in your environment because you’re the boss there.
In this Kafka Connect with MySQL tutorial, you’ll need:
- Kafka running with Connect and Schema Registry
- MySQL
- the MySQL JDBC driver
SETUP
I’ll run through this in the screencast below, but this tutorial example utilizes the MySQL Employees sample database. The link to the download is included in the References section below.
The MySQL JDBC driver needs to be downloaded and placed on the Confluent classpath. I’ll also demonstrate this in the screencast, but for now, just take my word for it that the jar belongs in share/java/kafka-connect-jdbc of your Confluent root directory. Yeah, trust me.
KAFKA CONNECT MYSQL SOURCE EXAMPLE
Do you know the expression “let’s work backwards”? I hear it all the time now. Anyhow, let’s work backwards and see the end result in the following screencast, and then go through the steps it took to get there.
To recap, here are the key aspects of the screencast demonstration. (Note: since I recorded the screencast above, the Confluent CLI has changed. Depending on your version, you may need to add `local` immediately after `confluent`, for example `confluent local status connectors`.)
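To make the CLI change concrete, here is how the same status check looks in each CLI generation. This is a sketch based on the two forms mentioned above; check `confluent --help` on your install to confirm which one applies to your version.

```
# Older Confluent CLI (as used when the screencast was recorded):
bin/confluent status connectors

# Newer Confluent CLI versions add `local` after `confluent`:
confluent local status connectors
```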
- Kafka (Connect, Schema Registry) running in one terminal tab
- MySQL JDBC driver downloaded and placed in share/java/kafka-connect-jdbc (note: Connect needs a restart after the download)
- Sequel Pro with MySQL – imported the Employees db
- list the topics `bin/kafka-topics --list --zookeeper localhost:2181`
- `bin/confluent status connectors`
- `bin/confluent load mysql-bulk-source -d mysql-bulk-source.properties`
- `bin/confluent status connectors` or `bin/confluent status mysql-bulk-source`
- list the topics again `bin/kafka-topics --list --zookeeper localhost:2181` and see the tables as topics
- `bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic mysql-departments --from-beginning`
Be careful when copying and pasting any of the commands above that contain double hyphens (“--”). They are sometimes converted to dashes, which can cause issues.
KAFKA CONNECT MYSQL CONFIGURATION STEPS
To run the example shown above, you’ll need to perform the following in your environment.
Kafka and associated components like Connect, ZooKeeper, and Schema Registry must be running. How you start them will depend on which flavor of Kafka you are using; I’m using Confluent Open Source in the screencast. If you need any assistance with setting up other Kafka distros, just let me know.
Regardless of Kafka version, make sure you have the MySQL JDBC driver available in the Kafka Connect classpath. You can add it to this classpath by putting the jar in the <YOUR_KAFKA>/share/java/kafka-connect-jdbc directory.
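As a sketch, placing the driver might look like the following. The paths and the driver version are placeholders; adjust them to wherever your Confluent root directory and downloaded Connector/J jar actually live.

```
# Hypothetical paths and driver version -- adjust to your environment.
cp ~/Downloads/mysql-connector-java-8.0.19.jar \
   <YOUR_KAFKA>/share/java/kafka-connect-jdbc/

# Restart Connect afterward so the new jar is picked up.
```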
Speaking of paths, many of the CLI commands might be easier or more efficient to run if you add the appropriate `bin/` directory to your path. I do not have that set in my environment for this tutorial.
KAFKA CONNECT MYSQL INGEST CONFIGURATION
As my astute readers surely saw, the connector’s config is controlled by the `mysql-bulk-source.properties` file. You can create this file from scratch or copy an existing config file, such as the SQLite-based one located in `etc/kafka-connect-jdbc/`.
I’ve also provided sample files for you in my github repo. See link in References section below.
Outside of the regular JDBC connection configuration, the items of note are `mode` and `topic.prefix`. For `mode`, you have options, but since we want to copy everything, it’s best to set it to `bulk`. Other options include `timestamp`, `incrementing` and `timestamp+incrementing`. See the link to the config options in the References section below.
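For reference, a minimal `mysql-bulk-source.properties` might look like the following. The host, port, database name, and credentials are placeholders for illustration; see my GitHub repo linked in the References section for the actual files I used.

```
name=mysql-bulk-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# Placeholder connection details; adjust host, port, user, and password.
connection.url=jdbc:mysql://localhost:3306/employees?user=myuser&password=mypass
# Copy every table in full on each poll.
mode=bulk
# Each table becomes a topic named mysql-<table>, e.g. mysql-departments.
topic.prefix=mysql-
```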
SECTION CONCLUSION
Ok, we did it. We ingested mySQL tables into Kafka using Kafka Connect. That’s a milestone and we should be happy and maybe a bit proud. Well, let me rephrase that. I did it. I know that is true. Did you do it too? I hope so because you are my most favorite big-shot-engineer-written-tutorial-reader ever.
Should we stop now and celebrate? I know what you’re thinking. Well, maybe. And to that I say… ok, let’s do it. Just kidding. It’s too late to stop now. Let’s keep goin’, you fargin bastage.
KAFKA CONNECT MYSQL SINK EXAMPLE
Now that we have our MySQL sample database in Kafka topics, how do we get it out? Rhetorical question. Let’s configure and run a Kafka Connect sink to read from our Kafka topics and write to MySQL. Again, let’s start at the end. Here’s a screencast writing to MySQL from Kafka using Kafka Connect.
Once again, here are the key takeaways from the demonstration
- Kafka running in one terminal tab
- Sequel Pro with MySQL – created a new destination database and verified tables and data were created
- list the topics `bin/kafka-topics --list --zookeeper localhost:2181`
- `bin/confluent load mysql-bulk-sink -d mysql-bulk-sink.properties`
- `bin/confluent status connectors` or `bin/confluent status mysql-bulk-sink`
KAFKA CONNECT MYSQL SINK CONFIGURATION
Not much has changed from the first source example. The one thing to call out is the `topics.regex` setting in the mysql-bulk-sink.properties file. With it, you can provide a regular expression matching all the topics we wish to process.
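For reference, a minimal `mysql-bulk-sink.properties` might look like the following. As before, the connection details are placeholders; the actual files are in my GitHub repo linked below.

```
name=mysql-bulk-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# Consume every topic the source connector produced with its mysql- prefix.
topics.regex=mysql-.*
# Placeholder destination database; adjust host, port, user, and password.
connection.url=jdbc:mysql://localhost:3306/employees_sink?user=myuser&password=mypass
# Create destination tables automatically if they do not exist.
auto.create=true
```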
CONCLUSION
Let me know if you have any questions or suggestions for improvement. Feedback is always welcome. Well, money is welcomed more, but feedback is kinda sorta welcome too.
REFERENCES
SOURCE CONNECTOR DOCS
https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html
https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html#jdbc-source-configs
SINK CONNECTOR DOCS
https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html
https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/sink_config_options.html
EXAMPLE KAFKA CONNECT SOURCE AND SINK CONFIG FILES
https://github.com/tmcgrath/kafka-connect-examples/tree/master/mysql
MYSQL EMPLOYEES SAMPLE DATABASE
https://dev.mysql.com/doc/employee/en/
Additional Kafka Tutorials
Image credit https://pixabay.com/en/wood-woods-grain-rings-100181/