Streaming API
The Omnia Streaming API provides an interface for consuming applications to subscribe to timeseries streams for specified tags.
The API is a supplement to the Omnia Timeseries API. Customers need to use the Omnia Timeseries API to find the timeseries IDs they want to subscribe to, as well as the corresponding metadata.
To enable data streaming, users are required to register (through the API) a connection string (with write access) to a designated Azure Event Hub, to which the streaming data will be delivered.
Then, by listening to this Event Hub (which can be implemented with little effort in various popular programming languages, see below), users can continuously receive, process and react to new timeseries data.
Note that the stream consists solely of live data; historical timeseries data will not be sent to the customer's stream destination Event Hub. For historical data queries, users are referred to the Omnia Timeseries API.
- Customers need the Timeseries.Read permission to create subscriptions. A client with the required permissions will be delivered by the Plant Team during on-boarding.
- Customers need an Azure Event Hub to serve as the message broker for the incoming stream data. If the customer does not have access to an Azure subscription for hosting the Event Hub, one can be delivered by the Plant Team during on-boarding. As for the Event Hub configuration, the recommended settings are 1 throughput unit (TU), 32 partitions and auto-scaling enabled.
- Customers need an Azure Blob storage container for their listener application to store stream checkpoints. As with the Event Hub, this can be set up by the Plant Team during on-boarding if needed.
The stream destination is where the timeseries event data (that has been subscribed to) will be sent to. A single stream destination must be set for each customer. Currently, Azure Event Hubs is the only type of message broker supported.
The stream destination is set for the user's customer instance using the endpoint
POST /streaming/destination

with request body

{
    "connectionString": "Endpoint=sb://<FQDN>/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<KeyValue>;EntityPath=<EventhubName>"
}

A subscription is created using the endpoint

POST /streaming/subscriptions

The timeseries IDs that should be subscribed to must be specified in the request body in the following manner:

[
    { "id": "4ec07552-7334-47f8-ac9f-666e65304b1a" },
    { "id": "ebe027bd-38d1-40ee-adeb-3e275b073d5d" }
    // ...
]
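The two calls can be sketched with Python's standard library. This is a hedged illustration: the base URL is a placeholder and the bearer token acquisition is an assumption, not documented values; real calls require a valid access token for the API.

```python
# Hypothetical sketch of the two Streaming API calls. BASE_URL is a
# placeholder (an assumption) -- consult the API documentation for the
# real gateway address and authentication flow.
import json
import urllib.request

BASE_URL = "https://<api-base-url>"  # placeholder, not a documented value


def _post(path, payload, token):
    """Build an authenticated POST request with a JSON body."""
    return urllib.request.Request(
        url=BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def register_destination(connection_string, token):
    # POST /streaming/destination with the Event Hub connection string
    return _post("/streaming/destination",
                 {"connectionString": connection_string}, token)


def create_subscription(timeseries_ids, token):
    # POST /streaming/subscriptions with the list of timeseries IDs
    return _post("/streaming/subscriptions",
                 [{"id": ts_id} for ts_id in timeseries_ids], token)


# To actually send a request: urllib.request.urlopen(register_destination(...))
```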
In the Azure documentation, sample code for implementing Event Hub listeners is provided in several popular programming languages, including C# (.NET), Python and Java.
As described in the documentation, the user should provide an Azure Blob storage container for the receiver application to store checkpoints so that it continues "where it left off" when restarted.
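The checkpointing pattern the SDK implements can be sketched generically. This is a simplified in-memory illustration of the concept, not the Azure SDK API; the real receiver persists these records in the Blob storage container mentioned above.

```python
# Simplified illustration of checkpointing (not the Azure SDK API): the
# listener records the last processed position per partition so a restart
# can resume "where it left off".
class CheckpointStore:
    """Remembers the last processed offset for each Event Hub partition."""

    def __init__(self):
        self._offsets = {}

    def update(self, partition_id, offset):
        # Called after an event has been fully processed
        self._offsets[partition_id] = offset

    def resume_from(self, partition_id):
        # Where a restarted listener should continue; None means "from the start"
        return self._offsets.get(partition_id)
```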
The data will be delivered to the Event Hub in the following format:
{
"data": {
"items": [
{
"id": "f69ec89b-494b-4e07-9da2-7c89489e3346",
"datapoints": [
{
"time": "2018-11-30T10:00:00.000Z",
"value": 124.5,
"status": 192
},
{
"time": "2018-11-30T10:01:00.000Z",
"value": 124.6,
"status": 192
}
// ...
]
},
{
"id": "f69ec89b-494b-4e07-9da2-7c89489e3346",
"datapoints": [
{
"time": "2018-11-30T10:00:00.000Z",
"value": 224.5,
"status": 192
},
{
"time": "2018-11-30T10:01:00.000Z",
"value": 224.6,
"status": 192
},
// ...
]
},
// ...
]
},
"schema":"equinor/plant/schemas/ingest/datapoints_v2"
}

In Python, the following event handler will retrieve and print the timeseries values included in the events received by the listener.
...
import pandas as pd

async def on_event(partition_context, event):
    # Parse the event body and extract the list of data items
    event_data = event.body_as_json()['data']['items']
    print(f'New event from partition {partition_context.partition_id}')
    for data_item in event_data:
        tag_id = data_item['id']
        datapoints = data_item['datapoints']
        df = pd.DataFrame(datapoints)
        print(f'Received data item from tag {tag_id}:')
        print(df)
        print()
    # Persist progress so the listener can resume where it left off
    await partition_context.update_checkpoint(event)

Duplicates, originating for instance from the IMS exporter, may easily appear on the common ingestion hub and thus end up in the user's listening service.
Depending on the application, the user should implement de-duplication functionality as a part of their listening application to handle this.
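One possible approach is sketched below: keeping a bounded set of (tag ID, timestamp) keys already seen and dropping repeats. This is an illustration of the idea, not part of the API; the class and function names are hypothetical, and the right strategy depends on the application.

```python
# Minimal de-duplication sketch (names are hypothetical, not part of the API):
# remember recently seen (tag id, timestamp) keys and drop repeated datapoints.
from collections import OrderedDict


class Deduplicator:
    """Remembers up to `capacity` recently seen datapoint keys."""

    def __init__(self, capacity=100_000):
        self.capacity = capacity
        self._seen = OrderedDict()

    def is_new(self, tag_id, time):
        key = (tag_id, time)
        if key in self._seen:
            return False
        self._seen[key] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the oldest key
        return True


def filter_datapoints(dedup, items):
    """Yield only datapoints not seen before, per the delivery format above."""
    for item in items:
        fresh = [dp for dp in item["datapoints"]
                 if dedup.is_new(item["id"], dp["time"])]
        if fresh:
            yield {"id": item["id"], "datapoints": fresh}
```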
Azure Event Hubs requires the use of TLS at all times. This ensures the safety and security of data streamed to the user.
The time series subscriptions, which the users create for themselves using the Omnia Streaming API, are stored in a cloud-hosted database.
Every time new timeseries data is transmitted to the common ingestion hub, whether it be from Historians or from third-party data sources, a routing function will identify the customers with registered subscriptions to the timeseries in question. Based on that, it will forward the event to the customers' respective Event Hubs using the connection strings that are saved for each customer in a cryptographic key storage.
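The routing step described above can be sketched as a simple fan-out lookup. This is an illustration of the logic, not the actual implementation; all names are hypothetical.

```python
# Illustrative sketch of the routing step (names are hypothetical): for each
# incoming data item, look up which customers subscribe to its timeseries ID
# and group the items per customer for forwarding to their Event Hubs.
def route(event_items, subscriptions):
    """subscriptions: dict mapping timeseries ID -> set of customer names."""
    per_customer = {}
    for item in event_items:
        for customer in subscriptions.get(item["id"], ()):
            per_customer.setdefault(customer, []).append(item)
    # Items for IDs without subscribers are simply not forwarded
    return per_customer
```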
The architecture diagram (not reproduced here) provides an overview of the data flow.
There are three potential failure modes that could bring the streaming to a halt. An important question is to what extent streaming data missed during the downtime period will be sent to the user after service recovery, so that it is not lost.
Should the user's listening application (i.e. the service that pulls data from the user's destination Event Hub) fail, the retention policy of the Event Hub (1-7 days, as specified by the customer when creating the Event Hub) ensures that streaming data from this period remains accessible to the listening application as soon as it restarts. This means that, even if the user's listener application was inactive during a period with streaming activity, it will be able to "catch up" on the events it missed.
Azure Event Hubs has built-in retry functionality to ensure that unsuccessful transmissions do not get lost. However, if the transmission of an event continues to fail, for instance due to the customer Event Hub being offline or the connection string having expired, there is currently no mechanism for resending the data at a later time.
If the routing function responsible for forwarding the events to customers fails, the retention policy of the common ingestion hub ensures that data from the last 7 days will be streamed to customers as soon as the routing function is running again.