Stream Processor Windows

When moving to a stream processing architecture or building stream processors, you will soon face a fundamental choice.  Will you process streams on an individual, per-event basis?  Or will you collect and buffer multiple events/messages first, and then apply a function or join across this collection of events?

Examples of single-event processing include reporting a current GPS location or temperature reading, removing PII from a record, or enriching a record with address information.

Conversely, processing multiple events means computing results across a collection of events: weblog traffic for pages clicked during a session in order to make a recommendation, metrics from mobile or IoT devices such as temperature readings over the last 5 minutes, fraud detection based on events over a set time period, or analysis of what is added to and removed from a shopping cart while visiting a particular website.  This is the opposite of single-event processing, where any associated context of the event is irrelevant.

But, back to the idea of processing events as a group rather than individually.  This implies stream processor implementations must provide the capability of 1) gathering multiple events and 2) performing some kind of computation over the collection of events.  Different implementations vary in how they expose these two fundamental constructs.  This high-level capability in stream processor design is called window-based operations or, more succinctly, windowing.

Stream Processor Windows Overview

Window operations define boundaries to create finite sets of events and then perform functions against each set.  These sets are sometimes called segments, collections, or buckets.  Events are assigned to buckets based on time or on properties inherent in the event data.  Critical concept: there are two notions of “time”: event time and processing time.

Event time is when the event occurred, while processing time is when the event was processed.  Do not overlook the difference between event and processing time.  Ideally, these two values are the same, but in reality they drift apart, with varying degrees of skew over time.  For a great explanation of these differences and how skew occurs, check out the Streaming 101 post.
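
As a concrete illustration, Kafka Streams lets you supply a custom `TimestampExtractor` so windows are driven by event time taken from the message payload rather than by processing time.  The following is a minimal sketch, assuming values were already deserialized into a `Map` containing a hypothetical `eventTime` epoch-millis field; exact interfaces may differ slightly between Kafka Streams versions:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Pulls event time out of the message payload instead of using the time the
// record arrives at the processor (processing time). Assumes values were
// deserialized into a Map holding a hypothetical "eventTime" epoch-millis field.
public class EventTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Object value = record.value();
        if (value instanceof Map) {
            Object eventTime = ((Map<?, ?>) value).get("eventTime");
            if (eventTime instanceof Number) {
                return ((Number) eventTime).longValue();
            }
        }
        // No usable event time in the payload: fall back to the stream's partition time
        return partitionTime;
    }
}
```

Such an extractor would typically be registered through the `default.timestamp.extractor` streams configuration property.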

Windowing requires defining the boundaries used for bucketing and how often the window produces a result.  Result functions are often aggregations, such as the total time spent in a particular location, or other common functions such as max, min, mean, median, and standard deviation.

Types of Stream Processor Windows

There are four types of windowing commonly implemented in streaming engines: tumbling, hopping, sliding, and session windows.

Tumbling Windows

Tumbling windows segment events into fixed-size buckets and then perform a function against the events in each collection.  Tumbling window segmentation may be based on a particular count of elements or a set period of time.  A key differentiator of Tumbling windows is that there is no overlap between windows.  In other words, unlike other types of windows, an event can only be part of one particular window.  This differs from Hopping windows, which are described next.
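
As an illustration, here is a minimal Kafka Streams sketch of a time-based tumbling window that counts events per key in fixed, non-overlapping 5-minute buckets.  The topic name and serdes are assumptions, and the exact `TimeWindows` factory methods vary between Kafka Streams versions:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class TumblingWindowExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Page-view events keyed by page id (hypothetical topic)
        KStream<String, String> pageViews =
                builder.stream("page-views", Consumed.with(Serdes.String(), Serdes.String()));

        // Fixed 5-minute buckets with no overlap: each event is counted in exactly one window
        KTable<Windowed<String>, Long> viewsPerWindow = pageViews
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                .count();

        viewsPerWindow.toStream().foreach((windowedKey, count) ->
                System.out.printf("page=%s window=%s count=%d%n",
                        windowedKey.key(), windowedKey.window(), count));

        // Building the topology only; starting KafkaStreams with broker config is omitted here
        System.out.println(builder.build().describe());
    }
}
```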

Hopping Windows

Hopping windows are based on fixed time intervals.  Hopping window results may overlap with other windows, so events may belong to more than one window result.  Hopping windows are defined by a window size (e.g. 5 minutes) and an advance interval or “hop” (e.g. 1 minute).  A Hopping window behaves the same as a Tumbling window if the hop size is set equal to the window size; this results in no overlaps, so every event is part of only one bucket.
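
Continuing the same hypothetical Kafka Streams sketch, a hopping window only changes the window definition: the 5-minute window now advances every 1 minute, so each event can land in up to five overlapping windows:

```java
import java.time.Duration;

import org.apache.kafka.streams.kstream.TimeWindows;

public class HoppingWindowExample {

    // 5-minute windows that advance ("hop") every 1 minute, so windows overlap and a
    // single event may be counted in up to five windows. Setting the advance equal to
    // the window size makes this equivalent to a tumbling window.
    static final TimeWindows HOPPING =
            TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1));

    // Plugged into the same topology as the tumbling sketch:
    //   pageViews.groupByKey().windowedBy(HOPPING).count();
}
```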

Sliding Windows

From my research, sliding vs tumbling windows is a bit of a debate.

Sliding windows produce an output result only when an event occurs, which differs from Tumbling and Hopping windows.  A good graphic comparing and contrasting Sliding vs Tumbling windows can be seen here.

In Kafka Streams, sliding windows are used only for `join` operations.  There doesn’t appear to be this distinction in Spark Streaming.

Similarities to the previously described Tumbling and Hopping windows include the notion that an event might belong to multiple windows and that windows are defined by a window size and hop size.
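
For example, in Kafka Streams the sliding behavior appears in stream-stream joins via `JoinWindows`: two records are joined whenever their timestamps fall within the window interval of each other.  The topic names and value types in this sketch are hypothetical, and method names vary by Kafka Streams version:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class SlidingJoinExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> clicks =
                builder.stream("ad-clicks", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> impressions =
                builder.stream("ad-impressions", Consumed.with(Serdes.String(), Serdes.String()));

        // Join a click with an impression for the same key whenever their timestamps
        // fall within 1 minute of each other -- the join window "slides" with each event.
        KStream<String, String> joined = clicks.join(
                impressions,
                (click, impression) -> click + " / " + impression,
                JoinWindows.of(Duration.ofMinutes(1)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));

        joined.foreach((key, value) -> System.out.println(key + " => " + value));

        // Building the topology only; starting KafkaStreams with broker config is omitted
        System.out.println(builder.build().describe());
    }
}
```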

Session Windows

Grouping events originating from the same period of user activity, or session, is commonly used for behavior analysis.  These user activities or “sessions” can be collected using a session window.  A session window starts when the first event occurs and remains open until a timeout value is reached. If another event occurs within the specified timeout of the previously ingested event, the session window extends to ingest the new event. If no events occur within the timeout, the session window is closed.

A common example of a session is a user browsing a website.  A user may enter a search term, compare different product pages, add one or more items to their shopping cart, and eventually check out or abandon their session without purchase.
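
A minimal Kafka Streams sketch of this browsing-session example might count events per user session with a 30-minute inactivity gap.  The topic name, serdes, and gap value are assumptions:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class SessionWindowExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Clickstream events keyed by user id (hypothetical topic)
        KStream<String, String> clicks =
                builder.stream("user-clicks", Consumed.with(Serdes.String(), Serdes.String()));

        // A session stays open as long as events keep arriving within 30 minutes of the
        // previous event; 30 minutes of inactivity closes the session window.
        KTable<Windowed<String>, Long> clicksPerSession = clicks
                .groupByKey()
                .windowedBy(SessionWindows.with(Duration.ofMinutes(30)))
                .count();

        clicksPerSession.toStream()
                // Merged sessions emit tombstones (null counts); skip them for this sketch
                .filter((windowedUser, count) -> count != null)
                .foreach((windowedUser, count) ->
                        System.out.printf("user=%s sessionStart=%d sessionEnd=%d events=%d%n",
                                windowedUser.key(),
                                windowedUser.window().start(),
                                windowedUser.window().end(),
                                count));

        System.out.println(builder.build().describe());
    }
}
```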

Windowing Conclusion and Resources

You will face a variety of options when considering streaming applications and stream processing architectures.  If you need to process and produce results from a bucket of events rather than processing one event at a time, you will need stream processor windows.  Depending on your streaming implementation, the available window operations will vary.  The following links may help in learning more about stream processing windows from different perspectives.

Stream Processor Window Examples

Kafka Stream Joins has examples of setting windows.

Kafka Streams Windowing

Spark Streaming Windowing

Flink Windowing

Beam Windowing

Change Data Capture – What, Why, How

Change Data Capture can be a lightweight mechanism to capture the changes in databases so they may be processed someplace other than the database or application(s) which made the change. 

But, why?  Why would we want this? 

Let’s cover common questions and concerns around Change Data Capture or CDC in this post, so we have context for the specific CDC tutorials available on this site.

Also, in addition to “Why CDC?”, let’s provide insight into other common CDC questions such as “What is CDC?” and “How do we implement CDC?”, and list the pros and cons of CDC.  Let’s start with Why.

Why CDC?

Let’s consider “Why CDC?” at a high level first before we get into more of the details.  First, if data can be processed into “value” without any integration with other data, there is simply no need for CDC.  Let’s consider “value” to mean outcomes such as enabling better decisions, creating more sales, improving logistics costs, etc.  If we can realize this value in one application with one database, for example, there is no need for CDC.

But, what do we do if this “value” we are seeking can only be realized if data is combined from multiple sources?  For example, how do we integrate data from an E-commerce application and our inventory databases?  As a Data Engineer, how do you implement this?  As you already know, you have many options, such as traditional Extract Transform Load (ETL) or more modern Extract Load Transform (ELT) approaches.

What is CDC?

CDC is utilized to extract and transport changes in online transactional processing (OLTP) data systems to downstream systems such as an Event Log, data lakes, and/or stream processors.  For example, our CDC architecture might resemble this diagram.

 

Change Data Capture Architecture Diagram example

In essence, CDC is implemented in databases by writing to immutable transaction logs and then providing a mechanism to read from these logs. These transaction logs were not designed for CDC; they are intended primarily to address resiliency and performance concerns within a particular database, and CDC is more of an unexpected benefit.  But these logs also provide a mechanism by which a CDC process can read transactions with minimal performance impact on the transactional database.

  • Turning on and configuring CDC also requires no change to existing schema or applications using the database.
  • In addition to data mutations, CDC transaction logs can also capture changes in structure.  In other words, a database transaction log can capture both DML and DDL.

The alternative to reading from a transaction log is reading the database tables directly.  The tables need indicators to help determine which data has been created or updated.  These indicators are typically implemented as audit columns, such as dateCreated to indicate inserts and lastUpdated to flag updates.
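
A rough sketch of that table-polling alternative might look like the following, assuming a hypothetical `orders` table with a `lastUpdated` audit column and a standard JDBC driver on the classpath:

```java
import java.sql.*;
import java.time.Instant;

public class AuditColumnPoller {
    public static void main(String[] args) throws SQLException {
        // Hypothetical connection string; any JDBC-accessible database works the same way
        String url = "jdbc:postgresql://localhost:5432/shop";
        Timestamp lastSeen = Timestamp.from(Instant.EPOCH);

        try (Connection conn = DriverManager.getConnection(url, "app", "secret")) {
            // Poll for rows created or updated since the last run, using the audit column.
            // Unlike log-based CDC, this misses deletes and intermediate updates,
            // and every poll adds query load to the transactional database.
            String sql = "SELECT id, status, lastUpdated FROM orders "
                       + "WHERE lastUpdated > ? ORDER BY lastUpdated";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setTimestamp(1, lastSeen);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.printf("changed row id=%d status=%s at %s%n",
                                rs.getLong("id"), rs.getString("status"),
                                rs.getTimestamp("lastUpdated"));
                        lastSeen = rs.getTimestamp("lastUpdated");  // high-water mark for the next poll
                    }
                }
            }
        }
    }
}
```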

Let’s consider a high-level example.  In an E-commerce application, new inserts, updates, or deletes to transactions in the E-commerce database could be consumed via CDC and sent to an Event Log.  From here, the Event Log could be a source for Stream Processors to perform valuable near real-time computations such as fraud detection, recommendations, and alerts, or the Event Log could simply act as a data buffer before publishing to downstream analytic systems.
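
For instance, if the CDC events for orders land on an Event Log topic, a small stream processor could flag suspiciously large orders in near real time.  In this hedged sketch the topic names, the JSON payload shape, and the threshold are purely illustrative:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class OrderAlertExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // CDC change events from the e-commerce database, published to the Event Log
        KStream<String, String> orderChanges =
                builder.stream("ecommerce.orders.changes", Consumed.with(Serdes.String(), Serdes.String()));

        // Forward only change events with a large order total to an alerts topic
        // for downstream fraud review.
        orderChanges
                .filter((orderId, changeJson) -> changeJson != null && extractTotal(changeJson) > 10_000.0)
                .to("fraud-alerts", Produced.with(Serdes.String(), Serdes.String()));
    }

    // Crude extraction of the "total" field from the change event payload;
    // a real implementation would deserialize the CDC event properly.
    static double extractTotal(String json) {
        int i = json.indexOf("\"total\":");
        if (i < 0) return 0.0;
        int end = json.indexOf(',', i + 8);
        if (end < 0) end = json.indexOf('}', i + 8);
        try {
            return Double.parseDouble(json.substring(i + 8, end).trim());
        } catch (Exception e) {
            return 0.0;
        }
    }
}
```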

For more information on Event Logs, see the Why Event Logs? post.

Change Data Capture Vendor Examples

Change Data Capture is freely available out-of-the-box from database vendors such as Microsoft SQL Server, Oracle, PostgreSQL, and MySQL.

In Microsoft SQL Server, CDC records insert, update, and delete activity on a SQL Server table in a detailed format [1]. Column information, along with metadata, is captured for the modified rows and stored in append-only change tables. Table-valued functions give users systematic access to these change tables.  Records within the change tables are immutable, providing value similar to an immutable log.
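
As a sketch of what consuming those change tables can look like, the following queries a hypothetical `dbo.orders` capture instance over JDBC, using SQL Server's `sys.fn_cdc_get_min_lsn`/`sys.fn_cdc_get_max_lsn` helpers and the generated `cdc.fn_cdc_get_all_changes_<capture_instance>` table-valued function (connection details and column names are assumptions):

```java
import java.sql.*;

public class SqlServerCdcReader {
    public static void main(String[] args) throws SQLException {
        // Hypothetical connection details
        String url = "jdbc:sqlserver://localhost:1433;databaseName=shop";

        // Read every change recorded for the dbo_orders capture instance between the
        // minimum and maximum log sequence numbers (LSNs) currently available.
        String sql =
            "DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_orders'); " +
            "DECLARE @to_lsn binary(10) = sys.fn_cdc_get_max_lsn(); " +
            "SELECT __$operation, id, status " +
            "FROM cdc.fn_cdc_get_all_changes_dbo_orders(@from_lsn, @to_lsn, 'all');";

        try (Connection conn = DriverManager.getConnection(url, "app", "secret");
             Statement stmt = conn.createStatement()) {

            // The batch starts with DECLAREs, so walk forward to the statement that returns rows
            boolean isResultSet = stmt.execute(sql);
            while (!isResultSet && stmt.getUpdateCount() != -1) {
                isResultSet = stmt.getMoreResults();
            }

            try (ResultSet rs = stmt.getResultSet()) {
                while (rs.next()) {
                    // __$operation: 1 = delete, 2 = insert, 4 = update (after image)
                    System.out.printf("op=%d id=%d status=%s%n",
                            rs.getInt("__$operation"), rs.getLong("id"), rs.getString("status"));
                }
            }
        }
    }
}
```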

In Oracle, it is possible to capture and publish changed data in synchronous and asynchronous modes [2]. In synchronous mode, change data is captured as part of the transaction that modifies the source table. This mode uses triggers on the source database to capture change data. Change data is captured in real-time on the source database. SYNC_SOURCE is a single, predefined synchronous change source that cannot be altered. Synchronous mode is cost-efficient, though it adds overhead to the source database at capture time.  In asynchronous mode, change data is captured from the database redo log files after changes have been made to the source database.

In MySQL, CDC is available via the `binlog`, which was originally used for auditing and for replicating data to other MySQL systems [3]. But the `binlog` may also be utilized outside of MySQL, for example to process CDC events and save them to downstream analytics systems.

In PostgreSQL, change data capture is possible through either transaction logs or triggers. With transaction logs, all write transactions (i.e., INSERT, UPDATE, DELETE, and DDL statements) are written to the Write-Ahead Log (WAL) before the transaction result is sent to the user or client [4].  The WAL allows the consumption of CDC events.
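
As a quick illustration, PostgreSQL's logical decoding can be exercised with plain SQL functions over a replication slot.  The following JDBC sketch uses the built-in `test_decoding` output plugin; it requires `wal_level = logical`, and the slot name, credentials, and database are assumptions:

```java
import java.sql.*;

public class PostgresWalPeek {
    public static void main(String[] args) throws SQLException {
        // Hypothetical connection details; the database must run with wal_level = logical
        String url = "jdbc:postgresql://localhost:5432/shop";

        try (Connection conn = DriverManager.getConnection(url, "app", "secret");
             Statement stmt = conn.createStatement()) {

            // Create a logical replication slot that decodes WAL entries into readable text
            stmt.execute(
                "SELECT * FROM pg_create_logical_replication_slot('cdc_demo', 'test_decoding')");

            // ... INSERT/UPDATE/DELETE activity happens on the database here ...

            // Consume (and clear) the decoded change events waiting in the slot
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT lsn, xid, data FROM pg_logical_slot_get_changes('cdc_demo', NULL, NULL)")) {
                while (rs.next()) {
                    System.out.printf("lsn=%s xid=%s change=%s%n",
                            rs.getString("lsn"), rs.getString("xid"), rs.getString("data"));
                }
            }
        }
    }
}
```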

In MongoDB, the transaction log is called the `opLog`.

I’m sure you get the idea by now.

When to Consider Change Data Capture?

Change data capture plays a significant role in streaming data processing and pipelines. As the amount of data grows rapidly, the need for CDC techniques becomes crucial to handle data inflow for analytics and near real-time analytics such as machine learning and artificial intelligence (AI).

Problems arise when long-running, taxing analytic queries are introduced on OLTP systems, which affects overall application performance. CDC-based technologies allow users to automatically capture database mutations such as inserts, updates, and deletes, as well as structural changes such as DDL `alter table` statements.  This provides the flexibility to process changes in streaming applications and/or store them in destinations better suited for analytic queries.

CDC implementations contain the metadata necessary to understand the changes made. CDC technologies reduce cost, improve data quality and accuracy, and provide a mechanism for creating streaming architectures. CDC is a solution for continuous and accelerating growth in data volumes, reducing load times, resources, and cost.

Change data capture summary list of advantages:

    • CDC implementation does not require application code changes
    • Only requires configuration changes to the database
    • Enables us to identify a change history
    • Allows the user to add context information to every DML data mutation if required
    • CDC has an auto cleanup feature, deleting information automatically based on the retention period

Change Data Capture has its disadvantages:

    • CDC may not track change time
    • May not track the security context of the change
    • Does not capture how the data changed; it only records that a change was made
    • May add slight overhead to the system, depending on the number of changes

Change Data Capture Options

Once you have configured CDC in Oracle, SQL Server, etc., you may be wondering how you can consume and process the CDC events.  How can you implement a streaming architecture with CDC?  What are your options for building CDC stream processors?

At the time of this writing, here are a few options to consider, not listed in any particular order:

  1. Debezium – Open Source.  Built on the Kafka Connect framework. All of Debezium’s connectors are Kafka Connect source connectors, so they have the pros and cons associated with Kafka Connect.
  2. StreamSets – Open Source.  Out of the box support for all CDC implementations described here as well as others.
  3. Others?  Let us know.

Also, you may consider vendor-specific options such as:

  1. Oracle Golden Gate
  2. DynamoDB Streams
  3. Attunity

Conclusion

Change data capture is available out-of-the-box in many database systems such as MS SQL Server, Oracle, MySQL, Postgres, and MongoDB.  CDC is prevalent in streaming architectures when implementing separation of concerns between transactions and other concerns such as analytics, search indexing, machine learning, AI, near real-time monitoring, and alerting.

Hope this helps!

Let us know if you have any questions or concerns.

References

[1] SQL Server https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-2017

[2] Oracle CDC https://docs.oracle.com/cd/E11882_01/server.112/e25554/cdc.htm

[3] MySQL binlog https://dev.mysql.com/doc/internals/en/binary-log-overview.html

[4] PostgreSQL WAL https://www.postgresql.org/docs/11/runtime-config-wal.html

 

 
