Stream Processing

Event Stream Processing

We choose Stream Processing as a way to process data more quickly than traditional approaches.  But, how do we do Stream Processing?

Is Stream Processing different from Event Stream Processing?  Why do we need it?  What are a few examples of event streaming patterns?  How do we implement it?

Let’s get into these questions.

As you’ve probably noticed, the topic of stream processing is becoming popular, but the idea itself isn’t new at all [1].  The ways we can implement event stream processing vary, but I believe the intention is fundamentally the same.  We use Event Stream Processing to perform real-time computations on data as it arrives, changes, or is deleted.  The purpose of Event Stream Processing is simple: it facilitates the real-time (or as close as we can get to real-time) processing of our data to produce faster results.  It is the answer to “why do we need stream processing?”.

Why do we need stream processing?

One way to approach the answer to this question is to consider businesses. Businesses constantly want more.  They want to grow or they want to stay the same.  If they want to grow, they need to change.  If they want to stay the same, the world changes around them and they likely need to adapt to it and that means change.  It doesn’t matter.  Let’s just go with the premise that businesses constantly want more and need to change and evolve to survive.

One area where businesses want more is data processing.  They want to process data faster to make money, save money or mitigate possible risks.  Again, it’s simple.  One way to process data faster is to implement stream processing.

What is not stream processing?

Before we attempt to answer “what is stream processing”, let’s define what it isn’t.  Sound ok?  Well, I hope so.  Here goes…

In a galaxy far, far away, businesses processed data after it landed at various locations to produce a result.  This result was computed much later than when the data originally “landed”.  The canonical example is the “daily report” or the “dashboard”.   Consider a daily report or dashboard which is created once per day and displays the previous business day’s sales from a particular region.  Or similarly, consider determining a probability score of whether a credit card transaction was fraudulent, five days after the transaction(s) occurred.  Another example is determining that a product is out of stock one day after the order was placed.

There are common threads in these examples.  One, the determination of the result (daily report, fraud probability score, product out-of-stock) is produced much later than when the event data used in the calculation occurred.  Businesses want it to happen faster without any quality trade-offs.

Another common thread is how these determinations are implemented in software.  The implementation described in these examples is referred to as “batch processing”.  Why is it called “batch”?  Because the results are calculated by processing a group of accumulated data at the same time.  To create a daily report, a group of the previous day’s transactions is processed at 12:30 AM each day.  In the batch processor, multiple transactions are used from the beginning to the end of the processing run.  To determine possible fraud, a group of the previous 5 days’ worth of transactions is used in the batch process.  While processing a group of the previous 8 hours of order transactions (in batch), one order, in particular, is determined to be unavailable in the current inventory.

Wouldn’t it be faster to produce the desired results as the data is created?  The answer is yes.  Yes, the report or dashboard results could be updated sooner if they were updated as each order event occurred.  Yes, the fraud probability determination would be much quicker if the most recent transaction event was processed immediately after it happened.  And finally, yes, the out-of-inventory condition could be determined much more quickly if the associated order event was processed at the time it was attempted.

As you’ve probably guessed, the emphasis on the word “event” above is intentional.  Here is a spoiler alert.  You need to process events in stream processing.  That’s why stream processing is often referred to as “event stream processing”.

Stream Processing is not Batch Processing.

What is Stream Processing?

Let’s take a look at a couple of diagrams.  Here’s an example of how Stream Processing may fit into an existing architecture.

 

Event Stream Processing diagram example

Origin events are created by source applications such as an e-commerce site or a green screen terminal.  Events are captured and stored in various systems including RDBMSs, NoSQL stores, mainframes, and log files. Next, these events (inserts, updates, deletes, appends to a log file, etc.) are written into an Event Log. Now, our Stream Processor(s) can be invoked to consume data from one or more Topics in the Event Log and process these events. Processing examples include calculating counts over a specified time range, filtering, aggregating, and even joining multiple streams of Topic events to enrich into new Topics. There are many tutorial examples of Stream Processors on this site.
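
To make that concrete, here is a minimal Kafka Streams sketch of “consume from a Topic, compute, write an enriched result to a new Topic”.  Treat it as a sketch only: the topic names, the filter predicate, and the string serdes are assumptions for illustration, not from any particular tutorial on this site.

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class OrderCountStream {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-count-example");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();

    // Consume order events from a topic in the Event Log (hypothetical topic name)
    KStream<String, String> orders = builder.stream("orders");

    // Filter, count per key over 5-minute windows, and write the result to a new topic
    orders
        .filter((key, value) -> value != null && value.contains("US"))
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
        .count()
        .toStream()
        .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), String.valueOf(count)))
        .to("orders-us-counts-5m", Produced.with(Serdes.String(), Serdes.String()));

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}

Notice the result is written back to another Topic in the Event Log rather than directly to a database or object store; that design choice comes up again later in this post.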

By the way, for more information, see the post on Event Logs.

The example diagram above is ideal when introducing Stream Processing into existing architectures.  The “Origin Events Recaptured”, Event Log, and Stream Processors can be implemented and evolved alongside what already exists, which is beneficial for numerous reasons.

What if we are creating a brand-new, greenfield application?  And what if this application includes Microservices?  Stream Processors can fit nicely here as well as shown in the following diagram.

Event Stream Processing with Microservices Architecture Diagram

As you can see, Stream Processors can be implemented on top of the Event Log used in Microservice architectures as well.  It is important to note the changes to the arrows in the Microservice example above.  Notice how the arrows now indicate data flow both to and from the Event Log.  In this example, Stream Processors are likely creating curated or enriched data Topics that are consumed by various Microservices and/or landed in some downstream storage such as HDFS, an object store like S3, or some type of Data Lake.

What do most businesses’ software architectures look like today?

If you could combine both of the diagrams above into one, that is what I see as most common.  Also, there are variations of these diagrams when implementing multiple data center solutions such as Hybrid Cloud.

As shown above, one key takeaway of stream processing is determining whether or not you will implement an Event Log.  Because I hear some of you, and yes, I guess we could build stream processors without an Event Log, but why? Seriously, let us know in the comments below.

The Event Log will be a distributed, ordered, append-only log for all your streaming architectures.  The Event Log is our linchpin for building distributed, flexible streaming systems, and there are various options available. But again, the log concept is nothing new. Many RDBMSs use changelogs (or WALs, “write-ahead logs”) to improve performance, provide point-in-time recovery (PITR) hooks for disaster recovery, and serve as the foundation for distributed replication.

In most cases, I believe you’ll benefit from choosing to utilize a distributed event log, but there are cases where you may wish to stream directly and checkpoint along the way.  We’ll cover the Event Log in more detail in other areas of this site.

Can we Debate what a Stream Processor is Now?

These are just my opinions and I’m curious if you have something to add here. Do you see stream processing as something else?  Let me know.  As you can tell from above, I consider the two requirements to be an Event Log and a Stream Processing Framework or Application.  When designing stream processors, I write the stream processor’s computed results back to the Event Log rather than directly writing the results to a data lake, object store, or database.  If sinking the stream processor results is required, do it someplace else outside of the stream processor itself; e.g. use a Kafka Connect sink to write to S3, Azure Blob Storage, Cassandra, HDFS, BigQuery, Snowflake, etc.

We have options for the implementation of our Event Log and Stream Processor compute.

Event Logs are most often implemented with Apache Kafka, but the approach popularized by Apache Kafka has expanded into other options as well.

For Stream Processors, our options include Kafka Streams, ksqlDB, Spark Streaming, Pulsar Functions, StreamSets, NiFi, Beam, DynamoDB Streams, Databricks, Flink, Storm, Samza, and Google Cloud Dataflow.  What else?  What am I missing?  Let me know in the comments below.

On this site, we’ll deep dive into many of these implementation examples and more.

For now, check out the tutorial sections of the site.

Regardless of your implementation, there are certain constructs or patterns in stream processing which we should know or at minimum, be aware of for future reference.

Stream Processor Constructs

  • Ordering – do we need to process each event in strict order, or is it acceptable to compute out of order in some cases?
  • Event time vs processing time – event time is when the event actually occurred at the source, while processing time is when the stream processor sees it; windowed results can differ significantly depending on which notion of time you use
  • Stream Processor Windows – in concert with a notion of time, perform calculations such as sums, averages, and max/min over bounded windows.  An example could be calculating the average temperature every 5 minutes or the average stock price over the last 10 minutes
  • Failover and replication
  • Joins – combining data from multiple sources to infer from or enrich into a new event (see the sketch after this list)
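
To illustrate the join construct, here is a minimal Kafka Streams sketch (assuming Kafka Streams 2.4+).  The topic names and the value-joining logic are made up for illustration; it joins two event streams keyed by the same id within a time window and writes the enriched event back to the Event Log.

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

public class OrderPaymentJoin {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-payment-join-example");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();

    // Two streams keyed by order id (hypothetical topics)
    KStream<String, String> orders = builder.stream("orders");
    KStream<String, String> payments = builder.stream("payments");

    // Join each order with a payment arriving within 10 minutes of it,
    // producing an enriched event written back to a new topic
    orders.join(
        payments,
        (orderValue, paymentValue) -> orderValue + "|" + paymentValue,
        JoinWindows.of(Duration.ofMinutes(10)),
        StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()))
      .to("orders-enriched");

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}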

Stream Processing Vertical Applications

Where are Stream Processing solutions most prevalent?  The short answer is everywhere.  Stream processors are beneficial in healthcare, finance, security, IoT, Retail, etc. because, as mentioned in the beginning, every industry wants to process data faster.

Specific solutions that span multiple verticals include:

  • Refined, harmonized, filtered streaming ETL – streaming from raw to filtered; transformations; anonymization and data protection
  • Real-Time Inventory, Recommendations, Fraud Detection, Alerts, Real-Time XYZ, etc
  • Specialized – for example, stream data to search implementations such as Elastic or Solr
  • Machine Learning / AI – streaming data pipelines to training data repositories as well as running through a machine learning model(s)
  • Monitoring and alerting
  • Hybrid or Multi-cloud

Conclusion

Streaming Architecture will be interesting for years to come.  Let’s have some fun exploring and learning.

Additional Resources

[1] The concept described above isn’t necessarily new, as you’ll see in the SQL example shown at https://en.wikipedia.org/wiki/Event_stream_processing.

 

 

Image credit https://pixabay.com/en/seljalandsfoss-waterfall-iceland-1751463/

Kafka Certification Tips for Developers

Kafka Certification Tips

If you are considering Kafka Certification, this page describes what I did to pass the Confluent Certified Developer for Apache Kafka Certification exam.  Good luck and hopefully this page is helpful for you!

There are many reasons why you may wish to become Kafka certified.

Achieving Kafka Certification might be a way to distinguish yourself in your current role.  Or, you may wish to transition to a new role which includes a higher bill rate for Kafka contract work.  Or, you may just like the pressure of testing yourself and your Kafka developer chops.  Whatever your reasons, it doesn’t matter here folks.  We are all equal under the stars in the sky.  Well, I’m not exactly sure that’s true about all being equal, but it sounds pretty good, right?  Sometimes, when you are Kafka certified, you need to sound good when you are speaking and writing.  Pfft…lol, anyhow, let’s get serious.

I achieved the Kafka Developer certification through hard work.  I had a couple of years of Kafka experience, but that experience was never solely focused on Kafka.  Kafka was just a component in the overall stack I needed and/or wanted to utilize.  Along the way, I wrote some Kafka tutorial blog posts here to help ingrain some of my Kafka learnings.

My point is I had some Kafka experience.  And if I’m being honest, I had misconceptions about what Kafka is and is not at times.  If I’m being entirely honest, I still do sometimes.  You see, I’m honest.

Here’s a link to my certification

Kafka Certification

With my previously described experience with Kafka in mind, I took the following steps to pass the certification test on my first try.  Consider these my personal Kafka Certification tips and FAQ.

What is Kafka Certification?

Kafka Certification is a program through Confluent and at the time of this writing, there are two available certifications.  The Confluent Certified Developer for Apache Kafka (CCDAK) and the Confluent Certified Operator for Apache Kafka (CCOAK).  To become certified, you must take a remotely proctored exam from your computer.  Does anyone else think “remotely proctored” sounds a bit…. well, odd.

Are there multiple types of Kafka Certification?

Yes. This post, certification suggestions, and descriptions focus on the Kafka Developer certification.

What should you study to become Apache Kafka Certified?

Well, first off, you should be prepared for more than just Kafka, the distributed commit log, Consumers and Producers.  This is what most people think of when they hear Kafka.  The exam will also test your knowledge in areas such as Kafka Connect, Kafka Streams, KSQL and the Schema Registry.  I cover this in more detail in the next questions.

What is the percentage breakout of questions on Kafka vs. Kafka Streams, etc?

According to the developer’s certification guide from Confluent (at the time of this writing), we are given the following guidance on the breakout.

  • Application Design (40%) – command-line tools, configuration, metrics, architecture, and design
  • Development (30%) – focused on the Java Consumer and Producer APIs as well as the REST API
  • Deployment/Testing/Monitoring (30%) – tuning for latency/throughput, Kafka Streams, KSQL, and troubleshooting

This percentage breakout is taken directly from the Confluent Certification Guide.  There is a link to this Guide in the Resources section below.
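
For the Development portion in particular, it helps to be comfortable reading and writing plain Java client code.  As a rough reference point only (not an exam question), here is a minimal producer sketch; the topic name and config values are made up for illustration.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    // settings like acks, retries, and batching are the kind of thing the exam asks about
    props.put("acks", "all");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("test-topic", "key-1", "hello kafka"),
          (metadata, exception) -> {
            if (exception != null) {
              exception.printStackTrace();
            } else {
              System.out.printf("wrote to %s-%d@%d%n",
                  metadata.topic(), metadata.partition(), metadata.offset());
            }
          });
      producer.flush();
    }
  }
}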

Are there any Kafka Certification practice tests?

Yes.  I purchased a Kafka Certification practice exam course on Udemy and it helped.  It includes 3 sets of practice tests.

How did you prepare for the exam?

I started by reading documentation and performing some hands-on examples from both the Apache Kafka and Confluent documentation.  I read these Kafka docs pretty quickly at first because I wanted to obtain the big picture view of my skills.

Next, I took the first set of practice questions from the practice tests course I purchased. The results of this first practice test showed where I was strong and where I was weak.

Next, I went back to both the Apache Kafka and Confluent documentation and read more carefully in the areas where I needed improvement.

Afterward, I took the 2nd practice test with higher expectations this time.  Again, the results provided an updated assessment of my current Kafka skills and showed me my weaknesses.  This time, I studied where I was weak again.  But, I also spent 10-20% of the time in areas where I already felt confident.

By now, I imagine you can guess what I did next.  I took the third series in the practice exam set.

At this point, I was feeling pretty confident, so I scheduled my test and went back to studying the documentation for 30 minutes to an hour each day until the exam.

Please describe the “proctor” experience?

Well, let me tell you they are not kidding about wanting to view the area where you take the test.  I had to spin my webcam 360 degrees.  The proctor asked questions about my dual monitor setup and was quite concerned.  I had to disconnect my second monitor and external camera.

Any Kafka Developer Certification sample questions you can share?

Well, the Confluent Certification guide contains a few sample questions, but from my personal experience, I’d buy the recommended practice exam questions I mentioned above and below.

Anything you didn’t do to prepare, but considered doing?

I kept it simple.  I didn’t buy any courses from Udemy or Pluralsight, though I admit I was tempted to.  Instead, I just purchased the practice exams and studied the Apache Kafka and Confluent documentation.  The documentation is good and there are plenty of examples on the Internet.  According to Steven Pressfield, I just needed to “do the work”.

Final Thoughts or any other helpful resources

 

Featured image https://pixabay.com/users/insspirito-1851261/

GCP Kafka Connect Google Cloud Storage Examples

GCP Kafka Connect Google Cloud Storage GCS

In this GCP Kafka tutorial, I will describe and show how to integrate Kafka Connect with GCP’s Google Cloud Storage (GCS).  We will cover writing to GCS from Kafka as well as reading from GCS to Kafka.  Descriptions and examples will be provided for both Confluent and Apache distributions of Kafka.

I’ll document the steps so you can run this on your environment if you want.  Or, you can watch me do it in videos below. Or both. Your call.

It is expected that you have some working knowledge of Apache Kafka at this point, but you may not be an expert yet.  If you know about running Kafka Connect in standalone vs. distributed mode, or how topics may be used to maintain state, or other more advanced topics, that’s great.  This is more of a specific use-case how-to tutorial.

If you have any questions or concerns, leave them in the comments below.  I’m happy to help.  Well, I’m happier to help for cash money or Ethereum, cold beer, or bourbon.

The overall goal here is to keep it simple and get a demo working ASAP.  We can optimize afterward.  And in this case, when I say “we can optimize”, I really mean “you can optimize” for your particular use case.

All the examples have accompanying source code in GitHub and screencast videos on YouTube.

Let’s roll.

Kafka GCP Requirements

  1. GCP GCS bucket which you can write and read from. You knew that already though, right?  Because this is a tutorial on integrating Kafka with GCS.  If you didn’t know this, maybe you should leave now.
  2. Kafka
  3. GCP service account JSON credentials file.  How to create one is described below; also see the Resources section below for a link to GCP Service Account info.

Kafka GCP GCS Overview

When showing examples of connecting Kafka with Google Cloud Storage (GCS), we assume familiarity with configuring Google GCS buckets for access.  There is a link for one way to do it in the Resources section below.  For setting up my credentials, I installed gcloud, created a service account in the GCP console, and downloaded the key file.  Then, I ran `gcloud auth activate-service-account --key-file mykeyfile.json` to update my ~/.boto file.  Note: mykeyfile.json is just an example.  Your JSON key file will likely be named something different.  Whatever the name of this file, you will need it to perform the steps below.

One way you can verify your GCP setup for this tutorial is to successfully run gsutil ls from the command line.

If you are new to Kafka Connect, you may find the previous posts on Kafka Connect tutorials helpful.  I’ll go through it quickly in the screencast below in case you need a refresher.

Again, we will cover two types of examples.  Writing to GCS from Kafka with the Kafka GCS Sink Connector and then an example of reading from GCS to Kafka.  Technically speaking, we will configure and demo the Kafka Connect GCS Source and Kafka Connect GCS Sink connectors.

Kafka Connect GCS Sink Example with Confluent

Let’s see a demo to start.  In the following screencast, I show how to configure and run Kafka Connect with Confluent distribution of Apache Kafka. Afterward, we’ll go through each of the steps to get us there.

As you’ll see, this demo assumes you’ve downloaded the Confluent Platform already. I downloaded the tarball and have my $CONFLUENT_HOME variable set to /Users/todd.mcgrath/dev/confluent-5.4.1

As you will see, you will need your GCP service account JSON file for GCP authentication.  Let’s cover writing both Avro and JSON to GCS in the following TV show screencast.

Steps in screencast

  1. confluent local start
  2. Note how I copied over the gcs-sink.properties file from my GitHub repo (link below; a sketch of the file appears after this list).  Open the file and show the JSON credentials reference and the Avro output example
  3. Show sink connector already installed
  4. Show empty GCS bucket
  5. First example is Avro, so generate 100 events of test data with `ksql-datagen quickstart=orders format=avro topic=orders maxInterval=100 iterations=100`  See the previous post on test data in Kafka for reference.
  6. confluent local load gcs-sink -- -d gcs-sink.properties
  7. gsutil ls gs://kafka-connect-example/ and GCP console to show new data is present
  8. confluent local unload gcs-sink
  9. Second example is JSON output, so edit gcs-sink.properties file
  10. confluent local config datagen-pageviews -- -d ./share/confluent-hub-components/confluentinc-kafka-connect-datagen/etc/connector_pageviews.config (Again, see link in References section below for previous generation of test data in Kafka post)
  11. kafkacat -b localhost:9092 -t pageviews
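
For reference, the gcs-sink.properties file looks roughly like the sketch below.  Treat it as a sketch, not the exact file from the repo: the bucket name and credentials path are placeholders, and property names should be double-checked against the Confluent GCS sink connector docs for your version.

name=gcs-sink
connector.class=io.confluent.connect.gcs.GcsSinkConnector
tasks.max=1
topics=orders
gcs.bucket.name=kafka-connect-example
gcs.part.size=5242880
flush.size=3
# path to the GCP service account JSON key file created earlier
gcs.credentials.path=/path/to/mykeyfile.json
storage.class=io.confluent.connect.gcs.storage.GcsStorage
# Avro output; for the JSON example later in the post, swap in
# format.class=io.confluent.connect.gcs.format.json.JsonFormat
format.class=io.confluent.connect.gcs.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1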

Kafka Connect GCS Sink Example with Apache Kafka

And now with Apache Kafka.  The GCS sink connector described above is a commercial offering, so you might want to try something else if you are a self-managed Kafka user.  At the time of this writing, I couldn’t find an option.  If you know of one, let me know in the comments below.  Thanks.

GCP Kafka Connect GCS Source Example

What do we do when we want to hydrate data into Kafka from GCS?  Well, my fine friend, we use a GCS Source Kafka connector.  Let’s go through some examples.

In the following demo, since the Kafka Connect GCS Source connector requires a Confluent license after 30 days, we’ll run through the example using Confluent.

Steps in screencast

  1. confluent local start
  2. Again, I copied over the gcs-source.properties file from my GitHub repo.  Link below.
  3. Show source connector already installed
  4. `gsutil ls gs://kafka-connect-example/topics/orders` which shows existing data on GCS from the previous tutorial
  5. `kafka-topics --list --bootstrap-server localhost:9092` to show orders topic doesn’t exist
  6. confluent local load gcs-source -- -d gcs-source.properties
  7. confluent local consume orders -- --value-format avro --from-beginning or kafkacat -b localhost:9092 -t orders -s avro -r http://localhost:8081

What about if the source topic, like orders, already exists?  From the docs:

“Be careful when both the Connect GCS sink connector and the GCS Source Connector use the same Kafka cluster, since this results in the source connector writing to the same topic being consumed by the sink connector. This causes a continuous feedback loop that creates an ever-increasing number of duplicate Kafka records and GCS objects. It is possible to avoid this feedback loop by writing to a different topic than the one being consumed by the sink connector.”

If you want to run with the SMT

  1. confluent local unload gcs-source
  2. Modify the gcs-source.properties file.  Uncomment the SMT transformation lines as described in ye ole TV show above (see the sketch after this list).
  3. confluent local load gcs-source -- -d gcs-source.properties
  4. kafka-topics --list --bootstrap-server localhost:9092
  5. You should see the copy_of_orders topic
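
For reference, the SMT lines in question are typically a RegexRouter transform along these lines.  The “AddPrefix” alias is just a label, and the exact lines used in the demo are in the properties file in the GitHub repo.

# route every record to a copy_of_<original topic> topic
transforms=AddPrefix
transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.AddPrefix.regex=.*
transforms.AddPrefix.replacement=copy_of_$0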

Kafka Connect GCS Source Example with Apache Kafka

The GCS source connector described above is also a commercial offering from Confluent, so let me know in the comments below if you find one more suitable for self-managed Kafka.  Thanks.

 

GCP Kafka Google Cloud Storage (GCS) Helpful Resources

 

 

Featured image https://pixabay.com/photos/splash-jump-dive-sink-swim-shore-863458/

 

 

Kafka Test Data Generation Examples

Kafka Test Data Generation

After you start working with Kafka, you will soon find yourself asking the question, “how can I generate test data into my Kafka cluster?”  Well, I’m here to show you that you have many options for generating test data in Kafka.  In this post and demonstration video, we’ll cover a few of the ways you can generate test data into your Kafka cluster.

Now, before we begin, let’s cover a possible edge case.  If you are wondering about test data in Kafka Streams applications, you might find my previous post on testing Kafka Streams helpful. Well, also, I might find it helpful if you read it and comment on it too.

With that out of the way, let’s go through a few of your options.  I’ll cover ways to generate test data in Kafka from both Apache Kafka and Confluent Platform.

Kafka Test Data Screencast (AKA: Big Time TV Show)

Check out the screencast below to see a demo of examples using kafkacat, the Kafka Connect Datagen and Voluble connectors, and finally, ksql-datagen.

Part 1 with Kafkacat

Our first example utilizes kafkacat, which is freely available at https://github.com/edenhill/kafkacat/

Here are the steps (more or less) in the above screencast

  1. Start Zookeeper and Kafka on localhost
  2. kafkacat is installed and in my path
  3. cat /var/log/system.log | kafkacat -b localhost:9092 -t syslog
  4. kafkacat -b localhost:9092 -t syslog -J
  5. curl -s "http://api.openweathermap.org/data/2.5/weather?q=Minneapolis,USA&APPID=my-key-get-your-own" |\
    kafkacat -b localhost:9092 -t minneapolis_weather -P
  6. kafkacat -b localhost:9092 -t minneapolis_weather
  7. Show other fun, good time resources such as Mockaroo and JSON-server

Test Data with Apache Kafka Connect Options

There are a couple of available Kafka Connect source connectors to assist in generating test data into Kafka.   There is the Kafka Connect Datagen connector which has been around for a while.  The Datagen connector includes two quickstart schemas to ahh, well, you know, get you started quickly.  See the Reference section below for the link.

In the screencast, I showed how both connectors are already installed.

Next, run some commands such as

  1. confluent local config datagen-pageviews -- -d ./share/confluent-hub-components/confluentinc-kafka-connect-datagen/etc/connector_pageviews.config (your path might be different)
  2. kafkacat -b localhost:9092 -t pageviews

Next, we switched to another option for Kafka Connect based mock (or stub) data generation: a connector called Voluble.  I like how it integrates the Java Faker project and provides support for creating cross-topic relationships, as seen in the examples below

'genkp.users.with' = '#{Name.full_name}'
'genvp.users.with' = '#{Name.blood_group}'

'genkp.publications.matching' = 'users.key'
'genv.publications.title.with' = '#{Book.title}'

See how users.key is referenced in the above example.  Anyhow, much more documentation is available from the GitHub repo in the link below.
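
To give a feel for the full file, a voluble-source.properties wrapping keys like these might look roughly as follows.  This is only a sketch: the connector class name comes from the Voluble README, and the actual file I used is in the GitHub repo referenced in the Resources below.

name=voluble-source
connector.class=io.mdrogalis.voluble.VolubleSourceConnector
tasks.max=1
# generate keys and values for a "users" topic
genkp.users.with=#{Name.full_name}
genvp.users.with=#{Name.blood_group}
# generate a "publications" topic whose keys match existing users keys
genkp.publications.matching=users.key
genv.publications.title.with=#{Book.title}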

Steps with Voluble

  1. Listed topics kafka-topics --list --bootstrap-server localhost:9092
  2. Then, I loaded using a sample properties file found in my Github repo.  See the Resources below.
  3. confluent local load voluble-source -- -d voluble-source.properties (bonus points and a chance to join me on a future Big Time TV Show if you post how to load it in vanilla Kafka in the comments below)
  4. kafka-topics --list --bootstrap-server localhost:9092
  5. kafkacat -b localhost:9092 -t owners

Kafka Test Data in Confluent Platform

If you are a user of the Confluent Platform, you have an easy button available from the CLI with the ksql-datagen tool.  It has a couple of quickstart schemas to get you rolling quickly, as shown in the following screencast.

Quickly, let’s run through the following commands

  1. ksql-datagen quickstart=orders format=avro topic=orders maxInterval=100
  2. confluent local consume orders -- --value-format avro --from-beginning
  3. kafkacat -b localhost:9092 -t orders -s avro -r http://localhost:8081

Resources and Helpful References

 

Featured image credit https://pixabay.com/photos/still-life-bottles-color-838387/

Kafka Connect S3 Examples

Kafka Connect S3 Examples

In this Kafka Connect S3 tutorial, let’s demo multiple Kafka S3 integration examples.  We’ll cover writing to S3 from one topic and also from multiple Kafka source topics.  Also, an example of an S3 Kafka source connector reading files from S3 and writing to Kafka will be shown.

Examples will be provided for both Confluent and Apache distributions of Kafka.

I’ll document the steps so you can run this on your environment if you want.  Or, you can watch me do it in videos below. Or both. Your call.

Now, to set some initial expectations, these are just examples and we won’t examine Kafka Connect in standalone or distributed mode or how the internals of Kafka Consumer Groups assist Kafka Connect.

If you have any questions or concerns, leave them in the comments below.

The overall goal will be to keep it simple and get working examples ASAP.  We can optimize afterward.

Accompanying source code is available in GitHub (see Resources section for link) and screencast videos on YouTube.

Let’s get started.

Kafka S3 Requirements

  1. S3 environment which you can write and read from. (I mean, “no duh”, or as some folks say, “no doy”.  What do you say?)
  2. MySQL (if you want to use the sample source data; described more below)
  3. Kafka (examples of both Confluent and Apache Kafka are shown)

Kafka S3 Setup

As you’ll see in the next screencast, this first tutorial utilizes the previous Kafka Connect MySQL tutorial.  In fact, if you are new to Kafka Connect, you may wish to reference this previous post on Kafka Connector MySQL examples before you start.  I’ll go through it quickly in the screencast below in case you need a refresher.

There are essentially two types of examples below.  One, an example of writing to S3 from Kafka with Kafka S3 Sink Connector and two, an example of reading from S3 to Kafka.  In other words, we will demo Kafka S3 Source examples and Kafka S3 Sink Examples.

Also, there is an example of reading from multiple Kafka topics and writing to S3 as well.

For authorization to S3, I’m going to show the credentials file approach in the screencast examples.  For more information on S3 credential options, see the link in the Resources section below.

Kafka Connect S3 Sink Example with Confluent

Do you ever hear the expression “let’s work backward from the end”?  Well, you know what? I invented that saying! Actually, I’m kidding, I didn’t invent it.

Anyhow, let’s work backward from the end result in the following screencast. Then, we’ll go through each of the steps to get us there.

Again, we will start with the Confluent distribution of Apache Kafka.  The next example uses standalone Apache Kafka.

 

Here are the steps (more or less) in the above screencast

  1. Install S3 sink connector with `confluent-hub install confluentinc/kafka-connect-s3:5.4.1`
  2. confluent local start
  3. Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
  4. Copy and modify the s3-sink.properties file (a sketch of this file appears after this list) and load it with `confluent local load s3-sink -- -d s3-sink.properties`
  5. List topics `kafka-topics --list --bootstrap-server localhost:9092`
  6. Load `mysql-bulk-source` source connector from the previous MySQL Kafka Connect tutorial with the command `confluent local load mysql-bulk-source -- -d mysql-bulk-source.properties`
  7. List topics and confirm the mysql_* topics are present
  8. Show S3 Console with new files
  9. Review the S3 sink connector configuration
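
For reference, the s3-sink.properties loaded in step 4 looks roughly like the sketch below.  The topic, region, and bucket names are placeholders, so check the GitHub repo and the Confluent S3 sink connector docs for the exact file and property names for your version.

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
# replace with your topic(s), e.g. the mysql_* topics from the MySQL tutorial
topics=mysql_example_topic
s3.region=us-west-2
s3.bucket.name=kafka-connect-example
s3.part.size=5242880
flush.size=3
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE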

 

Kafka Connect S3 Sink Example with Apache Kafka

And now, let’s do it with Apache Kafka. Here’s a screencast of running the S3 sink connector with Apache Kafka.

Here are the steps (more or less) in the above screencast

  1. Start Zookeeper `bin/zookeeper-server-start.sh config/zookeeper.properties`
  2. Start Kafka `bin/kafka-server-start.sh config/server.properties`
  3. S3 sink connector is downloaded, extracted, and configured
  4. Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
  5. List topics `bin/kafka-topics.sh --list --bootstrap-server localhost:9092`
  6. Show modified s3-sink.properties file in my current directory (and the mysql-bulk-source.properties file as well)
  7. List topics `kafka-topics --list --bootstrap-server localhost:9092`
  8. Load `mysql-bulk-source` source connector from the previous MySQL Kafka Connect tutorial and also S3 sink connector with one command `bin/connect-standalone.sh config/connect-standalone.properties mysql-bulk-source.properties s3-sink.properties`
  9. List topics and confirm the mysql_* topics are present
  10. Show S3 Console with new files
  11. Review the S3 sink connector configuration

Kafka Connect S3 Sink Example with Multiple Source Topics

The previous examples showed streaming to S3 from a single Kafka topic.  What if you want to stream multiple topics from Kafka to S3?  Have no fear my internet friend, it’s easy with the topics.regex setting, as would be shown in the following screencast.  Sorry, but I just can’t do it.  Sometimes, and I just hate to admit this, I just don’t have the energy to make all these big time TV shows.

If you need a TV show, let me know in the comments below and I might reconsider, but for now, this is what you need to do.

Here are the steps (more or less) of what I would have done in the Big Time TV Show (aka: a screencast) for sinking multiple Kafka topics into S3

  1. Update your s3-sink.properties file: comment out the topics variable and uncomment the topics.regex variable (see the sketch after this list).
  2. Unload your S3 sink connector if it is running
  3. Load the S3 sink connector
  4. Check out S3; you should see data for all your topics whose names start with mysql
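
A rough sketch of the relevant change in s3-sink.properties; the regex below assumes you want every topic whose name starts with mysql, as in the previous examples, and the commented-out topic name is just a placeholder.

# topics=mysql_example_topic   <- comment out the explicit topic list
topics.regex=mysql.*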

Kafka Connect S3 Source Example

When it comes to reading from S3 into Kafka with a pre-built Kafka Connect connector, we might be a bit limited.  At the time of this writing, there is a Kafka Connect S3 Source connector, but it is only able to read files created by the Connect S3 Sink connector.  We used this connector in the above examples.  From the Source connector’s documentation:

“The Kafka Connect Amazon S3 Source Connector provides the capability to read data exported to S3 by the Apache Kafka® Connect S3 Sink connector and publish it back to a Kafka topic”

Now, this might be completely fine for your use case, but if this is an issue for you, there might be a workaround.  As a possible workaround, there are ways to mount S3 buckets to a local file system using things like s3fs-fuse.  From there, it should be possible to read files into Kafka with sources such as the Spooldir connector.  Let me know in the comments.

Because the Kafka Connect S3 Source connector requires a Confluent license after 30 days, we’ll run through the following demo using Confluent.

Here are the steps (more or less) in the above screencast

  1. Install S3 source connector with `confluent-hub install confluentinc/kafka-connect-s3-source:1.2.2`
  2. confluent local start
  3. Optional `aws s3 ls kafka-connect-example` to verify your ~/.aws/credentials file
  4. Copy and modify the s3-source.properties file (a sketch appears after this list)
  5. List topics `kafka-topics --list --bootstrap-server localhost:9092` and highlight how the mysql_* topics are present
  6. Load S3 source connector with `confluent local load s3-source -- -d s3-source.properties`
  7. List topics and confirm the copy_of* topics are present
  8. Review the S3 source connector configuration
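
For reference, an s3-source.properties along these lines is roughly what to expect.  This is only a sketch: the region and bucket are placeholders, required properties vary by connector version, and the exact file is in the GitHub repo, so verify names against the Confluent S3 Source connector docs.

name=s3-source
connector.class=io.confluent.connect.s3.source.S3SourceConnector
tasks.max=1
s3.region=us-west-2
s3.bucket.name=kafka-connect-example
format.class=io.confluent.connect.s3.format.avro.AvroFormat
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
# rename restored topics so they don't feed back into the sink connector
transforms=AddPrefix
transforms.AddPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.AddPrefix.regex=.*
transforms.AddPrefix.replacement=copy_of_$0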

Kafka Connect S3 Helpful Resources

 

Featured image https://pixabay.com/photos/old-bottles-glass-vintage-empty-768666/