Aggregating high-volume events upstream (sessioning)

This page explains how to aggregate high-volume events upstream, before they are sent on for event-level analytics, and gives implementation examples for each supported managed Kafka platform.

Types of Aggregation

Session-based aggregates

Sessions are capped by time (e.g. ≤10 minutes) or by count (e.g. ≤N spins). Each aggregate should include totals for stakes, wins, net, counts, game code, channel, and start/end timestamps.
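As a rough illustration of the shape described above, the following Python sketch folds time-ordered spins for one user into capped session aggregates. The field names (`ts_ms`, `stake`, `win`), the default cap of 500 spins, and the definition of net as wins minus stakes are assumptions made for this example, not part of any platform API.

```python
from dataclasses import dataclass


@dataclass
class SessionAggregate:
    user_id: str
    game_code: str
    channel: str
    start_ms: int
    end_ms: int
    spin_count: int = 0
    total_stake: int = 0   # minor currency units (assumption)
    total_win: int = 0     # minor currency units (assumption)

    @property
    def net(self) -> int:
        # Assumed definition: winnings minus stakes.
        return self.total_win - self.total_stake


def aggregate_spins(spins, max_session_ms=10 * 60 * 1000, max_spins=500):
    """Fold time-ordered spins for one user into capped session aggregates."""
    sessions = []
    current = None
    for spin in spins:
        start_new = (
            current is None
            or spin["game_code"] != current.game_code
            or spin["channel"] != current.channel
            or spin["ts_ms"] - current.start_ms > max_session_ms  # time cap
            or current.spin_count >= max_spins                    # count cap
        )
        if start_new:
            current = SessionAggregate(
                user_id=spin["user_id"],
                game_code=spin["game_code"],
                channel=spin["channel"],
                start_ms=spin["ts_ms"],
                end_ms=spin["ts_ms"],
            )
            sessions.append(current)
        current.spin_count += 1
        current.total_stake += spin["stake"]
        current.total_win += spin["win"]
        current.end_ms = spin["ts_ms"]
    return sessions
```

Each returned aggregate corresponds to one record that would be sent downstream in place of the individual spins.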

Rolling window aggregates

Typically 2–5 minute summaries grouped by user or product.
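A minimal sketch of the idea, assuming fixed (tumbling) 2-minute windows and events that carry a `user_id` and an epoch-millisecond `ts_ms` field (both names are hypothetical): each event is assigned to the window containing its timestamp, and one count is kept per user and window.

```python
from collections import Counter

WINDOW_MS = 2 * 60 * 1000  # 2-minute windows


def window_start(ts_ms, size_ms=WINDOW_MS):
    """Return the start of the fixed window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def rolling_counts(events, size_ms=WINDOW_MS):
    """Count events per (user_id, window_start_ms) pair."""
    counts = Counter()
    for event in events:
        key = (event["user_id"], window_start(event["ts_ms"], size_ms))
        counts[key] += 1
    return dict(counts)
```

Grouping by product instead of user would only change the first element of the key.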


📘

Why EMIT FINAL / DISCARDING accumulation matters

Session window aggregations can fire multiple times as new events arrive within an open session. With the default incremental-update behaviour, downstream systems receive several partial records for the same session, which inflates event counts. EMIT FINAL (ksqlDB), closed-window-only emission (Kafka Streams), and a watermark trigger with AccumulationMode.DISCARDING (Beam) are each intended to emit exactly one record per completed session, which is the correct input for Xtremepush event-level analytics.
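The effect can be illustrated with a small simulation. This is not platform code: `emit_incremental` and `emit_final` are hypothetical helpers that model the two emission styles for a single session.

```python
def emit_incremental(session_events):
    """Model incremental updates: one record per arriving event,
    each carrying the running count for the still-open session."""
    return [
        {"event": "session", "event_count": i}
        for i in range(1, len(session_events) + 1)
    ]


def emit_final(session_events):
    """Model final-only emission: a single record once the session closes."""
    return [{"event": "session", "event_count": len(session_events)}]


def downstream_totals(records):
    """What a naive consumer would report: sessions seen and events summed."""
    return len(records), sum(r["event_count"] for r in records)
```

For a session of three events, the incremental stream makes a naive consumer see three session records and six events in total, while the final-only stream yields one record and three events.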


Confluent Cloud (ksqlDB)

Session aggregation

CREATE TABLE session_summaries
  WITH (KAFKA_TOPIC = 'xp-session-events', VALUE_FORMAT = 'JSON')
AS SELECT
  user_id                                            AS user_key,  -- key column; aliased so it does not clash with the value column below
  'session'                                          AS event,
  AS_VALUE(user_id)                                  AS user_id,
  TIMESTAMPTOSTRING(MIN(ROWTIME), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS `timestamp`,
  STRUCT(
    session_start_ms := MIN(ROWTIME),
    session_end_ms   := MAX(ROWTIME),
    event_count      := COUNT(*),
    duration_ms      := MAX(ROWTIME) - MIN(ROWTIME)
  )                                                  AS value,
  STRUCT()                                           AS user_attributes
FROM xtremepush_events
WINDOW SESSION (10 MINUTES)
GROUP BY user_id
EMIT FINAL;

EMIT FINAL is critical. Without it, ksqlDB emits incremental updates while the session is still open. With EMIT FINAL, one record is emitted per closed session, after the inactivity gap (plus any grace period) has passed, which keeps downstream event counts meaningful.

Rolling window (2-minute summary)

CREATE TABLE rolling_2min_summary
  WITH (KAFKA_TOPIC = 'xp-rolling-2min', VALUE_FORMAT = 'JSON')
AS SELECT
  user_id                                            AS user_key,  -- key column; aliased so it does not clash with the value column below
  'session_summary'                                  AS event,
  AS_VALUE(user_id)                                  AS user_id,
  TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS `timestamp`,
  STRUCT(
    window_start_ms := WINDOWSTART,
    window_end_ms   := WINDOWEND,
    event_count     := COUNT(*)
  )                                                  AS value,
  STRUCT()                                           AS user_attributes
FROM xtremepush_events
WINDOW TUMBLING (SIZE 2 MINUTES)
GROUP BY user_id
EMIT FINAL;

Amazon MSK (Kafka Streams — Java)

Session aggregation

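import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Assumes rawStream is a KStream<String, String> keyed by user_id.
ObjectMapper mapper = new ObjectMapper();
DateTimeFormatter tsFormat =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

KTable<Windowed<String>, Long> sessions = rawStream
    .groupByKey()
    .windowedBy(
        SessionWindows.ofInactivityGapAndGrace(
            Duration.ofMinutes(10),   // inactivity gap: closes session after 10 min of silence
            Duration.ofMinutes(2)     // grace period: accept late-arriving events
        )
    )
    .aggregate(
        () -> 0L,                                      // initialiser
        (key, value, aggregate) -> aggregate + 1,      // incrementor
        (key, agg1, agg2) -> agg1 + agg2,              // merger (called when two sessions merge)
        Materialized.with(Serdes.String(), Serdes.Long())
    )
    // Closed-window-only emission: hold back intermediate updates and emit
    // one final result per session once gap + grace have elapsed.
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

sessions.toStream().map((windowed, count) -> {
    try {
        ObjectNode envelope = mapper.createObjectNode();
        envelope.put("event",   "session");
        envelope.put("user_id", windowed.key());

        // Format as "yyyy-MM-dd HH:mm:ss" in UTC. Instant.toString() omits the
        // fractional part when it is zero, so string replacement is unreliable.
        String ts = tsFormat.format(Instant.ofEpochMilli(windowed.window().end()));
        envelope.put("timestamp", ts);

        ObjectNode value = mapper.createObjectNode();
        value.put("session_start_ms", windowed.window().start());
        value.put("session_end_ms",   windowed.window().end());
        value.put("event_count",      count);
        value.put("duration_ms",      windowed.window().end() - windowed.window().start());
        envelope.set("value", value);
        envelope.set("user_attributes", mapper.createObjectNode());

        return KeyValue.pair(windowed.key(), mapper.writeValueAsString(envelope));
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}).to("xp-session-events", Produced.with(Serdes.String(), Serdes.String()));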

The merger function (key, agg1, agg2) -> agg1 + agg2 is required for session windows. Kafka Streams may merge two adjacent session windows when a late-arriving event bridges the gap between them — the merger combines the counts from both windows.
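The merge behaviour can be sketched outside Kafka Streams. The following Python function is an illustrative model only, not the Kafka Streams implementation: it keeps a list of `(start_ms, end_ms, count)` sessions for one key and merges every session that lies within the inactivity gap of a newly arrived event.

```python
def add_event(sessions, ts_ms, gap_ms):
    """Add one event to a key's sessions, merging any sessions it bridges.

    sessions: list of (start_ms, end_ms, count) tuples, pairwise further
    apart than gap_ms. Returns a new sorted list.
    """
    # The new event starts as its own single-event session
    # (initialiser followed by one incrementor call).
    start, end, count = ts_ms, ts_ms, 1
    untouched = []
    for s_start, s_end, s_count in sessions:
        if s_start - gap_ms <= ts_ms <= s_end + gap_ms:
            # Within the gap: fold this session in (the merger's job).
            start = min(start, s_start)
            end = max(end, s_end)
            count += s_count
        else:
            untouched.append((s_start, s_end, s_count))
    return sorted(untouched + [(start, end, count)])
```

With a 10-minute gap, events at 0 ms and 1,000,000 ms form two sessions. A late event at 500,000 ms is within the gap of both, so the three events collapse into a single session with a count of 3.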

Rolling window (2-minute summary)

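import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;

KTable<Windowed<String>, Long> rolling = rawStream
    .groupByKey()
    .windowedBy(
        // 2-minute tumbling windows; accept events up to 1 minute late
        TimeWindows.ofSizeAndGrace(Duration.ofMinutes(2), Duration.ofMinutes(1))
    )
    .count()
    // Emit a single final count per window once it has closed
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));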

Google Cloud Managed Apache Kafka (Apache Beam / Dataflow)

Session aggregation

import json
from typing import Tuple

import apache_beam as beam
from apache_beam.io.kafka import WriteToKafka
from apache_beam.transforms.trigger import AccumulationMode, AfterWatermark
from apache_beam.transforms.window import Sessions


class SessionOutputFn(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        user_id, events = element
        # window.start / window.end are Beam Timestamp objects, not floats.
        # For session windows, window.end is the last event time plus the gap.
        start_ms = window.start.micros // 1000
        end_ms = window.end.micros // 1000
        ts = window.end.to_utc_datetime().strftime('%Y-%m-%d %H:%M:%S')
        payload = json.dumps({
            'event':   'session',
            'user_id': user_id,
            'timestamp': ts,
            'value': {
                'session_start_ms': start_ms,
                'session_end_ms':   end_ms,
                'event_count':      len(list(events)),
                'duration_ms':      end_ms - start_ms,
            },
            'user_attributes': {}
        })
        # WriteToKafka expects (key, value) pairs of bytes by default
        yield str(user_id).encode('utf-8'), payload.encode('utf-8')


(
    events_pcoll
    | 'ExtractUserId'  >> beam.Map(lambda e: (json.loads(e)['user_id'], e))
    | 'SessionWindow'  >> beam.WindowInto(
                              Sessions(gap_size=600),        # 10-minute inactivity gap
                              trigger=AfterWatermark(),
                              accumulation_mode=AccumulationMode.DISCARDING)
    | 'GroupByKey'     >> beam.GroupByKey()
    | 'BuildSession'   >> beam.ParDo(SessionOutputFn()).with_output_types(Tuple[bytes, bytes])
    | 'WriteToKafka'   >> WriteToKafka(
                              producer_config={'bootstrap.servers': 'BROKER:9092'},
                              topic='xp-session-events')
)

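The trigger and the accumulation mode work together. AfterWatermark() with no early or late firings emits a pane when the watermark passes the end of the window, which for a session is the last event time plus the gap. With DISCARDING, a pane never repeats elements that an earlier pane already emitted, so if a window does fire more than once (for example after early firings or allowed lateness are added), downstream sees only the new elements. In ACCUMULATING mode, each later pane would re-emit the full session so far, and summing those records downstream would overcount events.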

Rolling window (2-minute summary)

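import json
from typing import Tuple

import apache_beam as beam
from apache_beam.io.kafka import WriteToKafka
from apache_beam.transforms.trigger import AccumulationMode, AfterWatermark
from apache_beam.transforms.window import FixedWindows


def build_summary(kv, window=beam.DoFn.WindowParam):
    user_id, count = kv
    payload = json.dumps({
        'event':   'session_summary',
        'user_id': user_id,
        # Use the window start (event time) rather than wall-clock time
        'timestamp': window.start.to_utc_datetime().strftime('%Y-%m-%d %H:%M:%S'),
        'value': {
            'window_start_ms': window.start.micros // 1000,
            'window_end_ms':   window.end.micros // 1000,
            'event_count':     count,
        },
        'user_attributes': {}
    })
    # WriteToKafka expects (key, value) pairs of bytes by default
    return str(user_id).encode('utf-8'), payload.encode('utf-8')


(
    events_pcoll
    | 'KeyByUser'     >> beam.Map(lambda e: (json.loads(e)['user_id'], 1))
    | 'FixedWindow'   >> beam.WindowInto(
                             FixedWindows(120),              # 2-minute fixed windows
                             trigger=AfterWatermark(),
                             accumulation_mode=AccumulationMode.DISCARDING)
    | 'CountPerUser'  >> beam.CombinePerKey(sum)
    | 'BuildSummary'  >> beam.Map(build_summary).with_output_types(Tuple[bytes, bytes])
    | 'WriteToKafka'  >> WriteToKafka(...)
)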