Topic formats & payload requirements

Information on the available Kafka topics, their formats, and payload requirements, with examples.

All payloads must be valid JSON. Each project has two topics, each serving a distinct purpose.

Events topic

Use this topic to record events and actions a user performs (e.g. bets, deposits, logins).

  • Topic name: Defined as part of the setup
  • Message key: Not required, but can be set to a unique event UUID. Xtremepush will deduplicate events within a configurable time window based on this key. To enable deduplication for your project, contact Xtremepush support.
  • Message headers: None
  • Message body: JSON encoded item
| Property        | Type   | Required                              | Description |
| --------------- | ------ | ------------------------------------- | ----------- |
| event           | String | Yes                                   | The name of the event, e.g. bet, deposit. |
| user_id         | String | At least one user identifier required | Unique user ID. A new user will be created automatically if none exists. |
| customer_id     | String | At least one user identifier required | Additional user identifier, if available. |
| profile_id      | String | At least one user identifier required | Xtremepush profile identifier. |
| device_id       | String | At least one user identifier required | Xtremepush device identifier. |
| user_attributes | Object | No                                    | Additional user information. Attributes are added to the user profile automatically during event processing. |
| value           | Object | No                                    | Event properties specific to the event type. Can contain any number of key-value pairs, including nested arrays or objects. |
| timestamp       | String | Yes                                   | The original timestamp of the event. |
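The required-field rules above can be sketched as a small pre-flight validator. This is a hypothetical helper for your producer code, not part of any Xtremepush SDK:

```python
import json

# Any one of these satisfies the "at least one user identifier" rule
IDENTIFIER_FIELDS = {"user_id", "customer_id", "profile_id", "device_id"}

def validate_event(payload: str) -> list:
    """Return a list of problems with an events-topic payload (empty if valid)."""
    try:
        body = json.loads(payload)
    except json.JSONDecodeError:
        return ["payload is not valid JSON"]

    problems = []
    if not body.get("event"):
        problems.append("missing required field: event")
    if not body.get("timestamp"):
        problems.append("missing required field: timestamp")
    if not any(body.get(f) for f in IDENTIFIER_FIELDS):
        problems.append("at least one user identifier is required")
    return problems
```

Running such a check before producing catches malformed payloads on your side rather than during processing.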

Schema

{
  "event": "some_event",
  "user_id": "some_user",
  "user_attributes": {
    "any_attr": "any_value"
  },
  "value": {
    "any_key": "any_value"
  },
  "timestamp": "2025-01-01 12:00:00"
}

Example

{
  "event": "bet",
  "user_id": "3e685367-07d5-4d48-93ae-f007ac336605",
  "customer_id": 12345,
  "user_attributes": {
    "customer_tier": "VIP"
  },
  "value": {
    "bet_id": "3e685367-07d5-4d48-93ae-f007ac336605",
    "odds": 12.2,
    "stake": 100.0
  },
  "timestamp": "2024-09-01 12:00:00.123"
}
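Producing the example above with a unique event UUID as the message key (so deduplication can be enabled) can be sketched as follows. The producer client itself is omitted; build_event_message is a hypothetical helper, and the actual topic name is assigned during setup:

```python
import json
import uuid

def build_event_message(event, user_id, value, user_attributes=None,
                        timestamp="2024-09-01 12:00:00.123"):
    """Build a (key, payload) pair for the events topic.

    The key is a fresh event UUID so Xtremepush can deduplicate
    retried messages, if deduplication is enabled for the project.
    """
    payload = {
        "event": event,
        "user_id": user_id,
        "user_attributes": user_attributes or {},
        "value": value,
        "timestamp": timestamp,
    }
    return str(uuid.uuid4()), json.dumps(payload).encode("utf-8")

# With a producer client, the pair would then be sent as e.g.:
#   producer.send(topic, key=key.encode("utf-8"), value=payload_bytes)
key, payload_bytes = build_event_message(
    "bet", "3e685367-07d5-4d48-93ae-f007ac336605",
    {"bet_id": "3e685367-07d5-4d48-93ae-f007ac336605", "odds": 12.2, "stake": 100.0},
    {"customer_tier": "VIP"})
```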

Users topic

Use this topic to create new users or update existing user profiles.

🚧

Important — sequencing

Profile messages are processed sequentially and large imports may take significant time to complete.

  • Topic name: Defined as part of the setup
  • Message key: Recommended. Should be the user identifier (e.g. user_id). Used to maintain ordering within the Kafka stream — all messages with the same key are routed to the same partition, ensuring profile updates are applied in the correct sequence.
  • Message headers: None
  • Message body: JSON encoded item
| Property        | Type   | Required | Description |
| --------------- | ------ | -------- | ----------- |
| user_id         | String | Yes      | Unique user ID. A new user will be created automatically if none exists. |
| user_attributes | Object | Yes      | Information about the user. Attributes are added to the user profile automatically during processing. |
| customer_id     | String | No       | Additional user identifier, if available. |
| timestamp       | String | No       | The timestamp of the attribute change, used to ensure the most recent value is saved. Defaults to the Kafka message timestamp if absent. |
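The timestamp drives a last-write-wins rule: an attribute update is only applied if it is at least as recent as the value already stored. A minimal sketch of that comparison (illustrative only; the actual merge happens inside Xtremepush):

```python
def apply_update(profile: dict, message: dict) -> dict:
    """Apply a users-topic message to a stored profile, keeping each
    attribute's most recent value based on the message timestamp."""
    ts = message.get("timestamp", "")
    for attr, value in message.get("user_attributes", {}).items():
        stored = profile.get(attr)
        # Plain string comparison works because the format is YYYY-MM-DD HH:MM:SS
        if stored is None or stored["timestamp"] <= ts:
            profile[attr] = {"value": value, "timestamp": ts}
    return profile
```

This is why an out-of-order or replayed older message cannot overwrite a newer attribute value.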

Schema

{
  "user_id": "some_user",
  "user_attributes": {
    "any_attr": "any_value"
  },
  "timestamp": "2025-01-01 12:00:00"
}

Example

{
  "user_id": "3e685367-07d5-4d48-93ae-f007ac336605",
  "customer_id": 12345,
  "user_attributes": {
    "customer_tier": "VIP",
    "email": "[email protected]"
  },
  "timestamp": "2024-09-01 12:00:00.123"
}
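The reason the message key matters here is Kafka's partitioning: the default partitioner hashes the key, so every message with the same key lands on the same partition and is consumed in production order. A toy stand-in for that mapping (Kafka actually uses murmur2 rather than CRC-32; the principle is the same):

```python
import zlib

def pick_partition(key: str, num_partitions: int) -> int:
    """Illustrative stand-in for Kafka's default partitioner.

    The same key always maps to the same partition, which is what
    keeps one user's profile updates in order. (Kafka uses murmur2,
    not CRC-32; only the same-key-same-partition property matters here.)
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

Keying users-topic messages by user_id therefore guarantees per-user ordering, while updates for different users can still be processed in parallel across partitions.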


Transforming your events to the Xtremepush format

If your events are produced in a flat, client-specific schema, you will need to transform them into the Xtremepush standard envelope format before (or as) they are published to the events topic.

Key principle: The transformation is structural only. You wrap your existing event properties into the standard value: {} object and add the outer envelope fields (event, user_id, timestamp, user_attributes). Your inner property key names (e.g. AmountWagered, BetId) are preserved exactly as-is — you do not need to rename them.

Before and after

Before (client's raw flat Kafka message):

{
  "event_type":    "bet_placed",
  "user_id":       "usr_123",
  "timestamp":     "2025-01-15T14:23:45.000Z",
  "AmountWagered": 50.00,
  "BetId":         "bet_abc_789",
  "SportName":     "Football"
}

After (Xtremepush standard envelope):

{
  "event":     "bet_placed",
  "user_id":   "usr_123",
  "timestamp": "2025-01-15 14:23:45",
  "value": {
    "AmountWagered": 50.00,
    "BetId":         "bet_abc_789",
    "SportName":     "Football"
  },
  "user_attributes": {}
}

AmountWagered, BetId, and SportName are unchanged — only their structural location shifts from top-level to inside value.
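The wrap can be expressed as one generic function. This sketch reproduces the before/after pair above without hard-coding any client property names, so it works for any flat schema:

```python
import re

# Fields that move to the outer envelope; everything else goes inside value
ENVELOPE_KEYS = {"event_type", "user_id", "timestamp"}

def to_envelope(raw: dict) -> dict:
    """Wrap a flat client event into the Xtremepush envelope.

    Inner keys are preserved as-is; only their location changes.
    """
    # Convert ISO 8601 to the Xtremepush timestamp format
    ts = raw.get("timestamp", "").replace("T", " ")
    ts = re.sub(r"\.\d+Z$", "", ts)
    return {
        "event": raw.get("event_type", ""),
        "user_id": raw.get("user_id", ""),
        "timestamp": ts,
        "value": {k: v for k, v in raw.items() if k not in ENVELOPE_KEYS},
        "user_attributes": {},
    }
```

The platform-specific examples below implement this same function in ksqlDB, Kafka Streams, and Apache Beam.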


Confluent Cloud (ksqlDB)

Step 1 — Declare the source stream (matching your existing topic schema):

CREATE STREAM client_events_raw (
  event_type      VARCHAR,
  user_id         VARCHAR,
  `timestamp`     VARCHAR,
  `AmountWagered` DOUBLE,
  `BetId`         VARCHAR,
  `SportName`     VARCHAR
) WITH (
  KAFKA_TOPIC  = 'client-raw-events',
  VALUE_FORMAT = 'JSON'
);

Note the backticks: ksqlDB uppercases unquoted identifiers, so quoting is needed to keep the mixed-case and lowercase field names intact in the output JSON.

Step 2 — Transform to Xtremepush envelope (inner keys preserved):

CREATE STREAM xtremepush_events
  WITH (KAFKA_TOPIC = 'xp-events', VALUE_FORMAT = 'JSON')
AS SELECT
  event_type                                                           AS `event`,
  user_id                                                              AS `user_id`,
  -- Strip fractional seconds/Z, then convert ISO 8601 'T' to a space
  REGEXP_REPLACE(REGEXP_REPLACE(`timestamp`, '\.\d+Z$', ''), 'T', ' ') AS `timestamp`,
  STRUCT(
    `AmountWagered` := `AmountWagered`,
    `BetId`         := `BetId`,
    `SportName`     := `SportName`
  )                                                                    AS `value`
  -- user_attributes is optional for events and is omitted here:
  -- ksqlDB struct constructors require at least one field
FROM client_events_raw
EMIT CHANGES;

STRUCT builds the nested value object with the client's original property names. Because every property must be enumerated explicitly, this approach suits a small, stable schema; if your events have a variable or large set of properties, a stream processor with full JSON access (such as the Kafka Streams example below) is a better fit.


Amazon MSK (Kafka Streams — Java)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.node.*;
import java.util.*;

public class EnvelopeTransformer {

    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final Set<String> ENVELOPE_KEYS = Set.of("event_type", "user_id", "timestamp");

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> rawStream =
            builder.stream("client-raw-events", Consumed.with(Serdes.String(), Serdes.String()));

        KStream<String, String> transformed = rawStream.mapValues(raw -> {
            try {
                JsonNode src = MAPPER.readTree(raw);
                ObjectNode envelope = MAPPER.createObjectNode();

                envelope.put("event",   src.get("event_type").asText());
                envelope.put("user_id", src.get("user_id").asText());

                // Convert ISO 8601 to the Xtremepush timestamp format
                String ts = src.get("timestamp").asText()
                    .replace("T", " ")
                    .replaceAll("\\.\\d+Z$", "");
                envelope.put("timestamp", ts);

                // Wrap original properties into value — key names preserved as-is
                ObjectNode value = MAPPER.createObjectNode();
                src.fields().forEachRemaining(entry -> {
                    if (!ENVELOPE_KEYS.contains(entry.getKey())) {
                        value.set(entry.getKey(), entry.getValue());
                    }
                });
                envelope.set("value", value);
                envelope.set("user_attributes", MAPPER.createObjectNode());

                return MAPPER.writeValueAsString(envelope);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        transformed.to("xp-events", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "xp-envelope-transformer");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "BROKER:9092");
        new KafkaStreams(builder.build(), props).start();
    }
}

The mapValues approach dynamically carries over all non-envelope fields into value without enumerating them explicitly. This is robust to schema evolution — new properties a client adds will automatically appear inside value.


Google Cloud Managed Apache Kafka (Apache Beam / Dataflow)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import re

ENVELOPE_KEYS = {'event_type', 'user_id', 'timestamp'}

class EnvelopeTransform(beam.DoFn):
    """Wraps a flat client event into the Xtremepush standard envelope.

    Inner property key names are preserved exactly as-is.
    Only structural change: properties move from top-level into value{}.
    """

    def process(self, element):
        # ReadFromKafka emits (key, value) tuples of raw bytes
        key, payload = element
        raw = json.loads(payload.decode('utf-8'))

        # Convert ISO 8601 → Xtremepush timestamp format (YYYY-MM-DD HH:MM:SS)
        ts = raw.get('timestamp', '').replace('T', ' ')
        ts = re.sub(r'\.\d+Z$', '', ts)

        # Wrap all non-envelope fields into value — key names preserved as-is
        value = {k: v for k, v in raw.items() if k not in ENVELOPE_KEYS}

        envelope = {
            'event':           raw.get('event_type', ''),
            'user_id':         raw.get('user_id', ''),
            'timestamp':       ts,
            'value':           value,
            'user_attributes': {}
        }
        # WriteToKafka expects (key, value) tuples of bytes
        yield (key, json.dumps(envelope).encode('utf-8'))


# Configure for your environment (runner, project, region, etc.)
pipeline_options = PipelineOptions(streaming=True)

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | 'ReadFromKafka'   >> beam.io.ReadFromKafka(
                                   consumer_config={'bootstrap.servers': 'BROKER:9092'},
                                   topics=['client-raw-events'])
        | 'Transform'       >> beam.ParDo(EnvelopeTransform())
        | 'WriteToKafka'    >> beam.io.WriteToKafka(
                                   producer_config={'bootstrap.servers': 'BROKER:9092'},
                                   topic='xp-events')
    )