Streaming SQL: Real-Time Analytics Approaches

Simor Consulting | 17 Aug, 2024 | 8 min read

Batch processing can’t deliver insights fast enough for many use cases. Streaming SQL extends SQL semantics to continuous queries over unbounded data streams, combining familiar SQL with real-time processing.

This article covers streaming SQL concepts, frameworks, and implementation patterns.

The Evolution from Batch to Stream Processing

Traditional Batch Processing

Traditionally, data analytics has followed a batch processing model:

  1. Data is collected over a period
  2. Processing is triggered at scheduled intervals
  3. Results are generated based on the entire dataset
  4. Insights are delivered with significant lag time

-- Traditional batch SQL query
SELECT
  product_id,
  COUNT(*) as purchase_count,
  SUM(amount) as total_revenue
FROM purchases
WHERE purchase_time >= '2024-08-01' AND purchase_time < '2024-08-02'
GROUP BY product_id
ORDER BY total_revenue DESC
LIMIT 10;

This approach works well for historical analysis but falls short when timely insights are required.

The Rise of Stream Processing

Stream processing fundamentally changes this paradigm:

  1. Data is processed as it arrives
  2. Computation is continuous rather than scheduled
  3. Results are incrementally updated
  4. Insights are available with minimal latency

The shift from batch to stream processing represents a profound change in how we think about data and computation:

Aspect               | Batch Processing          | Stream Processing
---------------------|---------------------------|----------------------------
Data Model           | Static, bounded datasets  | Dynamic, unbounded streams
Processing Trigger   | Scheduled or manual       | Continuous, event-driven
Result Completeness  | Complete, exact           | Progressive, approximate
Latency              | Minutes to hours          | Milliseconds to seconds
State Management     | Minimal, often stateless  | Complex, stateful
Failure Recovery     | Simple restart            | Sophisticated checkpointing

Fundamentals of Streaming SQL

Core Concepts

Streaming SQL extends traditional SQL semantics to handle unbounded data streams:

1. Time Windows

Since streams are unbounded, windows provide a mechanism to bound computation:

-- Tumbling window (non-overlapping fixed intervals)
SELECT
  product_id,
  TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
  COUNT(*) AS purchase_count,
  SUM(amount) AS revenue
FROM purchase_stream
GROUP BY
  product_id,
  TUMBLE(event_time, INTERVAL '5' MINUTE);

Common window types include:

  • Tumbling windows: Fixed-size, non-overlapping time intervals
  • Sliding windows: Fixed-size, overlapping time intervals
  • Session windows: Dynamic-size windows formed by periods of activity
  • Global windows: A single window containing all elements
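As an illustrative sketch (plain Python, not engine code), tumbling and sliding window assignment can be computed directly from an event timestamp; timestamps here are integer seconds, and both function names are hypothetical.

```python
# Window assignment sketch: which window(s) does an event at time ts fall into?

def tumbling_window(ts, size):
    """Tumbling: each event belongs to exactly one non-overlapping window."""
    start = (ts // size) * size
    return (start, start + size)

def sliding_windows(ts, size, slide):
    """Sliding: each event belongs to every overlapping window covering it."""
    windows = []
    # earliest window start (a multiple of slide) that still contains ts
    first = ((ts - size) // slide + 1) * slide
    start = max(0, first)
    while start <= ts:
        windows.append((start, start + size))
        start += slide
    return windows

# An event at t=307s in 5-minute tumbling windows:
assert tumbling_window(307, 300) == (300, 600)
# The same event in 5-minute windows sliding every minute lands in 5 windows:
assert sliding_windows(307, 300, 60) == [
    (60, 360), (120, 420), (180, 480), (240, 540), (300, 600)
]
```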

2. Event Time vs. Processing Time

Streaming systems must handle two important time concepts:

  • Event Time: When an event actually occurred (embedded in the data)
  • Processing Time: When the event is observed by the system

-- Event time windowing with watermark to handle late data
SELECT
  sensor_id,
  HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_start,
  HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_end,
  AVG(temperature) AS avg_temp
FROM sensor_readings
WHERE event_time BETWEEN CURRENT_TIMESTAMP - INTERVAL '1' HOUR AND CURRENT_TIMESTAMP
GROUP BY
  sensor_id,
  HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE);

3. Stateful Processing

Streaming SQL often requires maintaining state across events:

-- Detecting temperature spikes using state
SELECT
  sensor_id,
  event_time,
  temperature,
  LAG(temperature, 1) OVER (PARTITION BY sensor_id ORDER BY event_time) AS prev_temp,
  CASE
    WHEN temperature - LAG(temperature, 1) OVER (PARTITION BY sensor_id ORDER BY event_time) > 5
    THEN 'SPIKE'
    ELSE 'NORMAL'
  END AS status
FROM sensor_readings;

4. Handling Late Data

Real-world streams may contain late-arriving data. Watermarks define how long to wait for late events:

-- Setting a watermark to handle late data
CREATE TABLE sensor_readings (
  sensor_id STRING,
  event_time TIMESTAMP(3),
  temperature DOUBLE,
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
);
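The watermark semantics behind this DDL can be sketched in plain Python (an illustrative model, not Flink's implementation): the watermark trails the maximum observed event time by the allowed delay, and any event older than the current watermark counts as late.

```python
# Watermark sketch: max observed event time minus a fixed allowed delay.

class WatermarkTracker:
    def __init__(self, max_delay):
        self.max_delay = max_delay
        self.max_event_time = float("-inf")

    def observe(self, event_time):
        """Returns True if the event arrives behind the watermark (late)."""
        late = event_time < self.watermark()
        self.max_event_time = max(self.max_event_time, event_time)
        return late

    def watermark(self):
        return self.max_event_time - self.max_delay

wm = WatermarkTracker(max_delay=10)  # 10s allowed lateness, as in the DDL
assert wm.observe(100) is False   # first event, nothing is late yet
assert wm.observe(95) is False    # out of order, but within allowed lateness
assert wm.observe(120) is False   # advances the watermark to 110
assert wm.observe(105) is True    # older than watermark 110 -> late
```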

Leading Streaming SQL Frameworks

Several frameworks and platforms have emerged to support SQL on streams:

Apache Flink

Flink offers a robust SQL interface for stream processing:

-- Flink SQL example
CREATE TABLE order_stream (
  order_id BIGINT,
  user_id BIGINT,
  product_id BIGINT,
  order_time TIMESTAMP(3),
  amount DECIMAL(10, 2),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'order-analytics',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);

-- Continuous query
INSERT INTO order_summary
SELECT
  product_id,
  TUMBLE_END(order_time, INTERVAL '1' MINUTE) AS window_end,
  COUNT(DISTINCT user_id) AS unique_users,
  COUNT(*) AS order_count,
  SUM(amount) AS total_amount
FROM order_stream
GROUP BY
  product_id,
  TUMBLE(order_time, INTERVAL '1' MINUTE);

Key Flink SQL features:

  • Rich windowing functions
  • Sophisticated event time handling
  • Strong consistency guarantees
  • Integration with Flink’s DataStream API

Apache Kafka Streams and ksqlDB

Kafka’s ecosystem provides SQL-like capabilities for stream processing:

-- ksqlDB example
CREATE STREAM orders (
  order_id BIGINT,
  user_id BIGINT,
  product_id BIGINT,
  order_time TIMESTAMP,
  amount DOUBLE
) WITH (
  KAFKA_TOPIC='orders',
  VALUE_FORMAT='JSON',
  TIMESTAMP='order_time'
);

-- Continuous aggregation
CREATE TABLE product_stats AS
SELECT
  product_id,
  COUNT(*) AS order_count,
  SUM(amount) AS total_revenue,
  AVG(amount) AS avg_order_value
FROM orders
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY product_id;

Key ksqlDB features:

  • Tight integration with Kafka
  • Simpler deployment and operations
  • Stream-table duality
  • Limited window types compared to Flink

Apache Spark Structured Streaming

Spark offers SQL capabilities over streaming data:

// Spark Structured Streaming with SQL syntax
val orderStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "orders")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", orderSchema).as("order"))
  .select("order.*")

// Create a temporary view for SQL queries
orderStream.createOrReplaceTempView("orders")

// SQL query on the stream
val results = spark.sql("""
  SELECT
    product_id,
    window(order_time, "5 minutes"),
    COUNT(*) AS order_count,
    SUM(amount) AS total_revenue
  FROM orders
  GROUP BY product_id, window(order_time, "5 minutes")
""")

// Output the results
results.writeStream
  .outputMode("update")
  .format("console")
  .start()

Key Spark features:

  • Integration with Spark’s batch processing
  • Dataset/DataFrame API alongside SQL
  • Support for multiple output modes (append, update, complete)
  • Unified batch and streaming APIs

Google Cloud Dataflow SQL (Apache Beam)

Dataflow SQL enables SQL queries on streaming data in Google Cloud:

-- Google Cloud Dataflow SQL example
CREATE TABLE orders (
  order_id INT64,
  user_id INT64,
  product_id INT64,
  order_time TIMESTAMP,
  amount FLOAT64
)
FROM pubsub
OPTIONS (
  topic = 'projects/my-project/topics/orders',
  format = 'json'
);

-- Streaming query
SELECT
  product_id,
  TUMBLE_END(order_time, INTERVAL 5 MINUTE) AS window_end,
  COUNT(*) AS order_count,
  SUM(amount) AS total_amount
FROM orders
GROUP BY
  product_id,
  TUMBLE(order_time, INTERVAL 5 MINUTE);

Key Dataflow SQL features:

  • Fully managed service
  • Integration with Google Cloud ecosystem
  • Support for both batch and streaming use cases
  • Based on Apache Beam’s unified model

Advanced Streaming SQL Patterns

Pattern Detection (Complex Event Processing)

Detecting patterns in event sequences:

-- Pattern matching using MATCH_RECOGNIZE (Flink SQL)
SELECT *
FROM order_stream
MATCH_RECOGNIZE (
  PARTITION BY user_id
  ORDER BY order_time
  MEASURES
    A.order_time AS start_time,
    LAST(B.order_time) AS end_time,
    COUNT(B.order_id) AS order_count,
    SUM(B.amount) AS total_spent
  PATTERN (A B+)
  DEFINE
    A AS A.amount > 500,
    B AS B.amount > 100 AND B.order_time BETWEEN A.order_time AND A.order_time + INTERVAL '1' HOUR
) AS large_order_sequences;
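The PATTERN (A B+) match above can be approximated in plain Python for a single partition (an illustrative sketch; `find_large_order_sequences` and its tuple layout are hypothetical, with times in seconds): an anchor order over 500 followed by one or more orders over 100 within an hour.

```python
# Sketch of the MATCH_RECOGNIZE pattern for one user's ordered events.

def find_large_order_sequences(orders):
    """orders: list of (order_time, amount) sorted by time for one user."""
    matches = []
    for i, (t0, a0) in enumerate(orders):
        if a0 > 500:  # pattern variable A
            followers = []
            j = i + 1
            while j < len(orders) and orders[j][0] <= t0 + 3600:
                if orders[j][1] > 100:  # pattern variable B
                    followers.append(orders[j])
                j += 1
            if followers:  # B+ requires at least one follow-up order
                matches.append({
                    "start_time": t0,
                    "end_time": followers[-1][0],
                    "order_count": len(followers),
                    "total_spent": sum(a for _, a in followers),
                })
    return matches

orders = [(0, 600), (100, 150), (200, 50), (300, 200), (5000, 700)]
m = find_large_order_sequences(orders)
assert m[0] == {"start_time": 0, "end_time": 300,
                "order_count": 2, "total_spent": 350}
```

The second large order at t=5000 has no qualifying followers, so it produces no match, mirroring how B+ fails when nothing satisfies the B condition.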

Anomaly Detection

Real-time identification of unusual patterns:

-- Z-score based anomaly detection
SELECT
  sensor_id,
  event_time,
  temperature,
  (temperature - avg_temp) / NULLIF(stddev_temp, 0) AS z_score,
  CASE WHEN ABS((temperature - avg_temp) / NULLIF(stddev_temp, 0)) > 3 THEN 'ANOMALY' ELSE 'NORMAL' END AS status
FROM (
  SELECT
    sensor_id,
    event_time,
    temperature,
    AVG(temperature) OVER (
      PARTITION BY sensor_id
      ORDER BY event_time
      ROWS BETWEEN 100 PRECEDING AND 1 PRECEDING
    ) AS avg_temp,
    STDDEV(temperature) OVER (
      PARTITION BY sensor_id
      ORDER BY event_time
      ROWS BETWEEN 100 PRECEDING AND 1 PRECEDING
    ) AS stddev_temp
  FROM sensor_readings
) AS readings_with_stats
WHERE ABS((temperature - avg_temp) / NULLIF(stddev_temp, 0)) > 3;
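The same rolling z-score logic can be sketched in plain Python (illustrative; the window of the last 100 readings mirrors the ROWS BETWEEN 100 PRECEDING AND 1 PRECEDING clause):

```python
# Rolling z-score anomaly detection over the trailing window of readings.
from collections import deque
from statistics import mean, stdev

def detect_anomalies(readings, window=100, threshold=3.0):
    """readings: list of (timestamp, temperature) in arrival order."""
    history = deque(maxlen=window)  # the preceding readings only
    anomalies = []
    for ts, temp in readings:
        if len(history) >= 2:  # stdev needs at least two samples
            mu, sigma = mean(history), stdev(history)
            if sigma > 0 and abs(temp - mu) / sigma > threshold:
                anomalies.append((ts, temp))
        history.append(temp)
    return anomalies

# Stable readings oscillating around 20.0, then a sudden spike.
readings = [(t, 20.0 + (0.1 if t % 2 else -0.1)) for t in range(50)]
readings.append((50, 35.0))
assert detect_anomalies(readings) == [(50, 35.0)]
```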

Streaming Joins

Joining multiple streams or streams with static tables:

-- Stream-to-stream join with time constraint
SELECT
  o.order_id,
  o.user_id,
  o.amount AS order_amount,
  p.payment_id,
  p.payment_method,
  p.amount AS paid_amount,
  p.payment_time - o.order_time AS processing_time
FROM orders o
JOIN payments p
  ON o.order_id = p.order_id
  AND p.payment_time BETWEEN o.order_time AND o.order_time + INTERVAL '15' MINUTE;
-- Stream-to-table join (enrichment)
SELECT
  o.order_id,
  o.product_id,
  p.product_name,
  p.category,
  p.unit_price,
  o.quantity,
  o.quantity * p.unit_price AS expected_total
FROM order_stream o
JOIN product_dimension p
  ON o.product_id = p.product_id;
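A minimal Python sketch of the stream-to-stream interval join (illustrative; a real engine would also expire buffered order state once the 15-minute interval passes, rather than keep it indefinitely):

```python
# Interval join sketch: match each payment to its order if it arrives
# within max_wait seconds of the order. Times are epoch seconds.

def interval_join(orders, payments, max_wait=900):
    """orders: {order_id: order_time}; payments: list of (order_id, pay_time)."""
    joined = []
    for order_id, pay_time in payments:
        order_time = orders.get(order_id)
        if order_time is not None and order_time <= pay_time <= order_time + max_wait:
            joined.append((order_id, pay_time - order_time))  # processing time
    return joined

orders = {1: 0, 2: 100}
payments = [(1, 300), (2, 2000), (3, 50)]
# order 1 is paid in time; order 2's payment is too late; order 3 is unknown
assert interval_join(orders, payments) == [(1, 300)]
```

The time bound is what makes the join feasible on unbounded streams: without it, every order would have to be buffered forever in case a matching payment eventually arrived.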

Sessionization

Grouping user activities into sessions:

-- Session window analytics
SELECT
  user_id,
  SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
  SESSION_END(event_time, INTERVAL '30' MINUTE) AS session_end,
  COUNT(*) AS event_count,
  COUNT(DISTINCT page_id) AS pages_visited,
  MAX(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS converted
FROM user_activity_stream
GROUP BY
  user_id,
  SESSION(event_time, INTERVAL '30' MINUTE);
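Session-window semantics reduce to a gap rule, sketched here in plain Python (illustrative, single user, timestamps in seconds): consecutive events stay in the same session while the gap between them is within the inactivity timeout.

```python
# Sessionization sketch: 30-minute (1800s) inactivity gap closes a session.

def sessionize(event_times, gap=1800):
    """event_times: sorted timestamps for a single user."""
    sessions = []
    for ts in event_times:
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)   # extends the current session
        else:
            sessions.append([ts])     # inactivity gap -> new session
    return [(s[0], s[-1], len(s)) for s in sessions]  # (start, end, count)

events = [0, 600, 1200, 5000, 5300]
# 1200 -> 5000 is a 3800s gap, exceeding 1800s, so the sessions split
assert sessionize(events) == [(0, 1200, 3), (5000, 5300, 2)]
```

This is why session windows are "dynamic-size": the window boundary is determined by the data itself, not by a fixed schedule.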

Implementing Streaming SQL: Architectural Patterns

Lambda Architecture

Combines batch and stream processing:

┌─────────────────┐
│                 │
│   Data Source   │
│                 │
└───────┬─────────┘
        │
        ▼
┌───────┴─────────┐
│                 │
│ Message Broker  │
│  (e.g., Kafka)  │
│                 │
└───────┬─────────┘
        │
        ├────────────────────┐
        │                    │
        ▼                    ▼
┌───────────────┐    ┌───────────────┐
│               │    │               │
│ Speed Layer   │    │ Batch Layer   │
│ (Streaming)   │    │               │
│               │    │               │
└───────┬───────┘    └───────┬───────┘
        │                    │
        ▼                    ▼
┌───────────────┐    ┌───────────────┐
│               │    │               │
│ Real-time     │    │ Batch         │
│ Views         │    │ Views         │
│               │    │               │
└───────┬───────┘    └───────┬───────┘
        │                    │
        ▼                    ▼
┌─────────────────────────────────────┐
│                                     │
│         Serving Layer               │
│     (Merges Both Views)             │
│                                     │
└─────────────────────────────────────┘

Implementation considerations:

  • Stream processing provides real-time but potentially approximate results
  • Batch processing provides exact but delayed results
  • Results are reconciled at query time

Kappa Architecture

Uses only the streaming layer:

┌─────────────────┐
│                 │
│   Data Source   │
│                 │
└───────┬─────────┘
        │
        ▼
┌───────┴─────────┐
│                 │
│ Message Broker  │
│  (e.g., Kafka)  │
│                 │
└───────┬─────────┘
        │
        ▼
┌───────────────────┐
│                   │
│  Stream           │
│  Processing       │
│  Engine           │
│                   │
└───────┬───────────┘
        │
        ▼
┌───────────────────┐
│                   │
│  Serving          │
│  Database         │
│                   │
└───────────────────┘

Implementation considerations:

  • Simpler architecture with a single processing path
  • Requires a streaming system capable of handling reprocessing
  • Uses the message broker (e.g., Kafka) as the system of record

Event-Sourcing with CQRS

Separates write and read operations:

┌─────────────────┐
│                 │
│  Client Apps    │
│                 │
└───────┬─────────┘
        │
        ▼
┌───────┴─────────┐     Commands     ┌─────────────────┐
│                 │                  │                 │
│  API Gateway    │ ───────────────► │ Command         │
│                 │                  │ Handlers        │
└───────┬─────────┘                  └────────┬────────┘
        │                                     │
        │                                     │
        │                                     ▼
        │                            ┌─────────────────┐
        │                            │                 │
        │                            │ Event Store     │
        │                            │                 │
        │                            └────────┬────────┘
        │                                     │
        │                                     │
        │                                     ▼
        │                            ┌─────────────────┐
        │      Queries               │                 │
        └────────────────────────────┤ Event Processor │
                                     │ (Stream SQL)    │
                                     └────────┬────────┘
                                              │
                                              ▼
                                     ┌─────────────────┐
                                     │                 │
                                     │ Read Models     │
                                     │                 │
                                     └─────────────────┘

Implementation considerations:

  • Events are the source of truth
  • Streaming SQL transforms events into optimized read models
  • Read models can be specialized for different query patterns

Performance Optimization and Tuning

Partitioning Strategies

Proper partitioning is crucial for scalable stream processing:

-- Kafka topic creation with partitioning
CREATE TABLE orders (
  order_id BIGINT,
  user_id BIGINT,
  region STRING,
  product_id BIGINT,
  amount DECIMAL(10, 2),
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json',
  'key.format' = 'json',
  'key.fields' = 'region',  -- Partition by region
  'scan.startup.mode' = 'latest-offset'
);

Considerations for choosing partition keys:

  • Choose keys that evenly distribute data
  • Align with common query patterns (e.g., group by clauses)
  • Consider cardinality (too high causes excessive partitions)
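Key-based routing can be sketched in a few lines of Python (illustrative; `partition_for` is a hypothetical stand-in for a broker's partitioner, using a stable CRC32 rather than Python's per-process-randomized `hash()`):

```python
# Partitioner sketch: the same key always maps to the same partition,
# so per-key state and aggregations stay local to one processing task.
import zlib

def partition_for(key, num_partitions):
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Deterministic: 'us-east' events always land on the same partition,
# which is what keeps region-level aggregations correct under parallelism.
assert partition_for("us-east", 8) == partition_for("us-east", 8)
assert 0 <= partition_for("us-east", 8) < 8
```

A skewed key (one region producing most of the traffic) concentrates load on a single partition, which is why even distribution is the first consideration above.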

State Management

Optimizing state backends for streaming SQL:

// Flink state backend configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// For small state: in-memory with checkpoints to filesystem
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints/");

// For large state: RocksDB (out-of-core processing)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints/");

State optimization strategies:

  • Use appropriate state backend for workload size
  • Configure state TTL (time-to-live) for cleanup
  • Monitor state size growth
  • Consider local state vs. external state
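State TTL can be sketched in plain Python (an illustrative model, not a Flink API): each keyed entry records its last update time and is dropped once it has been idle longer than the TTL, bounding state growth for keys that stop arriving.

```python
# Keyed state with time-to-live cleanup.

class TtlState:
    def __init__(self, ttl):
        self.ttl = ttl
        self.values = {}       # key -> value
        self.last_update = {}  # key -> last update timestamp

    def update(self, key, value, now):
        self.expire(now)
        self.values[key] = value
        self.last_update[key] = now

    def expire(self, now):
        expired = [k for k, t in self.last_update.items() if now - t > self.ttl]
        for k in expired:
            del self.values[k]
            del self.last_update[k]

state = TtlState(ttl=60)
state.update("sensor-1", 21.5, now=0)
state.update("sensor-2", 19.0, now=50)
state.update("sensor-2", 19.2, now=120)  # sensor-1 idle for 120s -> dropped
assert "sensor-1" not in state.values
assert state.values["sensor-2"] == 19.2
```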

Query Optimization

Streaming SQL engines apply various optimizations:

-- Query with potential optimizations
EXPLAIN PLAN FOR
SELECT
  product_category,
  COUNT(*) AS order_count,
  SUM(amount) AS revenue
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.order_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY product_category;

Common optimization techniques:

  • Filter pushdown
  • Join reordering
  • Predicate simplification
  • Window merging

Monitoring and Observability

Metrics to monitor for streaming SQL:

  • Processing latency: Time from event creation to processing
  • Watermark lag: How far behind the watermark is from wall-clock time
  • Checkpoint duration: Time taken to complete checkpoints
  • Backpressure: Indication that processing can’t keep up with input rate

Implementing custom metrics:

// Flink custom metrics example: a RichMapFunction has no access to the
// current watermark, so a ProcessFunction is used, which exposes it
// through its context
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Order> orders = ...
orders.process(new ProcessFunction<Order, Order>() {
    private transient Counter lateEventCounter;

    @Override
    public void open(Configuration config) {
        lateEventCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("late-events");
    }

    @Override
    public void processElement(Order order, Context ctx, Collector<Order> out) {
        // Order.getEventTime() is assumed to return epoch milliseconds
        if (order.getEventTime() < ctx.timerService().currentWatermark()) {
            lateEventCounter.inc();
        }
        out.collect(order);
    }
});

Case Studies: Streaming SQL in Action

Real-Time Fraud Detection

A financial services company implemented streaming SQL for fraud detection:

-- Fraud detection using pattern matching
SELECT *
FROM transaction_stream
MATCH_RECOGNIZE (
  PARTITION BY account_id
  ORDER BY transaction_time
  MEASURES
    LAST(large.transaction_time) AS detection_time,
    COUNT(small.transaction_id) AS small_transaction_count,
    SUM(small.amount) + LAST(large.amount) AS total_amount
  PATTERN (small+ large)
  DEFINE
    small AS amount < 50,
    large AS amount > 1000 AND
             transaction_time < LAST(small.transaction_time) + INTERVAL '1' HOUR
) AS fraudulent_pattern;

Results:

  • 92% reduction in false positive alerts
  • Fraudulent transactions identified 2.5x faster
  • $4.2M in prevented fraud in first quarter

IoT Monitoring and Alerting

A manufacturing company deployed streaming SQL for equipment monitoring:

-- Equipment monitoring with dynamic thresholds
SELECT
  equipment_id,
  sensor_type,
  reading_time,
  reading_value,
  avg_value,
  stddev_value,
  (reading_value - avg_value) / NULLIF(stddev_value, 0) AS z_score
FROM (
  SELECT
    equipment_id,
    sensor_type,
    reading_time,
    reading_value,
    AVG(reading_value) OVER (
      PARTITION BY equipment_id, sensor_type
      ORDER BY reading_time
      ROWS BETWEEN 720 PRECEDING AND 1 PRECEDING -- 1 hour of readings at 5-second intervals
    ) AS avg_value,
    STDDEV(reading_value) OVER (
      PARTITION BY equipment_id, sensor_type
      ORDER BY reading_time
      ROWS BETWEEN 720 PRECEDING AND 1 PRECEDING
    ) AS stddev_value
  FROM sensor_readings
) AS readings_with_stats
WHERE ABS((reading_value - avg_value) / NULLIF(stddev_value, 0)) > 3.5;

Results:

  • Equipment failures predicted with 86% accuracy
  • Maintenance costs reduced by 27%
  • Unplanned downtime decreased by 31%

E-commerce Personalization

An e-commerce platform implemented real-time personalization:

-- Real-time user behavioral analysis
SELECT
  user_id,
  TUMBLE_END(event_time, INTERVAL '15' MINUTE) AS window_end,
  COLLECT_LIST(DISTINCT category_id) AS browsed_categories,
  COLLECT_LIST(DISTINCT product_id) AS viewed_products,
  SUM(CASE WHEN event_type = 'view_product' THEN 1 ELSE 0 END) AS product_views,
  SUM(CASE WHEN event_type = 'add_to_cart' THEN 1 ELSE 0 END) AS cart_adds,
  MAX(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS purchased
FROM user_activity_stream
GROUP BY
  user_id,
  TUMBLE(event_time, INTERVAL '15' MINUTE);

Results:

  • 34% increase in click-through rate
  • 22% improvement in cart conversion
  • 18% higher average order value

Unified Batch and Streaming

The distinction between batch and streaming is blurring:

-- Unified query that works on both batch and stream
SELECT
  product_id,
  window_start,
  window_end,
  COUNT(*) AS order_count,
  SUM(amount) AS total_amount
FROM TABLE(
  TUMBLE(
    TABLE orders,
    DESCRIPTOR(order_time),
    INTERVAL '1' HOUR
  )
)
GROUP BY product_id, window_start, window_end;

Machine Learning Integration

Streaming SQL is being integrated with ML:

-- Real-time feature generation for ML models
CREATE VIEW order_features AS
SELECT
  user_id,
  COUNT(*) OVER (
    PARTITION BY user_id
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '90' DAY PRECEDING AND CURRENT ROW
  ) AS orders_90d,
  SUM(amount) OVER (
    PARTITION BY user_id
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '90' DAY PRECEDING AND CURRENT ROW
  ) AS spend_90d,
  COUNT(DISTINCT product_category) OVER (
    PARTITION BY user_id
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '90' DAY PRECEDING AND CURRENT ROW
  ) AS category_count_90d
FROM orders;

-- Using ML functions inline
SELECT
  user_id,
  TUMBLE_END(event_time, INTERVAL '1' DAY) AS day,
  ML_PREDICT(model_uri, ARRAY[
    orders_90d,
    spend_90d,
    category_count_90d,
    return_rate
  ]) AS churn_probability
FROM order_features;

Serverless Stream Processing

Cloud providers are offering serverless streaming SQL:

-- Example of serverless streaming SQL (conceptual)
CREATE OR REPLACE STREAMING QUERY churn_prediction
USING SOURCE orders_topic (FORMAT 'JSON')
WITH RETENTION '7 DAYS'
EXECUTE EVERY '5 MINUTES'
AS
SELECT
  user_id,
  AVG(amount) AS avg_order_value,
  COUNT(*) AS order_count,
  ML_PREDICT('gs://models/churn_model', STRUCT(
    AVG(amount),
    COUNT(*),
    MAX(order_time) - MIN(order_time)
  )) AS churn_probability
FROM orders
WHERE order_time >= CURRENT_TIMESTAMP() - INTERVAL '90' DAY
GROUP BY user_id
HAVING churn_probability > 0.7;

Decision Rules

Use this checklist for streaming SQL decisions:

  1. If you need sub-millisecond latency, streaming SQL engines add too much overhead; purpose-built event processing is a better fit
  2. If event time ordering matters, implement watermarks and handle late data explicitly
  3. If state grows unbounded, plan for state TTL and periodic compaction
  4. If you need exactly-once semantics, expect higher infrastructure costs
  5. If batch and streaming need to match, test output equivalence on representative data

Streaming SQL adds operational complexity. Only use it when real-time is actually required.

