Create datapoint subscriptions

async AsyncCogniteClient.time_series.subscriptions.create(
subscription: DataPointSubscriptionWrite,
) DatapointSubscription

Create a subscription.

Create a subscription that can be used to listen for changes in data points for a set of time series.

Parameters:

subscription (DataPointSubscriptionWrite) – Subscription to create.

Returns:

Created subscription

Return type:

DatapointSubscription

Examples

Create a subscription with explicit time series IDs:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes import DataPointSubscriptionWrite
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> sub = DataPointSubscriptionWrite(
...     external_id="my_subscription",
...     name="My subscription",
...     partition_count=1,
...     time_series_ids=["myFistTimeSeries", "mySecondTimeSeries"],
... )
>>> created = client.time_series.subscriptions.create(sub)

Create a subscription with explicit time series IDs given as Node IDs either from CogniteTimeSeries or an extension of CogniteTimeseries:

>>> from cognite.client.data_classes import DataPointSubscriptionWrite
>>> from cognite.client.data_classes.data_modeling import NodeId
>>> sub = DataPointSubscriptionWrite(
...     external_id="my_subscription",
...     name="My subscription with Data Model Ids",
...     partition_count=1,
...     instance_ids=[
...         NodeId("my_space", "myFistTimeSeries"),
...         NodeId("my_space", "mySecondTimeSeries"),
...     ],
... )
>>> created = client.time_series.subscriptions.create(sub)

Create a filter defined subscription for all numeric time series that are stepwise:

>>> from cognite.client.data_classes import DataPointSubscriptionWrite
>>> from cognite.client.data_classes import filters as flt
>>> from cognite.client.data_classes.datapoints_subscriptions import (
...     DatapointSubscriptionProperty,
... )
>>> is_numeric_stepwise = flt.And(
...     flt.Equals(DatapointSubscriptionProperty.is_string, False),
...     flt.Equals(DatapointSubscriptionProperty.is_step, True),
... )
>>> sub = DataPointSubscriptionWrite(
...     external_id="my_subscription",
...     name="My subscription for numeric, stepwise time series",
...     partition_count=1,
...     filter=is_numeric_stepwise,
... )
>>> created = client.time_series.subscriptions.create(sub)