Building Real-time Applications with Kafka Streams and ksqlDB: A Step-by-Step Tutorial



Apache Kafka is a powerful platform for handling event streams, but to truly unlock its value, you need tools to process, transform, and analyze that data in real time. Two prominent technologies within the Kafka ecosystem for building such real-time applications are Kafka Streams and ksqlDB.

Kafka Streams is a client library for building stream processing applications and microservices directly in Java or Scala. It offers a rich, high-level DSL and a lower-level Processor API for fine-grained control. On the other hand, ksqlDB provides a SQL-like streaming database built on Kafka, allowing you to define and run stream processing applications using familiar SQL syntax.

This step-by-step tutorial will guide you through building a simple real-time application using both Kafka Streams and ksqlDB, highlighting their respective strengths and use cases. At ActiveWizards, we often help clients choose and implement the right stream processing tools for their specific needs, and this guide will give you a practical introduction.

Our Example Use Case: Real-Time Clickstream Aggregation

Let's imagine we have a Kafka topic named page_views containing records of user page views on a website, with data like:

{ "user_id": "user_A", "page_url": "/products/123", "timestamp": 1678886400000 }
{ "user_id": "user_B", "page_url": "/home", "timestamp": 1678886405000 }
{ "user_id": "user_A", "page_url": "/cart", "timestamp": 1678886410000 }

Our goal is to build a real-time application that counts the number of page views per user within 1-minute windows and outputs these aggregations to a new Kafka topic named user_page_view_counts.
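
For illustration, assuming JSON output values, a single aggregated record on user_page_view_counts for one 1-minute window might look like:

{ "user_id": "user_A", "view_count": 2 }

(The exact key and value layout depends on which of the two implementations below you choose and on the serialization formats you configure.)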

 

Diagram 1: Real-Time Clickstream Aggregation Flow.

Option 1: Building with Kafka Streams (Java/Scala API)

Kafka Streams is ideal when you need the full power and flexibility of a programming language (Java or Scala), tight integration with other Java libraries, or complex custom logic that goes beyond SQL capabilities.

Prerequisites for Kafka Streams:

  • Java Development Kit (JDK) 8 or higher.
  • A build tool like Maven or Gradle.
  • Access to a running Kafka cluster.
  • Kafka client libraries (including Kafka Streams) added as dependencies.

Step-by-Step Kafka Streams Application (Conceptual Java Example):

1. Setup Project & Dependencies (e.g., Maven pom.xml):

    
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.6.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>
    </dependencies>
    

2. Define Data Structures (POJOs) and SerDes (Serializers/Deserializers):

For our example, let's assume you have POJOs PageView.java and UserViewCount.java. You'll need custom SerDes if not using a library that provides them for JSON or Avro.

Pro-Tip: For robust schema management and evolution, especially in production, using Avro with Confluent Schema Registry and the corresponding Avro SerDes (e.g., SpecificAvroSerde or GenericAvroSerde) is highly recommended over plain JSON.
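
If you stick with plain JSON, a minimal Jackson-based SerDe for the PageView POJO might look like the sketch below. The class name PageViewSerde and the POJO are illustrative, error handling is deliberately simplistic, and the example topology later in this post parses JSON inline instead of using this SerDe:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class PageViewSerde implements Serde<PageView> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public Serializer<PageView> serializer() {
        // Serialize the PageView POJO to JSON bytes.
        return (topic, pageView) -> {
            try {
                return pageView == null ? null : MAPPER.writeValueAsBytes(pageView);
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize PageView", e);
            }
        };
    }

    @Override
    public Deserializer<PageView> deserializer() {
        // Deserialize JSON bytes back into a PageView POJO.
        return (topic, bytes) -> {
            try {
                return bytes == null ? null : MAPPER.readValue(bytes, PageView.class);
            } catch (Exception e) {
                throw new RuntimeException("Failed to deserialize PageView", e);
            }
        };
    }
}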

3. Write the Stream Processing Logic:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KeyValue; // For map result
import org.apache.kafka.streams.kstream.*; // For KStream, KTable, TimeWindows etc.
// import your.package.PageView; // Assuming PageView POJO
// import your.package.UserViewCount; // Assuming UserViewCount POJO
// import your.package.serdes.PageViewSerde; // Assuming custom PageView SerDe

import java.time.Duration;
import java.util.Properties;
// Jackson is used below to parse the JSON values (jackson-databind is declared in the pom above)
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PageViewAggregatorKafkaStreams {

    // Placeholder for POJOs for clarity in this example
    // In a real app, these would be proper classes.
    static class PageView {
        @JsonProperty("user_id")   public String userId;
        @JsonProperty("page_url")  public String pageUrl;
        @JsonProperty("timestamp") public long timestamp;
        public String getUserId() { return userId; }
        // Add a constructor and further getters/setters as needed.
    }
    
    // static class UserViewCount { public String userId; public long count; /* constructors, getters, setters */ }


    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "page-view-aggregator-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Replace with your brokers
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // The page_views values are JSON strings, so the String SerDe is used here and the
        // JSON is parsed with Jackson inside the topology below. Alternatively, implement a
        // dedicated PageViewSerde (JSON or Avro) and register it as the default value SerDe:
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PageViewSerde.class.getName());


        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> pageViewTextStream = builder.stream(
            "page_views", // Input topic
            Consumed.with(Serdes.String(), Serdes.String()) // Assuming key is String, value is JSON String
        );

        // Parse each JSON value into a PageView object with Jackson. The ObjectMapper is
        // created once and reused; malformed records become null and are filtered out below.
        final ObjectMapper objectMapper = new ObjectMapper();
        KStream<String, PageView> pageViews = pageViewTextStream.mapValues(valueJsonString -> {
            try {
                return objectMapper.readValue(valueJsonString, PageView.class);
            } catch (Exception e) {
                System.err.println("Failed to parse PageView: " + e.getMessage());
                return null; // dropped by the filter below
            }
        }).filter((key, value) -> value != null && value.getUserId() != null);


        KTable<Windowed<String>, Long> userViewCountsTable = pageViews
            .mapValues(PageView::getUserId) // keep only the user_id so plain String SerDes suffice for the repartition topic
            .groupBy((key, userId) -> userId, Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
            .count(Materialized.as("user-page-view-counts-store")); // State store name

        userViewCountsTable.toStream()
            // Map to a format suitable for output: key=user_id, value=count (as String)
            .map((windowedUserId, count) -> KeyValue.pair(windowedUserId.key(), count.toString())) 
            .to("user_page_view_counts", Produced.with(Serdes.String(), Serdes.String())); // Output topic

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Add shutdown hook for graceful closure
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down Kafka Streams application...");
            streams.close(Duration.ofSeconds(10)); // Allow up to 10 seconds for cleanup
        }));

        try {
            streams.cleanUp(); // Clean up local state directory before starting (for dev/testing)
            streams.start();
            System.out.println("Kafka Streams application started.");
        } catch (Throwable e) {
            System.err.println("Error starting Kafka Streams application: " + e.getMessage());
            e.printStackTrace();
            System.exit(1);
        }
    }
}

Explanation:

  • We define StreamsConfig with application ID and bootstrap servers. You would replace placeholder versions and broker addresses.
  • We create a StreamsBuilder.
  • We consume from the page_views topic into a KStream of JSON Strings and parse each value into a PageView object with Jackson. In production, a dedicated SerDe (Serializer/Deserializer) for PageView (e.g., JSON or Avro) is the cleaner option.
  • We extract the user_id from each PageView and groupBy it.
  • We apply a tumbling windowedBy of 1 minute.
  • We count() the occurrences within each window for each user, materializing the result into a state store.
  • Finally, we convert the resulting KTable (which represents the aggregated counts) back to a KStream and send it to the user_page_view_counts topic. The count (a Long) is converted to a String for this example output SerDe.
  • A shutdown hook is added for graceful closure.

4. Build and Run:

Compile your Java/Scala code (e.g., using Maven: `mvn clean package`) and run the application JAR. It will connect to Kafka and start processing data from the `page_views` topic.
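
You can also exercise the topology locally, without a broker, using Kafka's kafka-streams-test-utils artifact and its TopologyTestDriver. The sketch below is illustrative only: it assumes the topology-building code from step 3 has been extracted into a buildTopology() helper (a hypothetical refactoring of the example above) and that kafka-streams-test-utils is on the test classpath.

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.Properties;

public class PageViewAggregatorTest {

    public static void main(String[] args) {
        // buildTopology() is assumed to return the Topology assembled in step 3 (hypothetical helper).
        Topology topology = PageViewAggregatorKafkaStreams.buildTopology();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "page-view-aggregator-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver

        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                "page_views", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                "user_page_view_counts", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("user_A", "{\"user_id\": \"user_A\", \"page_url\": \"/home\", \"timestamp\": 1678886400000}");
            input.pipeInput("user_A", "{\"user_id\": \"user_A\", \"page_url\": \"/cart\", \"timestamp\": 1678886410000}");

            // The test driver commits after every record, so each windowed count update is emitted:
            // typically [KeyValue(user_A, 1), KeyValue(user_A, 2)] when both records land in the same window.
            System.out.println(output.readKeyValuesToList());
        }
    }
}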

Quick Reference: Common Kafka Streams Operations

  • builder.stream("topic_name", Consumed.with(keySerde, valueSerde)): Consume from a topic.
  • kstream.mapValues(ValueMapper<V, VR> mapper): Transform the value of each record.
  • kstream.filter(Predicate<K, V> predicate): Keep records that match a predicate.
  • kstream.groupBy(KeyValueMapper<K, V, KR> selector, Grouped.with(keySerde, valueSerde)): Group records by a new key.
  • kgroupedStream.windowedBy(TimeWindows.of(Duration)): Define time windows for aggregation.
  • kwindowedStream.count() or .count(Materialized.as("store-name")): Count records in each group/window.
  • kwindowedStream.aggregate(Initializer, Aggregator, Materialized): Perform custom aggregation.
  • kstream.join(otherKStream, ValueJoiner, JoinWindows): Join with another KStream (see the sketch after this list).
  • kstream.to("output_topic_name", Produced.with(keySerde, valueSerde)): Write results to an output topic.
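
To illustrate the join and windowing entries above in context, here is a small self-contained sketch that joins page views with a hypothetical purchases topic. The topic names, the assumption that both streams are keyed by user_id, and the output topic are all illustrative:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;

public class ViewPurchaseJoinSketch {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Both topics are read as raw JSON strings keyed by user_id (an assumption for this sketch).
        KStream<String, String> views = builder.stream(
            "page_views", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> purchases = builder.stream(
            "purchases", Consumed.with(Serdes.String(), Serdes.String())); // hypothetical topic

        views
            .filter((userId, viewJson) -> userId != null) // ignore records without a key
            .join(purchases,
                  (viewJson, purchaseJson) -> viewJson + " | " + purchaseJson,    // ValueJoiner: combine both values
                  JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)), // views and purchases within 5 minutes
                  StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()))
            .to("views_joined_with_purchases", Produced.with(Serdes.String(), Serdes.String())); // hypothetical output topic

        return builder.build();
    }
}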

Option 2: Building with ksqlDB (SQL-like Interface)

ksqlDB is excellent for users familiar with SQL, for rapid prototyping, and for use cases that can be expressed well with SQL semantics. It runs as a separate server that interacts with your Kafka cluster.

Prerequisites for ksqlDB:

  • A running Kafka cluster.
  • A running ksqlDB server (or Confluent Cloud ksqlDB).
  • ksqlDB CLI (for interacting with the ksqlDB server).

Step-by-Step ksqlDB Application:

1. Start ksqlDB Server and CLI:

Follow the official ksqlDB documentation for setup. Once running, connect to your ksqlDB server using the ksqlDB CLI.

2. Create a STREAM from the Kafka Topic:

In the ksqlDB CLI:

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM page_views_stream (
    user_id VARCHAR KEY,    -- Assuming user_id is the Kafka message key
    page_url VARCHAR,
    event_timestamp BIGINT  -- Event-time field; the column name must match the JSON field name (see note below)
) WITH (
    KAFKA_TOPIC = 'page_views',
    VALUE_FORMAT = 'JSON',  -- Or AVRO if your topic uses Avro
    TIMESTAMP = 'event_timestamp' -- Use the timestamp field from the message for windowing
);

ksqlDB maps columns to fields in the message value by name, so the `event_timestamp` column will only be populated if your JSON messages actually contain a field with that name. If the field is literally named `timestamp`, as in our sample records, you can keep that name by quoting the identifier with backticks (since `TIMESTAMP` is a ksqlDB keyword), or adjust the producer to emit `event_timestamp`. If you would rather window on the Kafka record's own timestamp, simply omit the TIMESTAMP property; ksqlDB then uses ROWTIME by default.

3. Define the Continuous Query for Aggregation:

CREATE TABLE user_page_view_counts_table
WITH (KAFKA_TOPIC='user_page_view_counts', VALUE_FORMAT='JSON', KEY_FORMAT='JSON') -- Or AVRO
AS SELECT
    user_id,
    COUNT(*) AS view_count
FROM
    page_views_stream
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY
    user_id
EMIT CHANGES;

Explanation:

  • CREATE STREAM defines a schema on top of your existing page_views Kafka topic, making its data queryable. The `event_timestamp` column assumes your messages carry a field with that name; see the note above if the field is literally named `timestamp`.
  • CREATE TABLE ... AS SELECT defines a continuous query that performs the aggregation.
    • It selects the user_id and counts occurrences (COUNT(*)).
    • WINDOW TUMBLING (SIZE 1 MINUTE) defines the 1-minute non-overlapping window.
    • GROUP BY user_id aggregates counts per user within each window.
    • EMIT CHANGES means that updates to the table (new counts or updated counts for a user within a window) will be streamed to its underlying Kafka topic.
    • WITH (KAFKA_TOPIC='user_page_view_counts', ...) explicitly names the output Kafka topic and its format.

Pro-Tip for ksqlDB: When creating streams or tables, pay close attention to VALUE_FORMAT (e.g., JSON, AVRO, PROTOBUF). If using Avro or Protobuf, ensure your ksqlDB server is configured with the Schema Registry URL (ksql.schema.registry.url in ksqlDB server properties).
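
For example, a single line in the ksqlDB server properties file is typically all that is needed (the host and port are placeholders; 8081 is the Schema Registry default):

ksql.schema.registry.url=http://localhost:8081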

4. (Optional) Query the results directly in ksqlDB or consume from the output topic:

You can query the ksqlDB table:

SELECT * FROM user_page_view_counts_table EMIT CHANGES;

Or, any Kafka consumer can now subscribe to the user_page_view_counts topic to receive the aggregated counts.
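
As a rough illustration, a minimal plain Java consumer for the output topic could look like this (the broker address and group id are placeholders, and keys and values are read as strings to match the examples above):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class UserViewCountConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // replace with your brokers
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-view-count-reader");  // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("user_page_view_counts"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Print each windowed count update as it arrives.
                    System.out.printf("user=%s count=%s%n", record.key(), record.value());
                }
            }
        }
    }
}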

Kafka Streams vs. ksqlDB: When to Choose Which?

 
Feature/Consideration | Kafka Streams | ksqlDB
Primary Interface | Java/Scala API | SQL-like language
Complexity of Logic | Handles highly complex, custom logic; full programming power. | Best for logic expressible in SQL; UDFs/UDAFs for extension.
Development Speed | Longer development cycle (compile, package, deploy). | Very rapid development and iteration via CLI/UI.
Operational Overhead | Runs as a standard Java/Scala application; you manage deployment. | Requires running ksqlDB server(s) in addition to Kafka.
State Management | Rich state store capabilities (RocksDB); fine-grained control. | Manages state internally; simpler for users but less control.
Target User | Software engineers, stream processing specialists. | Data analysts, engineers familiar with SQL, rapid prototypers.
Ecosystem Integration | Seamless with Java/Scala libraries and frameworks. | Connectors for external systems; primarily focused on Kafka data.
Data Transformation | Highly flexible via code; custom functions easily implemented. | Good for common transformations (filtering, mapping, basic UDFs); complex transformations can be verbose or require UDFs.
Windowing Capabilities | Rich and flexible windowing (tumbling, hopping, session, custom). | Supports tumbling, hopping, and session windows with SQL syntax.
Testing | Standard Java/Scala unit and integration testing frameworks (e.g., `TopologyTestDriver`). | Testing often involves setting up test Kafka topics and verifying output; less direct unit testing of SQL logic.

Choose Kafka Streams when:

  • You need complex, custom processing logic not easily expressed in SQL (e.g., iterative algorithms, machine learning model inference within the stream, highly custom stateful operations).
  • You require fine-grained, programmatic control over state management, including choice of state stores (like RocksDB) and custom state access patterns.
  • Your application needs to integrate deeply with other existing Java or Scala libraries and frameworks seamlessly within the same application.
  • You are building embedded streaming capabilities within a larger Java/Scala application or microservice rather than a standalone processing layer.
  • Performance is absolutely critical, and you need the ability to optimize at a very low level using the Processor API.
  • Your team primarily consists of Java/Scala developers who are comfortable with API-based stream processing.

Choose ksqlDB when:

  • Your team is proficient in SQL and wants to leverage those existing skills for building stream processing applications quickly.
  • You need rapid development, interactive querying, and fast iteration cycles for defining and testing streaming logic.
  • The processing logic (filtering, transformations, joins, windowed aggregations) fits well within standard or extended SQL semantics.
  • You prefer a declarative approach to stream processing, defining *what* you want to achieve rather than *how* to implement it step-by-step.
  • You want a simpler operational model for defining and managing stream processing jobs (once the ksqlDB server itself is set up and managed).
  • The primary goal is to transform, enrich, or aggregate data already within Kafka for consumption by other Kafka-aware applications or for sinking to other systems via Kafka Connect.
  • You need to provide data analysts or less code-centric roles the ability to work with streaming data.

It's also common to use both! For instance, ksqlDB might be used for initial data preparation or simpler aggregations, with Kafka Streams applications consuming those processed streams for more complex tasks.

Conclusion: Empowering Your Real-Time Initiatives

Both Kafka Streams and ksqlDB offer powerful capabilities for building real-time applications on top of Apache Kafka. Kafka Streams provides unparalleled flexibility and control for developers, while ksqlDB brings the accessibility and rapid development power of SQL to the world of stream processing.

By understanding their strengths and following practical examples like the one outlined, you can choose the right tool—or combination of tools—to transform your Kafka event streams into actionable insights and intelligent real-time applications.

Further Exploration & Official Resources

To dive deeper into Kafka Streams and ksqlDB, the official documentation is the best starting point:

  • Kafka Streams documentation: https://kafka.apache.org/documentation/streams/
  • ksqlDB documentation: https://docs.ksqldb.io/

Looking to build sophisticated real-time applications with Kafka Streams or ksqlDB?

ActiveWizards offers expert development and consulting services to help you design, implement, and optimize your stream processing solutions. We can guide you in choosing the right technology and architecting applications that meet your specific business needs for speed, scale, and reliability.
