These are notes I made about the Streaming API while studying for the Integration Architect exam.
I find the Streaming API, event-driven architecture and messaging systems fascinating; they capture my imagination. Event-driven architecture revolves around the creation and publication of events, and the subscription to, consumption of, or reaction to those events.
The Streaming API
The Streaming API uses push technology to enable the streaming of events and a subscription mechanism for receiving those events in near real time. This subscription mechanism supports Platform Events, Change Data Capture Events, PushTopic Events and Generic Events.
The Streaming API in Salesforce uses CometD for long polling and implements the Bayeux protocol.
CometD is a scalable HTTP-based event routing bus: “CometD implements a web messaging system …. based on the publish/subscribe paradigm.” https://docs.cometd.org
The CometD documentation also describes the Bayeux protocol:
“The primary purpose of Bayeux is to support responsive bidirectional interactions between web clients … and the web server.
Bayeux is a protocol for transporting asynchronous messages (primarily over HTTP), with low latency between a web server and a web client. The messages are routed via named channels and can be delivered:
* server to client
* client to server
* client to client (via the server)
By default, publish/subscribe routing semantics are applied to the channels.”
https://docs.cometd.org
Long polling was developed to allow a client to poll a server for new information. The server holds the request open until new data is available. When new data becomes available, the server responds to the client and sends that data. As soon as the client receives the information, it sends another request and waits for the server to respond again, and so the cycle continues. Essentially, the client requests data from the server without expecting an immediate response.
The external client needs to log in to Salesforce, perform the CometD handshake and verify that it succeeded, and then send the subscribe request for the streaming channel. The long polling cycle of connecting and reconnecting then begins. Each reconnection must happen within 40 seconds, or the subscription expires. If it does expire, the external client must start again from the handshake.
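To make the request/hold/respond/re-request cycle concrete, here is a toy in-process model of long polling in Python. It is only a sketch. There is no HTTP, and `LongPollServer` and `run_client` are hypothetical names. The "server" blocks each poll until a message arrives or a timeout passes, and the client re-polls immediately after every response.

```python
import queue
import threading


class LongPollServer:
    """Toy stand-in for a long-polling server (no real networking)."""

    def __init__(self) -> None:
        self._pending: "queue.Queue[str]" = queue.Queue()

    def publish(self, data: str) -> None:
        self._pending.put(data)

    def poll(self, timeout: float) -> list[str]:
        """Hold the request open until data arrives, or return [] on timeout."""
        try:
            first = self._pending.get(timeout=timeout)
        except queue.Empty:
            return []
        batch = [first]
        # Send anything else already waiting in the same response.
        while True:
            try:
                batch.append(self._pending.get_nowait())
            except queue.Empty:
                return batch


def run_client(server: LongPollServer, expected: int, timeout: float = 1.0) -> list[str]:
    """Re-poll as soon as each response arrives until `expected` messages are received."""
    received: list[str] = []
    while len(received) < expected:
        received.extend(server.poll(timeout))
    return received


if __name__ == "__main__":
    server = LongPollServer()
    # Data becomes available 0.1 s after the client starts waiting.
    threading.Timer(0.1, server.publish, args=["event-1"]).start()
    print(run_client(server, expected=1))
```

A poll that times out with no data simply returns an empty list, and the client asks again. This mirrors how a long-polling client keeps a request outstanding almost all the time.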
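The handshake, subscribe and connect steps are Bayeux "meta" channel messages. The Python sketch below builds them as plain dictionaries, using field names from the Bayeux specification. The `clientId` would come from the server's handshake reply, and the values used here are placeholders. `next_step` is a hypothetical helper that applies the 40-second reconnect rule from the paragraph above.

```python
import json

RECONNECT_WINDOW_SECONDS = 40  # from the notes above


def handshake_message() -> dict:
    return {
        "channel": "/meta/handshake",
        "version": "1.0",
        "supportedConnectionTypes": ["long-polling"],
    }


def subscribe_message(client_id: str, channel: str) -> dict:
    return {"channel": "/meta/subscribe", "clientId": client_id, "subscription": channel}


def connect_message(client_id: str) -> dict:
    # Sent repeatedly: each /meta/connect is one long poll.
    return {"channel": "/meta/connect", "clientId": client_id, "connectionType": "long-polling"}


def next_step(seconds_since_last_connect: float) -> str:
    """Reconnect if still inside the window; otherwise start over from the handshake."""
    if seconds_since_last_connect < RECONNECT_WINDOW_SECONDS:
        return "connect"
    return "handshake"  # subscription expired: handshake, then subscribe again


if __name__ == "__main__":
    print(json.dumps(subscribe_message("abc123", "/event/NewPlatformEvent__e")))
```

In practice a CometD library (or the EMP Connector) produces these messages for you. Seeing them spelled out makes the order of operations easier to remember.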
Key Elements of Event-Driven Architecture
Event-Driven Architecture is extremely scalable and responsive.
- An event is a change of state. From a Salesforce perspective, that can be a field update (here is the new committed value).
- What is published and subscribed to is the event message containing information about the event (there was a field update).
- Event Messages are published by the event producer/publisher/emitter on an event channel on an event bus (I published the fact there was a field update).
- And consumed by the event subscriber/listener (I listened to the channel and learned there was a field update).
- The event producer/publisher does not need to know anything about the event subscriber. The event bus decouples the producer/publisher from the subscriber/listener.
The event message has a header and a body. The event header will contain information such as the date/time of the event, the event type, and its name. The event body will contain information about the state change that triggered the message.
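The ideas above (an event message with a header and a body, published on a channel and consumed by subscribers who never meet the producer) can be sketched as a toy in-memory bus in Python. `EventHeader`, `EventMessage` and `EventBus` are hypothetical names, and the header fields are illustrative rather than a Salesforce schema.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable


# frozen=True: attributes cannot be reassigned once built (the body dict itself is not deep-frozen).
@dataclass(frozen=True)
class EventHeader:
    event_type: str
    name: str
    created: datetime


@dataclass(frozen=True)
class EventMessage:
    header: EventHeader
    body: dict  # details of the state change that triggered the message


Listener = Callable[[EventMessage], None]


class EventBus:
    """Producers and subscribers share only a channel name, never a reference to each other."""

    def __init__(self) -> None:
        self._listeners: dict[str, list[Listener]] = {}

    def subscribe(self, channel: str, listener: Listener) -> None:
        self._listeners.setdefault(channel, []).append(listener)

    def publish(self, channel: str, message: EventMessage) -> int:
        """Deliver to every listener on the channel; return how many received it."""
        listeners = self._listeners.get(channel, [])
        for listener in listeners:
            listener(message)
        return len(listeners)


if __name__ == "__main__":
    bus = EventBus()
    received: list[EventMessage] = []
    bus.subscribe("field-updates", received.append)
    bus.publish(
        "field-updates",
        EventMessage(EventHeader("FieldUpdate", "Rating changed", datetime.now(timezone.utc)),
                     {"Rating": "Hot"}),
    )
    print(len(received))
```

Publishing to a channel with no subscribers is not an error; the producer simply does not know or care who is listening.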
The messages are processed asynchronously and in near real time. They are well suited to capturing events from devices and applications, for example in the Internet of Things.
In a Salesforce context, events can be used to trigger actions within the same org or externally. The Salesforce Enterprise Messaging Platform (EMP) utilising Platform Events and Change Data Capture on the Streaming API enables event-driven architecture within Salesforce.
Event Message Durability and Reliability
The Streaming API stores PushTopic events, generic events and standard-volume events for 24 hours, and high-volume events such as Platform Events and Change Data Capture events for 72 hours.
Each event message, regardless of how long it is retained, has a replay ID that is unique within its channel. The replay ID mechanism is part of the CometD extension provided by Salesforce, and it allows event messages to be replayed by a subscriber. This is particularly useful if the subscriber is offline for a period of time: when a client disconnects, it won’t miss messages, provided it reconnects within the 24- or 72-hour window (depending on the event type it has subscribed to). One key step to ensure message reliability is to use My Domain in the Streaming API endpoint.
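As a quick check of the retention rule above, this Python sketch decides whether an event published at a given time can still be replayed. The dictionary keys are hypothetical labels. The 24- and 72-hour values are the ones stated in these notes, so confirm them against current Salesforce documentation before relying on them.

```python
from datetime import datetime, timedelta, timezone

# Retention periods as stated in these notes.
RETENTION = {
    "pushtopic": timedelta(hours=24),
    "generic": timedelta(hours=24),
    "standard_volume": timedelta(hours=24),
    "platform_event": timedelta(hours=72),  # high-volume
    "change_data_capture": timedelta(hours=72),
}


def is_retained(event_kind: str, published_at: datetime, now: datetime) -> bool:
    """True if the event is still inside its retention window at `now`."""
    return now - published_at <= RETENTION[event_kind]


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    two_days_ago = now - timedelta(hours=48)
    print(is_retained("change_data_capture", two_days_ago, now))
    print(is_retained("pushtopic", two_days_ago, now))
```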
Replaying Events
The replay ID value set by the subscriber determines whether it retrieves only new events or all events within the retention window. The default is to receive only new events sent after subscribing; events outside the retention window are discarded.
- The default value is -1: the subscriber receives only events broadcast after subscribing.
- The value -2 enables the subscriber to receive all events that are within the retention window as well as new events.
Option -2 should not be used all the time, because retrieving a large number of stored event messages can slow performance.
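The replay options can be modelled in a few lines of Python. The sketch below shows which already-stored events a new subscription would receive for -1, for -2, and for a specific replay ID. Passing a specific replay ID resumes with the events after the one the subscriber last processed. It also shows a subscribe message carrying the replay ID in the `ext` field, which is my understanding of how Salesforce's CometD replay extension passes it. `StoredEvent`, `events_to_deliver` and `subscribe_with_replay` are hypothetical names.

```python
from dataclasses import dataclass

REPLAY_NEW_ONLY = -1      # default: only events broadcast after subscribing
REPLAY_ALL_RETAINED = -2  # everything still retained, then new events


@dataclass(frozen=True)
class StoredEvent:
    replay_id: int
    payload: str


def events_to_deliver(retained: list[StoredEvent], replay_from: int) -> list[StoredEvent]:
    """Stored events a new subscription receives first (new events always follow)."""
    if replay_from == REPLAY_NEW_ONLY:
        return []
    if replay_from == REPLAY_ALL_RETAINED:
        return list(retained)
    # A specific replay ID: resume with the stored events that came after it.
    return [e for e in retained if e.replay_id > replay_from]


def subscribe_with_replay(client_id: str, channel: str, replay_from: int) -> dict:
    """Bayeux subscribe message with a channel -> replay ID map in `ext`."""
    return {
        "channel": "/meta/subscribe",
        "clientId": client_id,
        "subscription": channel,
        "ext": {"replay": {channel: replay_from}},
    }


if __name__ == "__main__":
    log = [StoredEvent(10, "a"), StoredEvent(11, "b"), StoredEvent(15, "c")]
    print([e.payload for e in events_to_deliver(log, 11)])
```

A subscriber that persists the last replay ID it processed can pass that value on reconnection instead of -2. This avoids replaying the whole retention window.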
Errors and Error handling
Some common errors have simple causes. Client authentication can become invalid because an OAuth token or Salesforce session has been revoked, or because the client has logged out; this results in a 401 authentication error. Note that a Salesforce session does not expire in a CometD client unless it is actually revoked, because Salesforce extends the timeout interval frequently as long as the CometD client stays connected. If a client is disconnected, for example by a network disruption, and the CometD servers time out and delete the client state, the client could receive a 403 unknown client error when it attempts to reconnect. The client therefore needs to be able to reconnect with a new handshake and subscribe to the channel again. A full list of Streaming API Error Codes can be found here.
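The two recovery paths above can be written as a small decision function. This is a sketch only. `recovery_actions` and the action names it returns are hypothetical, and a real client would also resubscribe with a stored replay ID so that no events are missed.

```python
def recovery_actions(http_status: int, error: str = "") -> list[str]:
    """Map the error cases described above to the steps a client should take next."""
    if http_status == 401:
        # Token or session revoked, or the user logged out: new credentials come first.
        return ["reauthenticate", "handshake", "subscribe"]
    if http_status == 403 and "unknown client" in error.lower():
        # The server discarded the client state: start again from the handshake.
        return ["handshake", "subscribe"]
    return ["inspect_error"]


if __name__ == "__main__":
    print(recovery_actions(403, "403::Unknown client"))
```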
Enterprise Messaging Platform Events
Platform Events are event messages. They are similar to custom objects and are defined in a similar way, but they are not queryable using SOQL or SOSL. They enable a publisher to send custom event data via code, flows or an API (until Salesforce retires Process Builder, they can also be published from processes). There are three types:
- Custom Platform Events
- Standard Platform Events
- High Volume Platform Events
Regardless of the type of Platform Event, it is not possible to amend the event message once it has been published.
Custom Platform Events:
These are defined in Setup: custom platform event definitions are named and given fields in a similar way to a custom object. They support a subset of field types (Checkbox, Date, Date/Time, Number, Text and Long Text Area).
Go to Integrations / Platform Events in Setup and click New to create a new platform event. Note that custom platform events are high-volume event types by default (the checkbox is checked by default and greyed out from API version 45.0 onwards).
On this screen you can choose Publish After Commit (the default) or Publish Immediately as the Publish Behaviour. Publish After Commit ensures that the event message is published only after the transaction has been committed successfully. With Publish Immediately, the event message is published as soon as the publish call executes, without waiting for the transaction to complete successfully. This is extremely useful for logging that an event has occurred regardless of whether the transaction was committed. It is possible to change the Publish Behaviour after creation.
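The difference between the two publish behaviours shows up when a transaction rolls back. The toy Python model below uses a list to stand in for the event bus. Events published with Publish Immediately reach the bus straight away, while Publish After Commit events are held and only released on commit. `PublishBehavior` and `Transaction` are hypothetical names, not Salesforce APIs.

```python
from enum import Enum


class PublishBehavior(Enum):
    AFTER_COMMIT = "Publish After Commit"
    IMMEDIATELY = "Publish Immediately"


class Transaction:
    """Toy transaction showing when event messages reach the bus."""

    def __init__(self, bus: list[str]) -> None:
        self.bus = bus              # stands in for the event bus
        self._held: list[str] = []  # after-commit events waiting for the outcome

    def publish(self, event: str, behavior: PublishBehavior) -> None:
        if behavior is PublishBehavior.IMMEDIATELY:
            self.bus.append(event)  # sent now, whatever happens to the transaction
        else:
            self._held.append(event)

    def commit(self) -> None:
        self.bus.extend(self._held)
        self._held.clear()

    def rollback(self) -> None:
        self._held.clear()  # after-commit events are never published


if __name__ == "__main__":
    bus: list[str] = []
    tx = Transaction(bus)
    tx.publish("error-logged", PublishBehavior.IMMEDIATELY)
    tx.publish("order-placed", PublishBehavior.AFTER_COMMIT)
    tx.rollback()
    print(bus)  # only the immediately published event survives the rollback
```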
Once you click Save, the detail page for the Platform Event you have just defined appears.
In the Custom Fields & Relationships section, create the custom fields you need, selecting the appropriate field types.
When a platform event message has been published, it cannot be updated or deleted. However, if a platform event definition is deleted, it is permanently deleted.
Standard Platform Events
Standard Platform Events are events with predefined fields. Examples include AssetTokenEvent for OAuth 2.0 authentication activity, ProcessExceptionEvent for payment processing errors, and BatchApexErrorEvent for errors encountered in batch Apex jobs. See Standard Platform Event Object List for more examples.
High-Volume Platform Events
Advantages of High Volume Platform events include asynchronous publishing and separate event allocations. The asynchronous publishing assists with processing high numbers of event messages efficiently. Publishing calls are queued and published when resources allow. The separate event allocations vary per edition and are set out here: Platform Event Allocations.
How to subscribe to Platform Events:
There are different methods to subscribe to Platform Events for different purposes. For an external client, it is generally done using CometD (a scalable HTTP-based event routing bus that uses an AJAX push technology pattern known as Comet), which relies on long polling (emulating an information push from a server to a client by keeping the connection open).
- Apex Triggers: Triggers come with an auto-subscription mechanism, so there is no need to create a channel: simply set up the trigger to listen for after insert on the event object type. These triggers differ from standard or custom object triggers because the Apex is not executed in the same transaction as the one that published the event. Triggers can also be configured to process event messages in batches of up to 2,000 at a time. There is therefore generally a delay between an event being published and the trigger acting on it. Read more here.
The trigger listens to the specific event object, e.g.:
trigger PlatformEventTrigger on NewPlatformEvent__e (after insert) {}
- CometD: When an external client wishes to subscribe to platform events, CometD is used, either by creating a CometD client or by using the EMP Connector (GitHub info here). The client subscribes to a case-sensitive channel name, e.g. /event/NewPlatformEvent__e. The endpoint is /cometd/53.0 (or the latest API version).
- Debug Logs: In Setup, under Environments / Logs / Debug Logs, user trace flags can be created to capture logs for a set period for the Automated Process entity or for users, depending on which one publishes the event. This helps with troubleshooting what is occurring.
- Flows: These can be launched or resumed when a platform event is received. Flows that subscribe to platform event messages can receive events published from Apex, APIs, or other flows and processes. Either the Start element or a Pause element in a flow can listen for Platform Events.
- Lightning Components: In Lightning components, it is possible to subscribe to platform events with the empApi component, which uses CometD to listen to the Streaming API. All streaming channels are supported (Platform Events, Change Data Capture, PushTopic and Generic Events). Read more here. Interestingly, the empApi module is not supported in the Salesforce mobile app, but it is supported in desktop browsers and in mobile browsers with web worker or shared worker support. Information about Web Workers from Mozilla is here.
- Process Builder: At present, it is possible to subscribe through Process Builder. However, Process Builder was due to be retired within six months of the time of writing, so I am not exploring it further.
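For the CometD option above, an external client needs two strings: the CometD endpoint on the org's My Domain URL, and the case-sensitive channel name. A small Python sketch builds both. `MyDomainName` and the default API version are placeholders, and the check assumes platform event API names end in `__e`.

```python
import re


def cometd_endpoint(my_domain_url: str, api_version: str = "53.0") -> str:
    """CometD endpoint on the org's My Domain URL, e.g. .../cometd/53.0."""
    return f"{my_domain_url.rstrip('/')}/cometd/{api_version}"


def platform_event_channel(event_api_name: str) -> str:
    """Channel names are case-sensitive, so the API name is used exactly as given."""
    if not re.fullmatch(r"[A-Za-z][A-Za-z0-9_]*__e", event_api_name):
        raise ValueError(f"not a platform event API name: {event_api_name}")
    return f"/event/{event_api_name}"


if __name__ == "__main__":
    print(cometd_endpoint("https://MyDomainName.my.salesforce.com"))
    print(platform_event_channel("NewPlatformEvent__e"))
```

Rejecting a `__c` suffix here catches the common slip of using a custom object's API name instead of the event's.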
Once you have subscribers in place, you can view them in the Subscriptions related list at the bottom of the platform event definition page.
You can also see which triggers are listening.
From the Metadata API, it is also possible to obtain processes that subscribe to platform events from event subscriptions or run an SOQL query to find them.
One final note, in order to read or create a particular platform event, the permissions for the platform event object need to be granted and assigned. There is no permission to edit or delete a Platform Event.
Change Data Capture
Change Data Capture publishes change event messages for changes to Salesforce records, namely creation, update, deletion and undeletion. The event messages are published in near real time.
The header of each Change Data Capture event contains information about the object and which DML operation occurred (e.g. update or delete).
The Change Data Capture event message is sent as JSON and includes the same fields as the associated parent object, with two exclusions. It excludes the IsDeleted and SystemModstamp system fields. It also excludes any field whose value isn’t stored on the record but is derived from another record or from a formula, such as formula fields, LastActivityDate and PhotoUrl. Roll-up summary fields are the exception: although derived, they are included. See Change Data Capture Change Event Message Structure.
Each event message also has a replayId used for retrieving those past events.
Note: When a record is created, the event message contains all fields, whether or not they contain data, as well as system fields. When a record is updated, only the changed fields have data in the event message; the other fields are empty in the event message, even if they hold data on the record, because they have not changed. When a record is deleted, the body of the message does not include any fields or system fields. When a record is undeleted, the body contains all non-empty fields from the record, including system fields.
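The body rules in the note above can be summarised in a simplified Python model. The `ChangeEventHeader` key and its `entityName`, `changeType` and `changedFields` entries mirror real header field names. The rest is an assumption for illustration and not the full message structure (there are no `recordIds`, commit timestamps, and so on). `change_event` is a hypothetical helper.

```python
from typing import Iterable


def change_event(entity: str, change_type: str, record: dict,
                 changed: Iterable[str] = ()) -> dict:
    """Simplified change event: which fields carry values for each change type."""
    changed = set(changed)
    if change_type == "CREATE":
        body = dict(record)  # all fields, populated or empty
    elif change_type == "UPDATE":
        # Only changed fields carry values; unchanged fields are empty (None).
        body = {name: (value if name in changed else None) for name, value in record.items()}
    elif change_type == "DELETE":
        body = {}  # no fields or system fields
    elif change_type == "UNDELETE":
        body = {name: value for name, value in record.items() if value is not None}
    else:
        raise ValueError(f"unknown change type: {change_type}")
    header = {"entityName": entity, "changeType": change_type, "changedFields": sorted(changed)}
    return {"ChangeEventHeader": header, **body}


if __name__ == "__main__":
    account = {"Name": "Acme", "Rating": "Hot", "Phone": None}
    print(change_event("Account", "UPDATE", account, {"Rating"}))
```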
To turn on Change Data Capture Events, go to Integrations / Change Data Capture.
Any record changes on the selected objects appear as change events on the standard channel /data/ChangeEvents and also on a single-entity channel. For example, after turning on Change Data Capture for Accounts, the /data/AccountChangeEvent channel publishes those record changes. For a custom object, the single-entity channel would be /data/CustomObject__ChangeEvent. It is also possible to create a custom channel. More about Subscription channels here.
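The channel naming pattern above is mechanical enough to express as a helper. This Python sketch follows the two examples in the paragraph: a standard object gets `ChangeEvent` appended, and a custom object's `__c` suffix becomes `__ChangeEvent`. It does not handle every object type, and the function names are hypothetical.

```python
def change_event_name(object_api_name: str) -> str:
    """Account -> AccountChangeEvent; CustomObject__c -> CustomObject__ChangeEvent."""
    if object_api_name.endswith("__c"):
        return object_api_name[: -len("__c")] + "__ChangeEvent"
    return object_api_name + "ChangeEvent"


def change_event_channels(object_api_name: str) -> list[str]:
    """The standard channel for all selected objects, plus the single-entity channel."""
    return ["/data/ChangeEvents", f"/data/{change_event_name(object_api_name)}"]


if __name__ == "__main__":
    print(change_event_channels("Account"))
    print(change_event_channels("CustomObject__c"))
```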
How to subscribe to Change Data Capture Events:
Apex triggers or CometD can be used to subscribe to change events.
- A trigger can only subscribe to an event rather than a channel, e.g.: trigger AccountCDCTrigger on AccountChangeEvent (after insert) {}
A trigger runs asynchronously after the transaction has completed. The subscription characteristics are similar to those for Platform Events. More information on Subscribing with Apex Triggers is here.
- CometD-based apps (such as the EMP Connector referenced above) or the empApi Lightning component can subscribe to the channels by using the channel endpoint. See the Subscription channel link above for further information.
To monitor Change Data Capture events, query the PlatformEventUsageMetric object. Usage data is available for at least 45 days. It is possible to query for change events that have been published and delivered, as well as for platform events that have been published and delivered.
Permissions for Change Data Capture Events
As Change Data Capture ignores sharing settings, to receive change events on a channel the subscribed user must have ‘View All’ for the object they are subscribing to. ‘View All Data’ is required instead if the object is a task or event (which do not have a View All permission) or if the subscriber wants to receive all entities on the channel. If change events for user records are needed, the ‘View All Users’ permission is required.
Although Change Data Capture ignores sharing settings, it does check the subscriber’s field-level security settings and only delivers the fields that the subscribed user can view.
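The two access rules above can be sketched as follows. The first function picks the permission a subscriber needs, based on the rules in these notes. The second delivers only the fields the subscriber can see, keeping the header. Both function names are hypothetical, and this is a model of the behaviour rather than how Salesforce implements it.

```python
NO_VIEW_ALL_OBJECTS = {"Task", "Event"}  # objects without a View All permission


def required_permission(object_name: str, all_entities: bool = False) -> str:
    """Permission a subscriber needs to receive change events for this object."""
    if object_name == "User":
        return "View All Users"
    if all_entities or object_name in NO_VIEW_ALL_OBJECTS:
        return "View All Data"
    return f"View All on {object_name}"


def apply_field_level_security(event_body: dict, visible_fields: set[str]) -> dict:
    """Keep the header plus only the fields the subscriber is allowed to view."""
    return {name: value for name, value in event_body.items()
            if name == "ChangeEventHeader" or name in visible_fields}


if __name__ == "__main__":
    event = {"ChangeEventHeader": {"entityName": "Employee__c"}, "Name": "A", "Salary__c": 1}
    print(apply_field_level_security(event, {"Name"}))
```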
I am not covering this in detail here, but I would like to flag that there are steps that need to be taken with an Event Bus when the Salesforce Records have fields encrypted with Shield Platform Encryption. Please see this section of the Change Data Capture Developer Guide.
PushTopic Events
PushTopic events are used for subscriptions to record changes: they enable notifications to be received when records are created, updated, deleted and undeleted. They are typically used to refresh the user interface of a custom app when Salesforce records change. Notifications are sent for changes that match the SOQL query defined in the PushTopic, and only for records the user has access to, based on sharing rules, read access to the object and to PushTopics, and field-level security. The notifications can be further limited to events that match a subscription filter.
PushTopic events can be received both inside and outside Salesforce. Inside Salesforce, the subscribers are generally pages; outside, they can be applications or clients that subscribe to them.
PushTopics can be created in Workbench or in the Developer Console.
The example I tried was to execute the following anonymous Apex in the Developer Console:
// Create a PushTopic that streams changes to Case records
PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'CasePushTopic'; // subscribers use the channel /topic/CasePushTopic
pushTopic.Query = 'SELECT Id, Subject, Priority FROM Case';
pushTopic.ApiVersion = 53.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = true;
pushTopic.NotifyForOperationDelete = true;
pushTopic.NotifyForFields = 'Referenced'; // notify when fields referenced in the query change
insert pushTopic;
To set up a PushTopic, use this reference guide. Note that the SELECT clause must include Id, and a query can reference only one entity. Additionally, the object must be valid for the specified API version. When building the SOQL query for the PushTopic, keep an eye on the supported and unsupported SOQL statements. Unsupported statements include queries that omit Id from the selected fields, semi-joins and anti-joins, and aggregate queries.
The Streaming API stores PushTopic events for 24 hours and allows both stored and new events to be retrieved. They can be subscribed to on a case-sensitive channel name starting with /topic/…, such as /topic/CasePushTopic. As soon as a PushTopic record is created, the system starts evaluating record creations, updates, deletions and undeletions for matches, and a Bayeux client can receive streamed events from this channel.
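Before inserting a PushTopic, it can help to sanity-check the query against the rules above. The Python sketch below uses regular expressions to flag a missing Id, aggregate functions, and subqueries (semi-joins and anti-joins). It is a rough lint under those assumptions, not a SOQL parser, and `pushtopic_query_problems` is a hypothetical name.

```python
import re

SELECT_FROM = re.compile(r"^\s*SELECT\s+(?P<fields>.+?)\s+FROM\s+(?P<entity>\w+)",
                         re.IGNORECASE | re.DOTALL)
AGGREGATE = re.compile(r"\b(COUNT|COUNT_DISTINCT|SUM|AVG|MIN|MAX)\s*\(", re.IGNORECASE)
SUBQUERY = re.compile(r"\(\s*SELECT\b", re.IGNORECASE)


def pushtopic_query_problems(query: str) -> list[str]:
    """Return a list of rule violations; an empty list means none were detected."""
    match = SELECT_FROM.match(query)
    if not match:
        return ["query must start with SELECT ... FROM <entity>"]
    problems = []
    fields = [f.strip().lower() for f in match["fields"].split(",")]
    if "id" not in fields:
        problems.append("SELECT clause must include Id")
    if AGGREGATE.search(query):
        problems.append("aggregate queries are not supported")
    if SUBQUERY.search(query):
        problems.append("semi-joins and anti-joins are not supported")
    return problems


if __name__ == "__main__":
    print(pushtopic_query_problems("SELECT Id, Subject, Priority FROM Case"))
    print(pushtopic_query_problems("SELECT Subject FROM Case"))
```

The Salesforce server performs the authoritative validation when the PushTopic is inserted. A local check like this only shortens the feedback loop.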
To monitor PushTopic event and generic event usage in the user interface, use the Company Information page: under Organisation Detail there is a field called Streaming API Events, Last 24 Hours.
PushTopic and generic event usage can also be queried through the REST API endpoint /vXX.X/limits/ (where vXX.X is the API version you are on). See more here.
Generic Events
These are used for unstructured notifications. To create a Streaming Channel for Generic Events, select Streaming Channels from the App Launcher, then click New in the list view.
Each Channel must start with /u/…
Streaming Channels are Public Read/Write by default.
Access to these channels will no doubt need to be controlled. Profiles and permission sets can be assigned to users to permit or restrict access to the Streaming Channel object.
The REST API can be used to generate event notifications to channel Subscribers. See this entry in the Salesforce Developer documentation.
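As a sketch of the REST call above, the Python function below builds, but does not send, a POST to the Streaming Channel push resource. The path (`/services/data/vXX.X/sobjects/StreamingChannel/<channel id>/push`) and the `pushEvents` / `payload` / `userIds` body are my understanding of that resource. Check the REST API Developer Guide for the exact shape. The instance URL, channel ID and token in the usage line are placeholders.

```python
import json
import urllib.request
from typing import Optional


def generic_push_request(instance_url: str, api_version: str, channel_id: str,
                         payload: str, access_token: str,
                         user_ids: Optional[list[str]] = None) -> urllib.request.Request:
    """Build a POST request that pushes one generic event to a streaming channel."""
    url = (f"{instance_url.rstrip('/')}/services/data/v{api_version}"
           f"/sobjects/StreamingChannel/{channel_id}/push")
    # An empty userIds list is intended to target every subscriber of the channel.
    body = {"pushEvents": [{"payload": payload, "userIds": user_ids or []}]}
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": f"Bearer {access_token}"},
        method="POST",
    )


if __name__ == "__main__":
    req = generic_push_request("https://MyDomainName.my.salesforce.com", "53.0",
                               "CHANNEL_ID", "Build finished", "ACCESS_TOKEN")
    print(req.get_method(), req.full_url)
    # urllib.request.urlopen(req) would send it with a real token and channel ID.
```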
With Generic Events, a client can only receive streaming events after subscribing and while a Salesforce session is active. It cannot retrieve generic events that occurred outside that live subscription and session. However, missed events from the last 24 hours can be retrieved through the retention window by using durable generic streaming.
How to subscribe?
It is possible to use a Java client with the EMP Connector to subscribe, authenticating with a username and password or with an OAuth bearer token.
In summary:
- Platform Events can define a custom event schema with strongly typed fields, publish and subscribe via Apex and be published declaratively by Process Builder and Flows.
- Both Generic Events and Platform Events carry user-defined payloads and can be published via one or more APIs.
- Only PushTopic and Change Data Capture events respect field-level security and are auto-published as event notifications on Salesforce record changes.
- All types of events can be subscribed to via CometD and replay retained event notifications.
- Only PushTopic events can filter subscriptions, get record-sharing support, and choose the fields to include in event notifications for Salesforce record changes.
- Only Change Data Capture and Platform Events support encrypting field data in event messages with Shield Platform Encryption.