Aggregating high-volume events upstream (sessioning)
This page explains how to aggregate high-volume events upstream before sending them to Xtremepush, with implementation examples for each supported managed Kafka platform.
Types of Aggregation
Session-based aggregates
Capped by time (e.g. ≤10 minutes) or count (e.g. ≤N spins). Each aggregate should include totals for stakes, wins, net, counts, game code, channel, and start/end timestamps.
Rolling window aggregates
Typically 2–5 minute summaries grouped by user or product.
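As a sketch of what these two aggregate shapes might look like on the wire (the field names such as stake_total, win_total, and game_code are illustrative assumptions, not a required schema):

```python
# Illustrative session aggregate: one record per closed session.
# All monetary/field names here are assumptions for illustration only.
session_aggregate = {
    "event": "session",
    "user_id": "u-123",
    "timestamp": "2024-01-01 12:10:00",   # session end, UTC
    "value": {
        "session_start_ms": 1704110400000,
        "session_end_ms": 1704111000000,  # 10 minutes after start
        "event_count": 42,                # e.g. number of spins
        "stake_total": 21.00,
        "win_total": 18.50,
        "net": 2.50,                      # stakes minus wins
        "game_code": "slots-007",
        "channel": "mobile",
    },
    "user_attributes": {},
}

# Illustrative rolling 2-minute summary, grouped by user.
rolling_summary = {
    "event": "session_summary",
    "user_id": "u-123",
    "timestamp": "2024-01-01 12:00:00",   # window start, UTC
    "value": {"event_count": 7},
    "user_attributes": {},
}
```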
Why EMIT FINAL / DISCARDING accumulation matters
Session window aggregations can fire multiple times as new events arrive within an open session. If your pipeline uses the default incremental-update behaviour, downstream systems receive multiple partial records for the same session, inflating event counts. The EMIT FINAL (ksqlDB), closed-window-only emission (Kafka Streams), and AccumulationMode.DISCARDING (Beam) patterns each guarantee that exactly one record is emitted per completed session, which is the correct input for Xtremepush event-level analytics.
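To see why, here is a toy plain-Python simulation of the two emission modes (no Kafka involved): incremental emission turns a 5-event session into five partial records, so any consumer that sums event_count overcounts, while final-only emission produces exactly one record.

```python
def emit_incremental(session_events):
    """Default update-per-event behaviour: one partial record per arriving event."""
    records = []
    count = 0
    for _ in session_events:
        count += 1
        records.append({"event": "session", "event_count": count})
    return records

def emit_final(session_events):
    """EMIT FINAL / suppression / DISCARDING: one record once the session closes."""
    return [{"event": "session", "event_count": len(session_events)}]

events = ["spin"] * 5

partials = emit_incremental(events)
finals = emit_final(events)

# Incremental: 5 records; naively summing event_count downstream gives 1+2+3+4+5 = 15
assert len(partials) == 5
assert sum(r["event_count"] for r in partials) == 15

# Final-only: exactly one record with the true count
assert len(finals) == 1 and finals[0]["event_count"] == 5
```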
Confluent Cloud (ksqlDB)
Session aggregation
CREATE TABLE session_summaries
  WITH (KAFKA_TOPIC = 'xp-session-events', VALUE_FORMAT = 'JSON')
AS SELECT
  user_id AS user_id_key,  -- grouping key; aliased so the AS_VALUE copy below can be named user_id
  'session' AS event,
  AS_VALUE(user_id) AS user_id,
  TIMESTAMPTOSTRING(MIN(ROWTIME), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS `timestamp`,
  STRUCT(
    session_start_ms := MIN(ROWTIME),
    session_end_ms := MAX(ROWTIME),
    event_count := COUNT(*),
    duration_ms := MAX(ROWTIME) - MIN(ROWTIME)
  ) AS value,
  STRUCT() AS user_attributes
FROM xtremepush_events
WINDOW SESSION (10 MINUTES)
GROUP BY user_id
EMIT FINAL;

EMIT FINAL is critical. Without it, ksqlDB emits incremental updates as the session is being built. EMIT FINAL ensures exactly one record is emitted per closed session (after the inactivity gap expires), keeping downstream event counts meaningful.
Rolling window (2-minute summary)
CREATE TABLE rolling_2min_summary
  WITH (KAFKA_TOPIC = 'xp-rolling-2min', VALUE_FORMAT = 'JSON')
AS SELECT
  user_id AS user_id_key,  -- grouping key; aliased so the AS_VALUE copy below can be named user_id
  'session_summary' AS event,
  AS_VALUE(user_id) AS user_id,
  TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS `timestamp`,
  STRUCT(
    window_start_ms := WINDOWSTART,
    window_end_ms := WINDOWEND,
    event_count := COUNT(*)
  ) AS value,
  STRUCT() AS user_attributes
FROM xtremepush_events
WINDOW TUMBLING (SIZE 2 MINUTES)
GROUP BY user_id
EMIT FINAL;

Amazon MSK (Kafka Streams — Java)
Session aggregation
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import java.time.Duration;

KTable<Windowed<String>, Long> sessions = rawStream
    .groupByKey()
    .windowedBy(
        SessionWindows.ofInactivityGapAndGrace(
            Duration.ofMinutes(10), // inactivity gap: closes session after 10 min of silence
            Duration.ofMinutes(2)   // grace period: accept late-arriving events
        )
    )
    .aggregate(
        () -> 0L,                                 // initialiser
        (key, value, aggregate) -> aggregate + 1, // incrementor
        (key, agg1, agg2) -> agg1 + agg2          // merger (called when two sessions merge)
    )
    // emit exactly one record per closed session: the Kafka Streams counterpart of EMIT FINAL
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
sessions.toStream().map((windowed, count) -> {
    try {
        ObjectNode envelope = mapper.createObjectNode();
        envelope.put("event", "session");
        envelope.put("user_id", windowed.key());
        // format the session end as 'yyyy-MM-dd HH:mm:ss' (strip the 'T', fraction, and 'Z')
        String ts = Instant.ofEpochMilli(windowed.window().end())
            .toString().replace("T", " ").replaceAll("(\\.\\d+)?Z$", "");
        envelope.put("timestamp", ts);
        ObjectNode value = mapper.createObjectNode();
        value.put("session_start_ms", windowed.window().start());
        value.put("session_end_ms", windowed.window().end());
        value.put("event_count", count);
        value.put("duration_ms", windowed.window().end() - windowed.window().start());
        envelope.set("value", value);
        envelope.set("user_attributes", mapper.createObjectNode());
        return KeyValue.pair(windowed.key(), mapper.writeValueAsString(envelope));
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}).to("xp-session-events");

The merger function (key, agg1, agg2) -> agg1 + agg2 is required for session windows. Kafka Streams may merge two adjacent session windows when a late-arriving event bridges the gap between them; the merger combines the counts from both windows.
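The merge semantics can be sketched in plain Python. This is a simplification of what the streams runtime does internally, not its actual implementation: two bursts of events stay separate sessions while the gap between them exceeds the inactivity gap, and a late-arriving event that bridges the gap causes them to be merged, with counts combined as the merger function does.

```python
from typing import List, Tuple

GAP_MS = 10 * 60 * 1000  # 10-minute inactivity gap

def assign_sessions(timestamps: List[int], gap_ms: int = GAP_MS) -> List[Tuple[int, int, int]]:
    """Return (start_ms, end_ms, count) sessions; sessions whose gap shrinks
    below gap_ms are merged, combining counts like the merger function."""
    sessions: List[Tuple[int, int, int]] = []
    for ts in sorted(timestamps):
        placed = False
        for i, (start, end, count) in enumerate(sessions):
            if start - gap_ms <= ts <= end + gap_ms:
                # event falls within the gap of an existing session: extend it
                sessions[i] = (min(start, ts), max(end, ts), count + 1)
                placed = True
                break
        if not placed:
            sessions.append((ts, ts, 1))
        # an extended session may now bridge two previously separate sessions
        sessions.sort()
        i = 0
        while i < len(sessions) - 1:
            (s1, e1, c1), (s2, e2, c2) = sessions[i], sessions[i + 1]
            if s2 - e1 <= gap_ms:
                sessions[i] = (s1, max(e1, e2), c1 + c2)  # merger: c1 + c2
                del sessions[i + 1]
            else:
                i += 1
    return sessions

# Two bursts 15 minutes apart stay separate sessions...
assert len(assign_sessions([0, 60_000, 960_000])) == 2

# ...until a late-arriving event at t=8min bridges the gap and they merge.
merged = assign_sessions([0, 60_000, 960_000, 480_000])
assert len(merged) == 1
assert merged[0][2] == 4  # combined count from both sides of the bridge
```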
Rolling window (2-minute summary)
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Suppressed;

KTable<Windowed<String>, Long> rolling = rawStream
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeAndGrace(Duration.ofMinutes(2), Duration.ofMinutes(1))
    )
    .count()
    // emit one final count per closed window rather than incremental updates
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

Google Cloud Managed Apache Kafka (Apache Beam / Dataflow)
Session aggregation
import apache_beam as beam
from apache_beam.transforms.window import Sessions
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
import json
from datetime import datetime, timezone
class SessionOutputFn(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        user_id, events = element
        events = list(events)
        # Beam window bounds are microsecond-precision Timestamps; convert explicitly
        start_ms = window.start.micros // 1000
        end_ms = window.end.micros // 1000
        ts = datetime.fromtimestamp(end_ms / 1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
        yield json.dumps({
            'event': 'session',
            'user_id': user_id,
            'timestamp': ts,
            'value': {
                'session_start_ms': start_ms,
                'session_end_ms': end_ms,
                'event_count': len(events),
                'duration_ms': end_ms - start_ms,
            },
            'user_attributes': {}
        })
(
    events_pcoll
    | 'ExtractUserId' >> beam.Map(lambda e: (json.loads(e)['user_id'], e))
    | 'SessionWindow' >> beam.WindowInto(
        Sessions(gap_size=600),  # 10-minute inactivity gap, in seconds
        trigger=AfterWatermark(),
        accumulation_mode=AccumulationMode.DISCARDING)
    | 'GroupByKey' >> beam.GroupByKey()
    | 'BuildSession' >> beam.ParDo(SessionOutputFn())
    | 'WriteToKafka' >> beam.io.WriteToKafka(
        producer_config={'bootstrap.servers': 'BROKER:9092'},
        topic='xp-session-events')
)

AccumulationMode.DISCARDING is critical. Beam windows fire when the watermark passes the end of the window. With DISCARDING, each pane discards previously accumulated data after firing, so exactly one complete session record is emitted per closed session. Without it (ACCUMULATING mode), you would receive incremental partial-session updates that would overcount events downstream.
Rolling window (2-minute summary)
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
(
    events_pcoll
    | 'KeyByUser' >> beam.Map(lambda e: (json.loads(e)['user_id'], 1))
    | 'FixedWindow' >> beam.WindowInto(
        FixedWindows(120),  # 2-minute fixed windows
        trigger=AfterWatermark(),
        accumulation_mode=AccumulationMode.DISCARDING)
    | 'CountPerUser' >> beam.CombinePerKey(sum)
    # use the window start (matching ksqlDB's WINDOWSTART) rather than wall-clock time
    | 'BuildSummary' >> beam.Map(
        lambda kv, window=beam.DoFn.WindowParam: json.dumps({
            'event': 'session_summary',
            'user_id': kv[0],
            'timestamp': datetime.fromtimestamp(
                window.start.micros / 1e6, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
            'value': {'event_count': kv[1]},
            'user_attributes': {}
        }))
    | 'WriteToKafka' >> beam.io.WriteToKafka(...)
)