Iterate over subscriptions data
async AsyncCogniteClient.time_series.subscriptions.iterate_data(
    external_id: str,
    start: str | None = None,
    limit: int = 25,
    partition: int = 0,
    poll_timeout: int = 5,
    cursor: str | None = None,
    include_status: bool = False,
    ignore_bad_datapoints: bool = True,
    treat_uncertain_as_bad: bool = True,
)
Iterate over data from a given subscription.
The returned data consists of ingested datapoints and time ranges in which data was deleted. This endpoint also returns changes to the subscription itself, that is, time series added to or removed from the subscription.
Warning
This endpoint will store updates from when the subscription was created, but updates older than 7 days may be discarded.
- Parameters:
external_id (str) – The external ID of the subscription.
start (str | None) – When to start the iteration. If set to None, the iteration will start from the beginning. The format is “N[timeunit]-ago”, where timeunit is w,d,h,m (week, day, hour, minute). For example, “12h-ago” will start the iteration from 12 hours ago. You can also set it to “now” to jump straight to the end. Defaults to None.
limit (int) – Approximate number of results to return across all partitions. Defaults to 25.
partition (int) – The partition to iterate over. Defaults to 0.
poll_timeout (int) – How many seconds to wait for new data, until an empty response is sent. Defaults to 5.
cursor (str | None) – Optional cursor to start iterating from.
include_status (bool) – Also return the status code, an integer, for each datapoint in the response. Defaults to False.
ignore_bad_datapoints (bool) – Do not return bad datapoints. Defaults to True.
treat_uncertain_as_bad (bool) – Treat datapoints with uncertain status codes as bad. If False, treat them as good. Defaults to True.
- Yields:
DatapointSubscriptionBatch – Changes to the subscription and data in the subscribed time series.
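The start format described above ("N[timeunit]-ago" or "now") can be checked locally before calling the endpoint. The following is a minimal sketch, not SDK code: parse_start is a hypothetical helper introduced here for illustration, and the service's own validation may be stricter or looser than this pattern.

```python
import re
from datetime import timedelta

# Hypothetical helper, not part of the SDK: interprets a `start` value locally.
_UNIT_TO_KWARG = {"w": "weeks", "d": "days", "h": "hours", "m": "minutes"}
_START_PATTERN = re.compile(r"(\d+)([wdhm])-ago")


def parse_start(start):
    """Return how far back `start` points, as a timedelta.

    None (start from the beginning) is returned unchanged, and "now" maps to
    a zero offset. Any other value must match N[timeunit]-ago.
    """
    if start is None:
        return None
    if start == "now":
        return timedelta(0)
    match = _START_PATTERN.fullmatch(start)
    if match is None:
        raise ValueError(f"Invalid start value: {start!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNIT_TO_KWARG[unit]: int(amount)})
```

For example, parse_start("12h-ago") gives timedelta(hours=12), while a value such as "12 hours" raises ValueError before any request is made.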
Examples
Iterate over changes to subscription timeseries since the beginning until there is no more data:
>>> from cognite.client import CogniteClient, AsyncCogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient()  # another option
>>> for batch in client.time_series.subscriptions.iterate_data("my_subscription"):
...     # Changes to the subscription itself:
...     print(f"Added {len(batch.subscription_changes.added)} timeseries")
...     print(f"Removed {len(batch.subscription_changes.removed)} timeseries")
...     print(f"Changed timeseries data in {len(batch.updates)} updates")
...     # Changes to datapoints for time series in the subscription:
...     for update in batch.updates:
...         update.time_series  # The time series the update belongs to
...         update.upserts  # The upserted datapoints, if any
...         update.deletes  # Ranges of deleted periods, if any
...     if not batch.has_next:
...         break
Iterate continuously over all changes to the subscription from the last 3 days:
>>> for batch in client.time_series.subscriptions.iterate_data(
...     "my_subscription", "3d-ago"
... ):
...     pass  # do something
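The loop-until-has_next-is-false pattern from the first example can be wrapped in a small function and tried without a live project. This is only a sketch under stated assumptions: summarize and fake_batch are hypothetical names, and the stand-in objects mimic only the attributes used in the examples above (subscription_changes.added, subscription_changes.removed, updates, has_next), not the real DatapointSubscriptionBatch class.

```python
from types import SimpleNamespace


def summarize(batches):
    """Count subscription and datapoint changes until a batch reports no more data."""
    totals = {"added": 0, "removed": 0, "updates": 0}
    for batch in batches:
        totals["added"] += len(batch.subscription_changes.added)
        totals["removed"] += len(batch.subscription_changes.removed)
        totals["updates"] += len(batch.updates)
        if not batch.has_next:
            break  # same stop condition as in the first example
    return totals


def fake_batch(added, removed, updates, has_next):
    # Stand-in for a batch (assumption): only mimics the attributes read above.
    changes = SimpleNamespace(added=list(added), removed=list(removed))
    return SimpleNamespace(
        subscription_changes=changes, updates=list(updates), has_next=has_next
    )


batches = [
    fake_batch(["ts_a"], [], ["u1", "u2"], has_next=True),
    fake_batch([], ["ts_b"], ["u3"], has_next=False),
    fake_batch(["ts_c"], [], [], has_next=True),  # never reached
]
totals = summarize(batches)
```

With a real client, the same function could presumably be given client.time_series.subscriptions.iterate_data("my_subscription") in place of the list of stand-ins.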