Topic formats & payload requirements
Information on the available Kafka topics, their formats, and payload requirements, with examples.
All payloads must be valid JSON. Each project has two topics, each serving a distinct purpose.
Events topic
Use this topic to record events and actions a user performs (e.g. bets, deposits, logins).
- Topic name: Defined as part of the setup
- Message key: Not required, but can be set to a unique event UUID. Xtremepush will deduplicate events within a configurable time window based on this key. To enable deduplication for your project, contact Xtremepush support.
- Message headers: None
- Message body: JSON encoded item
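As a sketch of assembling a message for this topic (plain Python, stdlib only; `build_event` is a hypothetical helper, and the returned key/body pair is what you would hand to your Kafka client's produce call):

```python
import json
import uuid

def build_event(name, user_id, value, timestamp):
    """Hypothetical helper: build an events-topic payload plus a message key."""
    payload = {
        'event': name,
        'user_id': user_id,
        'value': value,
        'timestamp': timestamp,
    }
    # A unique event UUID as the message key enables deduplication,
    # once that feature is enabled for your project by Xtremepush support.
    key = str(uuid.uuid4())
    return key, json.dumps(payload)

key, body = build_event('deposit', 'usr_123', {'amount': 25.0},
                        '2025-01-01 12:00:00')
print(body)
```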
| Property | Type | Required | Description |
|---|---|---|---|
| event | String | Yes | The name of the event, e.g. bet, deposit. |
| user_id | String | At least one user identifier required | Unique user ID. A new user will be created automatically if none exists. |
| customer_id | String | At least one user identifier required | Additional user identifier if available. |
| profile_id | String | At least one user identifier required | Xtremepush profile identifier. |
| device_id | String | At least one user identifier required | Xtremepush device identifier. |
| user_attributes | Object | No | Additional user information. Attributes are added to the user profile automatically during event processing. |
| value | Object | No | Event properties specific to the event type. Can contain any number of key-value pairs, including nested arrays or objects. |
| timestamp | String | Yes | The original timestamp of the event. |
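The required-field rules in the table can be checked before producing. A minimal sketch (hypothetical `validate_event` helper, stdlib only):

```python
IDENTIFIERS = ('user_id', 'customer_id', 'profile_id', 'device_id')

def validate_event(payload: dict) -> list:
    """Return a list of problems; an empty list means the payload is acceptable."""
    problems = []
    if not payload.get('event'):
        problems.append('event is required')
    if not payload.get('timestamp'):
        problems.append('timestamp is required')
    # Any one of user_id / customer_id / profile_id / device_id satisfies
    # the "at least one user identifier" rule.
    if not any(payload.get(k) for k in IDENTIFIERS):
        problems.append('at least one user identifier is required')
    return problems

print(validate_event({'event': 'bet'}))
# → ['timestamp is required', 'at least one user identifier is required']
```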
Schema

```json
{
  "event": "some_event",
  "user_id": "some_user",
  "user_attributes": {
    "any_attr": "any_value"
  },
  "value": {
    "any_key": "any_value"
  },
  "timestamp": "2025-01-01 12:00:00"
}
```

Example

```json
{
  "event": "bet",
  "user_id": "3e685367-07d5-4d48-93ae-f007ac336605",
  "customer_id": "12345",
  "user_attributes": {
    "customer_tier": "VIP"
  },
  "value": {
    "bet_id": "3e685367-07d5-4d48-93ae-f007ac336605",
    "odds": 12.2,
    "stake": 100.0
  },
  "timestamp": "2024-09-01 12:00:00.123"
}
```

Users topic
Use this topic to create new users or update existing user profiles.
Important — sequencing
Profile messages are processed sequentially and large imports may take significant time to complete.
- Topic name: Defined as part of the setup
- Message key: Recommended. Should be the user identifier (e.g. user_id). Used to maintain ordering within the Kafka stream — all messages with the same key are routed to the same partition, ensuring profile updates are applied in the correct sequence.
- Message headers: None
- Message body: JSON encoded item
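A sketch of building a keyed profile message (hypothetical `build_profile_message` helper, stdlib only; the point is that the Kafka message key equals the user identifier, so all updates for one user land on the same partition and stay in order):

```python
import json

def build_profile_message(user_id, attributes, timestamp=None):
    """Hypothetical helper: return (key, body) for the users topic.

    Keying by user_id keeps every update for the same user on the
    same partition, preserving the order in which they are applied.
    """
    payload = {'user_id': user_id, 'user_attributes': attributes}
    if timestamp:
        payload['timestamp'] = timestamp
    return user_id, json.dumps(payload)

key, body = build_profile_message('usr_123', {'customer_tier': 'VIP'},
                                  '2025-01-01 12:00:00')
```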
| Property | Type | Required | Description |
|---|---|---|---|
| user_id | String | Yes | Unique user ID. A new user will be created automatically if none exists. |
| user_attributes | Object | Yes | Information about the user. Attributes are added to the user profile automatically during processing. |
| customer_id | String | No | Additional user identifier if available. |
| timestamp | String | No | The timestamp of the attribute change. Used to ensure the most recent value is saved. Defaults to the Kafka message timestamp if absent. |
Schema

```json
{
  "user_id": "some_user",
  "user_attributes": {
    "any_attr": "any_value"
  },
  "timestamp": "2025-01-01 12:00:00"
}
```

Example

```json
{
  "user_id": "3e685367-07d5-4d48-93ae-f007ac336605",
  "customer_id": "12345",
  "user_attributes": {
    "customer_tier": "VIP",
    "email": "[email protected]"
  },
  "timestamp": "2024-09-01 12:00:00.123"
}
```

Transforming your events to the Xtremepush format
If your events are produced in a flat, client-specific schema, you will need to transform them into the Xtremepush standard envelope format before (or as) they are published to the events topic.
Key principle: The transformation is structural only. You wrap your existing event properties into the standard value: {} object and add the outer envelope fields (event, user_id, timestamp, user_attributes). Your inner property key names (e.g. AmountWagered, BetId) are preserved exactly as-is — you do not need to rename them.
Before and after
Before (client's raw flat Kafka message):

```json
{
  "event_type": "bet_placed",
  "user_id": "usr_123",
  "timestamp": "2025-01-15T14:23:45.000Z",
  "AmountWagered": 50.00,
  "BetId": "bet_abc_789",
  "SportName": "Football"
}
```

After (Xtremepush standard envelope):

```json
{
  "event": "bet_placed",
  "user_id": "usr_123",
  "timestamp": "2025-01-15 14:23:45",
  "value": {
    "AmountWagered": 50.00,
    "BetId": "bet_abc_789",
    "SportName": "Football"
  },
  "user_attributes": {}
}
```

AmountWagered, BetId, and SportName are unchanged — only their structural location shifts from top-level to inside value.
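Before the platform-specific recipes, the structural move can be sketched platform-neutrally in plain Python (assumptions: flat JSON input with event_type, user_id, and an ISO 8601 timestamp at top level, as in the raw message above):

```python
import json
import re

# Top-level fields that become the envelope rather than moving into value
ENVELOPE_KEYS = {'event_type', 'user_id', 'timestamp'}

def to_envelope(raw_json: str) -> str:
    """Wrap a flat client event into the Xtremepush standard envelope."""
    raw = json.loads(raw_json)
    # ISO 8601 → 'YYYY-MM-DD HH:MM:SS' (drop 'T' and any fractional/Z suffix)
    ts = re.sub(r'\.\d+Z?$', '', raw.get('timestamp', '').replace('T', ' '))
    return json.dumps({
        'event': raw.get('event_type', ''),
        'user_id': raw.get('user_id', ''),
        'timestamp': ts,
        # Every other top-level field moves, unrenamed, into value
        'value': {k: v for k, v in raw.items() if k not in ENVELOPE_KEYS},
        'user_attributes': {}
    })
```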
Confluent Cloud (ksqlDB)
Step 1 — Declare the source stream (matching your existing topic schema):

```sql
CREATE STREAM client_events_raw (
    event_type VARCHAR,
    user_id VARCHAR,
    `timestamp` VARCHAR,
    AmountWagered DOUBLE,
    BetId VARCHAR,
    SportName VARCHAR
) WITH (
    KAFKA_TOPIC = 'client-raw-events',
    VALUE_FORMAT = 'JSON'
);
```

Step 2 — Transform to Xtremepush envelope (inner keys preserved):

```sql
CREATE STREAM xtremepush_events
    WITH (KAFKA_TOPIC = 'xp-events', VALUE_FORMAT = 'JSON')
    AS SELECT
        event_type AS event,
        user_id,
        -- Convert ISO 8601 to the Xtremepush timestamp format:
        -- replace 'T' with a space, then strip any fractional-seconds/Z suffix
        REGEXP_REPLACE(
            REGEXP_REPLACE(`timestamp`, 'T', ' '),
            '\.\d+Z$', ''
        ) AS `timestamp`,
        STRUCT(
            AmountWagered := AmountWagered,
            BetId := BetId,
            SportName := SportName
        ) AS value,
        STRUCT() AS user_attributes
    FROM client_events_raw
    EMIT CHANGES;
```

STRUCT() builds the nested value object with the client's original property names. If your events have a variable or large set of properties, consider a JSON pass-through approach using AS_VALUE().
Amazon MSK (Kafka Streams — Java)
```java
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.node.*;
import java.util.*;

ObjectMapper mapper = new ObjectMapper();
Set<String> ENVELOPE_KEYS = Set.of("event_type", "user_id", "timestamp");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> rawStream = builder.stream("client-raw-events");

KStream<String, String> transformed = rawStream.mapValues(raw -> {
    try {
        JsonNode src = mapper.readTree(raw);
        ObjectNode envelope = mapper.createObjectNode();
        envelope.put("event", src.get("event_type").asText());
        envelope.put("user_id", src.get("user_id").asText());

        // Convert ISO 8601 to Xtremepush timestamp format
        String ts = src.get("timestamp").asText()
            .replace("T", " ")
            .replaceAll("\\.\\d+Z$", "");
        envelope.put("timestamp", ts);

        // Wrap original properties into value — key names preserved as-is
        ObjectNode value = mapper.createObjectNode();
        src.fields().forEachRemaining(entry -> {
            if (!ENVELOPE_KEYS.contains(entry.getKey())) {
                value.set(entry.getKey(), entry.getValue());
            }
        });
        envelope.set("value", value);
        envelope.set("user_attributes", mapper.createObjectNode());

        return mapper.writeValueAsString(envelope);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});

transformed.to("xp-events");
```

The mapValues approach dynamically carries over all non-envelope fields into value without enumerating them explicitly. This is robust to schema evolution — new properties a client adds will automatically appear inside value.
Google Cloud Managed Apache Kafka (Apache Beam / Dataflow)
```python
import apache_beam as beam
import json
import re

ENVELOPE_KEYS = {'event_type', 'user_id', 'timestamp'}

class EnvelopeTransform(beam.DoFn):
    """Wraps a flat client event into the Xtremepush standard envelope.

    Inner property key names are preserved exactly as-is.
    Only structural change: properties move from top-level into value{}.
    """
    def process(self, element):
        raw = json.loads(element)
        # Convert ISO 8601 → Xtremepush timestamp format (YYYY-MM-DD HH:MM:SS)
        ts = re.sub(r'T', ' ', raw.get('timestamp', ''))
        ts = re.sub(r'\.\d+Z$', '', ts)
        # Wrap all non-envelope fields into value — key names preserved as-is
        value = {k: v for k, v in raw.items() if k not in ENVELOPE_KEYS}
        yield json.dumps({
            'event': raw.get('event_type', ''),
            'user_id': raw.get('user_id', ''),
            'timestamp': ts,
            'value': value,
            'user_attributes': {}
        })

# Assumes pipeline_options was built earlier, e.g. PipelineOptions([...])
with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | 'ReadFromKafka' >> beam.io.ReadFromKafka(
            consumer_config={'bootstrap.servers': 'BROKER:9092'},
            topics=['client-raw-events'])
        # ReadFromKafka yields (key, value) byte tuples; decode the value
        | 'ExtractValue' >> beam.Map(lambda kv: kv[1].decode('utf-8'))
        | 'Transform' >> beam.ParDo(EnvelopeTransform())
        # WriteToKafka expects (key, value) byte tuples
        | 'ToKafkaRecord' >> beam.Map(lambda v: (b'', v.encode('utf-8')))
        | 'WriteToKafka' >> beam.io.WriteToKafka(
            producer_config={'bootstrap.servers': 'BROKER:9092'},
            topic='xp-events')
    )
```