Telemetry ingress
Consume real-time telemetry and 1-minute meter readings forwarded by Voke from plant MQTT to your AMQP queue.
Overview
Telemetry flows from plant to partner: a PLC publishes snapshots on the MQTT topic cpi/{plantId}/telemetry, Voke's SubDeviceTelemetryService decodes and stores the signals using the plant's DeviceTemplate, and VCP forwarders publish partner-facing payloads to vcp.{slug}.event.telemetry.
Two distinct payload shapes arrive on the same queue:
CanonicalTelemetry— real-time snapshot, published on routing key{slug}.event.telemetry.realtime.{siteId}.MeterReadingPayload— 1-minute meter register snapshot, published on routing key{slug}.event.telemetry.meter.{siteId}.
Both arrive on the default queue (vcp.{slug}.event.telemetry), which is bound with a # wildcard so it receives every site. Distinguish the two shapes by the routing-key type segment (.realtime. vs .meter. — match with contains/startsWith, not an exact suffix, since the key now ends with the site UUID), or by payload shape (readingAt + meters[] means meter reading). To receive one PLC instead of the firehose, bind your own queue to a specific siteId — see Subscribe to one PLC.
Payload shapes
CanonicalTelemetry — real-time snapshot
Published by VcpAmqpService.publishTelemetry. Fired on every verified plant telemetry event.
interface CanonicalTelemetry {
gridPowerKw: number | null; // Net grid power; positive = import
fvePowerKw: number | null; // Solar / FVE generation
batteryPowerKw: number | null; // Battery power; positive = charging
consumptionPowerKw: number | null; // Total site consumption
socPercent: number | null; // Battery state of charge (0–100)
availableBatteryEnergyKwh: number | null; // Available battery energy
batteryTemperatureCelsius: number | null; // Battery temperature
currentOperatingMode: OperatingMode; // Active control mode
dataQuality: 'GOOD' | 'INTERPOLATED' | 'STALE' | 'MISSING';
devices?: DeviceTelemetry[]; // Per-device breakdown (optional)
}| Field | Description |
|---|---|
gridPowerKw | Net grid power. Positive values indicate grid import; negative indicate export. |
fvePowerKw | FVE / solar generation in kW. |
batteryPowerKw | Battery power. Positive = charging; negative = discharging. |
consumptionPowerKw | Total site load in kW. |
socPercent | Battery state of charge as a percentage. null if no battery or data unavailable. |
availableBatteryEnergyKwh | Energy the battery can currently deliver. |
batteryTemperatureCelsius | Battery temperature in °C. |
currentOperatingMode | The active control strategy — see OperatingMode enum below. |
dataQuality | GOOD = fresh measurement; INTERPOLATED = estimated; STALE = last-known; MISSING = no data available. |
devices | Per-asset breakdown when the plant has sub-devices. See DeviceTelemetry below. |
OperatingMode values:
enum OperatingMode {
STANDARD = 'STANDARD',
ZERO_EXPORT = 'ZERO_EXPORT',
MAX_EXPORT = 'MAX_EXPORT',
PEAK_SHAVING = 'PEAK_SHAVING',
LOCAL_OPTIMIZATION = 'LOCAL_OPTIMIZATION',
GRID_TARGET = 'GRID_TARGET',
LDS_SUPPORT = 'LDS_SUPPORT',
}DeviceTelemetry — per-device breakdown
Present in CanonicalTelemetry.devices[] when the plant has sub-devices declared in its DeviceTemplate.
interface DeviceTelemetry {
deviceId: string; // Sub-device externalId (e.g. 'B1', 'S2')
assetType: AssetType; // Asset class
powerKw: number; // Device power output / input in kW
socPercent?: number; // Battery SOC (batteries only)
temperatureCelsius?: number; // Device temperature
availableCapacityKwh?: number; // Available energy capacity (batteries)
regulationPercent?: number; // Regulation setpoint tracking accuracy (%)
}deviceId maps to the sub-device's operator-assigned externalId on the Voke plant, not the Voke UUID.
MeterReadingPayload — 1-minute meter registers
Published by VcpAmqpService.publishMeterReading once per minute for VCP-enabled plants that have meter register signals.
type MeterRole = 'GRID' | 'FVE' | 'BESS' | 'CONSUMPTION';
interface MeterEntry {
deviceId: string; // Sub-device externalId — same id as DeviceTelemetry.deviceId
role: MeterRole;
importRegisterKwh?: number;
exportRegisterKwh?: number;
productionRegisterKwh?: number;
chargeRegisterKwh?: number;
dischargeRegisterKwh?: number;
consumptionRegisterKwh?: number;
}
interface MeterReadingPayload {
readingAt: string; // ISO 8601 UTC
eanCons?: string | null; // Consumption metering-point EAN (18 digits)
eanProd?: string | null; // Production metering-point EAN (18 digits)
meters: MeterEntry[];
dataQuality: 'GOOD' | 'DEGRADED' | 'ESTIMATED';
}| Field | Description |
|---|---|
readingAt | Clock-aligned minute timestamp. |
eanCons / eanProd | The site's metering-point EANs, when configured. Match readings to your own EAN-keyed model without a mapping table — see Addressing sites by EAN. |
meters | Absolute meter-register values keyed by sub-device externalId (deviceId). |
dataQuality | GOOD, DEGRADED, or ESTIMATED. |
importRegisterKwh / exportRegisterKwh | Grid import/export registers. |
productionRegisterKwh | FVE production register. |
chargeRegisterKwh / dischargeRegisterKwh | BESS charge/discharge registers. |
consumptionRegisterKwh | Site consumption register. |
VCP v1.1 replaced the old 15-minute IntervalTelemetry payload with MeterReadingPayload.
Consumers should not expect intervalStart, intervalEnd, or gridEnergyKwh15min.
Forwarding and filtering
VcpTelemetryForwarder forwards every decoded telemetry event to the partner queue without signal-level filtering. There is no featured-flag or signal-selection step in the ESM forwarding path — all signals present in the canonical payload are included. Partners that need a subset of signals should filter on the consumer side.
The flow for each real-time event:
- PLC publishes a snapshot to
cpi/{plantId}/telemetryover MQTT. SubDeviceTelemetryServiceverifies the HMAC, decodes sub-device signals viaDeviceTemplate, and emitstelemetry.verified.VcpTelemetryForwarderlistens fortelemetry.verified, confirms the plant is bound to a partner viaexternalPlantId(the trading-enablement gate), and confirms the org has VCP trading enabled.- The org's adapter normalises the raw data into
CanonicalTelemetryand formats it for partner delivery. VcpAmqpService.publishTelemetrywraps the payload in a VCP envelope (withsiteId= plant UUID) and publishes to{slug}.event.telemetry.realtime.{siteId}on thevcpexchange.MeterReadingForwarderqueries meter register signals once per minute and publishes{slug}.event.telemetry.meter.{siteId}.
Plants without a configured externalPlantId are silently skipped; their telemetry is stored in Voke but not forwarded to the partner. (externalPlantId is the trading-binding flag, not the wire identifier.)
Subscribing to telemetry
The queue vcp.{slug}.event.telemetry is asserted and bound by Voke when trading is enabled. Your consumer connects with orgSlug as the AMQP username and an API key (vcp:connect scope) as the password.
Using the consume-telemetry.ts example:
import { connectVoke, type VokeAmqpCreds } from './examples/esm/amqp-connect';
interface CanonicalTelemetryPayload {
readingAt?: string; // present on MeterReadingPayload
gridPowerKw: number | null;
fvePowerKw: number | null;
batteryPowerKw: number | null;
consumptionPowerKw: number | null;
socPercent: number | null;
currentOperatingMode: string;
dataQuality: string;
devices?: Array<{ deviceId: string; assetType: string; powerKw: number }>;
}
async function startTelemetryConsumer(creds: VokeAmqpCreds) {
const { ch, outbound } = await connectVoke(creds);
const seen = new Set<string>(); // deduplicate on messageId
await ch.consume(outbound.telemetry, (msg) => {
if (!msg) return;
try {
const envelope = JSON.parse(msg.content.toString()) as {
version: '1.1';
messageId: string;
timestamp: string;
siteId: string;
payload: CanonicalTelemetryPayload;
};
// Deduplicate on messageId
if (seen.has(envelope.messageId)) {
ch.ack(msg);
return;
}
seen.add(envelope.messageId);
const { payload } = envelope;
const isMeterReading = 'readingAt' in payload && 'meters' in payload;
if (isMeterReading) {
console.log(`[${envelope.siteId}] meter reading`, payload);
} else {
console.log(`[${envelope.siteId}] real-time`, payload);
}
ch.ack(msg);
} catch {
// Malformed — dead-letter, do not requeue
ch.nack(msg, false, false);
}
});
console.log(`Consuming telemetry from ${outbound.telemetry}`);
}The messageId in the outer VCP envelope is a Voke-assigned UUIDv4. Use it for deduplication — on consumer reconnect, RabbitMQ may redeliver messages that were not acknowledged before the channel dropped.
Subscribe to one PLC
The default queue vcp.{slug}.event.telemetry is a firehose — Voke binds it with a # wildcard, so it carries every site you operate. When you run many PLCs and a worker only cares about one, you can subscribe per site instead of filtering client-side: the routing key ends with the site UUID (…telemetry.realtime.{siteId} / …telemetry.meter.{siteId}), and the vcp exchange is a topic exchange, so you bind a queue to whichever sites you want.
The per-key vhost is yours — you declare and own these extra queues; Voke neither creates nor lifecycle-manages them. Bind on the vcp topic exchange:
| Binding pattern | Receives |
|---|---|
{slug}.event.telemetry.realtime.# | realtime for all sites (what the default queue does) |
{slug}.event.telemetry.realtime.{siteId} | realtime for one PLC |
{slug}.event.telemetry.meter.{siteId} | 1-minute meter readings for one PLC |
two bindings …realtime.{A} + …realtime.{B} on one queue | a chosen subset |
Name any queue you declare with the vcp.{slug}. prefix (e.g. vcp.{slug}.site.{siteId}). The
broker authorizes queue operations by prefix — a queue whose name doesn't start with vcp.{slug}.
is refused. Routing keys you bind are prefix-gated the same way (they already start with {slug}.).
// Declare a private queue bound to ONE site's realtime + meter stream.
// `siteId` comes from GET /vcp/sites (map EAN → siteId once at boot).
// Queue name MUST start with `vcp.{slug}.` or the broker refuses it.
async function subscribeToSite(ch: amqplib.Channel, slug: string, siteId: string) {
const q = await ch.assertQueue(`vcp.${slug}.site.${siteId}`, { durable: true });
await ch.bindQueue(q.queue, 'vcp', `${slug}.event.telemetry.realtime.${siteId}`);
await ch.bindQueue(q.queue, 'vcp', `${slug}.event.telemetry.meter.${siteId}`);
await ch.consume(q.queue, (msg) => {
if (!msg) return;
const envelope = JSON.parse(msg.content.toString());
console.log(`[${siteId}]`, envelope.payload);
ch.ack(msg);
});
}siteId is the routing discriminator because it is stable and present on every message. The
EAN is intentionally not in the routing key — it is nullable, can change with a metering-point
contract, and is absent on realtime. Keep the EAN → siteId map (built once from
GET /vcp/sites) and bind siteIds; the EAN still rides in each meter payload
for your EAN-keyed accounting.
A site you bind by siteId that hasn't published yet simply delivers nothing until its PLC comes online — there's no error and no need to pre-register the binding with Voke.
Related pages
- Concepts / Signals — how plant signals are decoded from sub-device snapshots
- Concepts / Data retention — TimescaleDB aggregation windows and raw data lifetime
- VCP data model — full field reference for
CanonicalTelemetryandMeterReadingPayload - Commands — observe post-command effects by correlating
siteIdtimestamps in the telemetry stream